/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFlag;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.flink.FlinkJob;
import io.hops.hopsworks.common.jobs.flink.FlinkJobConfiguration;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Stateless EJB that launches Flink jobs and inspects Flink job jars.
 *
 * <p>It builds a {@link FlinkJob} while impersonating the HDFS user derived
 * from the project and the requesting user, hands it to the
 * {@link AsynchronousJobExecutor}, and makes sure the Flink jar is present
 * in HDFS before doing so.
 */
@Stateless
public class FlinkController {
    private static final Logger LOGGER = Logger.getLogger(FlinkController.class.getName());
    @EJB
    YarnJobsMonitor jobsMonitor;
    @EJB
    private DistributedFsService fops;
    @EJB
    private AsynchronousJobExecutor submitter;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private UserGroupInformationService ugiService;
    @EJB
    private HdfsUsersController hdfsUsersBean;
    @EJB
    private Settings settings;

    /**
     * Starts the given Flink job on behalf of the given user.
     *
     * @param job the job to run; its type must be {@link JobType#FLINK}
     * @param user the user requesting the execution
     * @param sessionId session identifier forwarded to the {@link FlinkJob}
     * @return the execution requested from the created {@link FlinkJob}
     * @throws NullPointerException if {@code job} or {@code user} is null
     * @throws IllegalArgumentException if the job is not a Flink job
     * @throws IllegalStateException if the Flink jar is available neither in
     *     HDFS nor locally
     * @throws JobException if an I/O error occurs while obtaining or using the
     *     proxy user
     * @throws GenericException if the {@link FlinkJob} could not be created
     */
    public Execution startJob(final Jobs job, final Users user, final String sessionId) throws GenericException, JobException {
        if (job == null) {
            throw new NullPointerException("Cannot run a null job.");
        }
        if (user == null) {
            throw new NullPointerException("Cannot run a job as a null user.");
        }
        if (job.getJobType() != JobType.FLINK) {
            throw new IllegalArgumentException("Job configuration is not a Flink job configuration.");
        }
        if (!isFlinkJarAvailable()) {
            throw new IllegalStateException("Flink is not installed on this system.");
        }

        String username = hdfsUsersBean.getHdfsUserName(job.getProject(), user);
        FlinkJob flinkjob = null;
        try {
            UserGroupInformation proxyUser = ugiService.getProxyUser(username);
            try {
                // The FlinkJob is constructed inside doAs so that it is created
                // while impersonating the project-specific HDFS user.
                flinkjob = proxyUser.doAs(new PrivilegedExceptionAction<FlinkJob>() {

                    @Override
                    public FlinkJob run() throws Exception {
                        return new FlinkJob(
                            job,
                            submitter,
                            user,
                            settings.getHadoopSymbolicLinkDir(),
                            settings.getFlinkDir(),
                            settings.getFlinkConfDir(),
                            settings.getFlinkConfFile(),
                            settings.getFlinkUser(),
                            hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()),
                            settings.getHopsworksDomainDir(),
                            jobsMonitor,
                            settings,
                            sessionId);
                    }
                });
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so callers further up can observe it;
                // flinkjob stays null and is reported below.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Interrupted while creating Flink job with id: " + job.getId(), ex);
            }
        } catch (IOException ex) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE,
                "job: " + job.getId() + ", user:" + user.getUsername(), ex.getMessage(), ex);
        }
        if (flinkjob == null) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING,
                "Could not instantiate job with name: " + job.getName() + " and id: " + job.getId(),
                "flinkjob object was null");
        }

        Execution execution = flinkjob.requestExecutionId();
        submitter.startExecution(flinkjob);
        // The activity text ends in "named ", so the job name completes the sentence.
        activityFacade.persistActivity(" ran a job named " + job.getName(), job.getProject(), user.asUser(),
            ActivityFlag.JOB);
        return execution;
    }

    /**
     * Checks that the Flink jar exists in HDFS, copying it there from the
     * local file system when it is missing.
     *
     * @return {@code true} if the jar already is in HDFS or was copied there
     *     successfully; {@code false} if HDFS could not be queried, the local
     *     jar does not exist, or the copy failed
     */
    public boolean isFlinkJarAvailable() {
        DistributedFileSystemOps dfso = null;
        try {
            dfso = fops.getDfsOps();

            boolean isInHdfs;
            try {
                isInHdfs = dfso.exists(settings.getHdfsFlinkJarPath());
            } catch (IOException e) {
                // HDFS could not be queried: treat the jar as unavailable.
                LOGGER.log(Level.WARNING, "Cannot get Flink jar file from HDFS: {0}", settings.getHdfsFlinkJarPath());
                return false;
            }
            if (isInHdfs) {
                return true;
            }

            File localFlinkJar = new File(settings.getLocalFlinkJarPath());
            if (!localFlinkJar.exists()) {
                LOGGER.log(Level.WARNING, "Cannot find Flink jar file locally: {0}", settings.getLocalFlinkJarPath());
                return false;
            }

            try {
                String hdfsJarPath = settings.getHdfsFlinkJarPath();
                dfso.copyToHDFSFromLocal(false, settings.getLocalFlinkJarPath(), hdfsJarPath);
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Cannot copy Flink jar file to HDFS: " + settings.getHdfsFlinkJarPath(), e);
                return false;
            }
            return true;
        } finally {
            // Single release point for the file system handle on every exit path.
            if (dfso != null) {
                dfso.close();
            }
        }
    }

    /**
     * Reads the jar at the given path and builds a Flink job configuration
     * from it.
     *
     * <p>The main class is taken from the jar manifest's {@code Main-Class}
     * attribute when present; a jar without a manifest or without that
     * attribute yields a configuration with no main class set. The Flink
     * configuration directory and file come from {@link Settings}, and the
     * jar path is set to {@code "hdfs://" + path}.
     *
     * @param path path of the jar, opened through {@code udfso}
     * @param udfso file system operations used to open the jar
     * @return the populated configuration
     * @throws JobException if the jar cannot be opened or read
     */
    public FlinkJobConfiguration inspectProgram(String path, DistributedFileSystemOps udfso) throws JobException {
        try (JarInputStream jis = new JarInputStream((InputStream) udfso.open(path))) {
            FlinkJobConfiguration config = new FlinkJobConfiguration();

            // getManifest() returns null when the jar carries no manifest.
            Manifest manifest = jis.getManifest();
            if (manifest != null) {
                Attributes atts = manifest.getMainAttributes();
                if (atts.containsKey(Attributes.Name.MAIN_CLASS)) {
                    config.setMainClass(atts.getValue(Attributes.Name.MAIN_CLASS));
                }
            }

            config.setFlinkConfDir(settings.getFlinkConfDir());
            config.setFlinkConfFile(settings.getFlinkConfFile());
            config.setJarPath("hdfs://" + path);
            return config;
        } catch (IOException ex) {
            throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE,
                "Failed to inspect jar at:" + path, ex.getMessage(), ex);
        }
    }
}

