/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.spark.SparkJob;
import io.hops.hopsworks.common.jupyter.JupyterController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Stateless session bean that validates Spark and PySpark job definitions and
 * submits them for execution through the {@link AsynchronousJobExecutor}.
 */
@Stateless
public class SparkController {
    private static final Logger LOGGER = Logger.getLogger(SparkController.class.getName());
    @EJB
    private AsynchronousJobExecutor submitter;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private UserGroupInformationService ugiService;
    @EJB
    private HdfsUsersController hdfsUsersBean;
    @EJB
    private Settings settings;
    @EJB
    private JupyterController jupyterController;
    @EJB
    private DistributedFsService dfs;

    /**
     * Validates the job, prepares its application file and starts an execution.
     *
     * <p>For PySpark jobs whose application path ends in {@code .ipynb}, the job
     * configuration's application path is replaced by a {@code job_tmp_<jobName>.py}
     * path under the project's {@code Resources} directory and the notebook is
     * converted to that path before submission.
     *
     * @param job the job to run; must be of type SPARK or PYSPARK
     * @param args arguments passed on to the execution
     * @param user the user starting the job
     * @return the execution obtained from {@link SparkJob#requestExecutionId}
     * @throws IllegalArgumentException if the job, user or job configuration fails the sanity check
     * @throws JobException if a PySpark job's project has no Anaconda environment enabled, or if
     *     obtaining/using the HDFS proxy user fails with an I/O error
     * @throws GenericException if the {@link SparkJob} could not be instantiated (including when
     *     the calling thread was interrupted), or if checking attached files fails with an I/O error
     * @throws ProjectException if a file attached to the job configuration does not exist
     * @throws ServiceException declared by a callee; presumably raised by the notebook conversion
     *     (NOTE(review): confirm against JupyterController)
     */
    public Execution startJob(final Jobs job, String args, final Users user) throws ServiceException, GenericException, JobException, ProjectException {
        this.sanityCheck(job, user);
        String username = this.hdfsUsersBean.getHdfsUserName(job.getProject(), user);
        SparkJobConfiguration sparkConfig = (SparkJobConfiguration)job.getJobConfig();
        String appPath = sparkConfig.getAppPath();
        // sanityCheck has already established that the job type is SPARK or PYSPARK.
        if (job.getJobType() == JobType.PYSPARK) {
            if (!job.getProject().getConda().booleanValue()) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.SEVERE, "PySpark job needs to have Python Anaconda environment enabled");
            }
            if (appPath.endsWith(".ipynb")) {
                // Notebooks are not submitted directly: point the job at a generated .py file
                // and convert the notebook into it.
                String outPath = "hdfs://" + Utils.getProjectPath(job.getProject().getName()) + "Resources";
                String pyAppPath = outPath + "/job_tmp_" + job.getName() + ".py";
                sparkConfig.setAppPath(pyAppPath);
                this.jupyterController.convertIPythonNotebook(username, appPath, job.getProject(), pyAppPath, JupyterController.NotebookConversion.PY);
            }
        }
        SparkJob sparkjob = null;
        try {
            UserGroupInformation proxyUser = this.ugiService.getProxyUser(username);
            // Construct the SparkJob while acting as the project-specific HDFS user.
            sparkjob = proxyUser.doAs(new PrivilegedExceptionAction<SparkJob>() {

                @Override
                public SparkJob run() {
                    return new SparkJob(job, SparkController.this.submitter, user, SparkController.this.settings.getHadoopSymbolicLinkDir(), job.getProject().getName() + "__" + user.getUsername(), SparkController.this.settings);
                }
            });
        }
        catch (InterruptedException ex) {
            // Restore the interrupt status so callers further up the stack can observe it;
            // sparkjob stays null and is reported through the GenericException below.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.SEVERE, "Interrupted while instantiating Spark job with id: " + job.getId(), ex);
        }
        catch (IOException ex) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + job.getId() + ", user:" + user.getUsername(), ex.getMessage(), ex);
        }
        if (sparkjob == null) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + job.getName() + " and id: " + job.getId(), "sparkjob object was null");
        }
        Execution exec = sparkjob.requestExecutionId(args);
        this.submitter.startExecution(sparkjob, args);
        this.activityFacade.persistActivity(" ran a job named " + job.getName(), job.getProject(), user.asUser(), ActivityFlag.JOB);
        return exec;
    }

    /**
     * Rejects jobs that cannot be started as Spark jobs and verifies that all
     * attached files exist.
     *
     * @param job the job about to be started
     * @param user the user starting the job
     * @throws IllegalArgumentException if {@code job} or {@code user} is null, the job type is
     *     neither SPARK nor PYSPARK, the job configuration is null, or the application path is
     *     empty or does not end in {@code .jar}, {@code .py} or {@code .ipynb}
     * @throws ProjectException if an attached file does not exist
     * @throws GenericException if checking the attached files fails with an I/O error
     */
    private void sanityCheck(Jobs job, Users user) throws GenericException, ProjectException {
        if (job == null) {
            throw new IllegalArgumentException("Trying to start job but job is not provided");
        }
        if (user == null) {
            throw new IllegalArgumentException("Trying to start job but user is not provided");
        }
        if (job.getJobType() != JobType.SPARK && job.getJobType() != JobType.PYSPARK) {
            throw new IllegalArgumentException("Job configuration is not a Spark job configuration. Type: " + job.getJobType());
        }
        SparkJobConfiguration jobConf = (SparkJobConfiguration)job.getJobConfig();
        if (jobConf == null) {
            throw new IllegalArgumentException("Trying to start job but JobConfiguration is null");
        }
        String path = jobConf.getAppPath();
        boolean supportedExtension = !Strings.isNullOrEmpty(path)
                && (path.endsWith(".jar") || path.endsWith(".py") || path.endsWith(".ipynb"));
        if (!supportedExtension) {
            throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
        }
        this.inspectDependencies(job.getProject(), user, jobConf);
    }

    /**
     * Builds a default job configuration for the program at {@code path}.
     *
     * <p>For a path ending in {@code .jar} the jar's manifest is read and, if it declares a
     * {@code Main-Class}, that value is set as the configuration's main class. For any other
     * path the configuration is created with {@link ExperimentType#EXPERIMENT} and the main
     * class {@code org.apache.spark.deploy.PythonRunner}.
     *
     * @param path location of the program; must not be null
     * @param udfso file system operations used to open the jar
     * @return a new configuration whose application path is {@code path}
     * @throws JobException if the jar cannot be opened or read
     */
    public SparkJobConfiguration inspectProgram(String path, DistributedFileSystemOps udfso) throws JobException {
        SparkJobConfiguration config;
        if (path.endsWith(".jar")) {
            config = new SparkJobConfiguration();
            try (JarInputStream jis = new JarInputStream(udfso.open(path))) {
                Manifest mf = jis.getManifest();
                if (mf != null) {
                    Attributes atts = mf.getMainAttributes();
                    if (atts.containsKey(Attributes.Name.MAIN_CLASS)) {
                        config.setMainClass(atts.getValue(Attributes.Name.MAIN_CLASS));
                    }
                }
            }
            catch (IOException ex) {
                throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + path, ex.getMessage(), ex);
            }
        } else {
            config = new SparkJobConfiguration(ExperimentType.EXPERIMENT);
            config.setMainClass("org.apache.spark.deploy.PythonRunner");
        }
        config.setAppPath(path);
        return config;
    }

    /**
     * Verifies that every archive, file, jar and Python file attached to the job
     * configuration exists.
     *
     * <p>Each attachment list is a comma-separated string; empty entries are skipped. If all
     * four lists are null or empty, no file system client is requested. The client, once
     * obtained, is always handed back to {@link DistributedFsService#closeDfsClient},
     * whether the check succeeds or fails.
     *
     * @param project project the job belongs to
     * @param user user on whose behalf the files are checked
     * @param jobConf configuration holding the attachment lists
     * @throws ProjectException if an attached path does not exist
     * @throws GenericException if an I/O error occurs while checking
     */
    public void inspectDependencies(Project project, Users user, SparkJobConfiguration jobConf) throws ProjectException, GenericException {
        boolean nothingAttached = Strings.isNullOrEmpty(jobConf.getArchives())
                && Strings.isNullOrEmpty(jobConf.getFiles())
                && Strings.isNullOrEmpty(jobConf.getJars())
                && Strings.isNullOrEmpty(jobConf.getPyFiles());
        if (nothingAttached) {
            return;
        }
        DistributedFileSystemOps udfso = null;
        try {
            udfso = this.dfs.getDfsOps(this.hdfsUsersBean.getHdfsUserName(project, user));
            this.requireAllExist(udfso, jobConf.getArchives(), "Attached archive does not exist: ");
            this.requireAllExist(udfso, jobConf.getFiles(), "Attached file does not exist: ");
            this.requireAllExist(udfso, jobConf.getJars(), "Attached JAR file does not exist: ");
            this.requireAllExist(udfso, jobConf.getPyFiles(), "Attached Python file does not exist: ");
        }
        catch (IOException ex) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.INFO, null, null, ex);
        }
        finally {
            // Close on every exit path, including ProjectException for a missing file.
            if (udfso != null) {
                this.dfs.closeDfsClient(udfso);
            }
        }
    }

    /**
     * Checks that each non-empty entry of a comma-separated path list exists.
     *
     * @param udfso file system operations used for the existence check
     * @param commaSeparatedPaths the path list; a null or empty list passes trivially
     * @param missingMessagePrefix text prepended to the missing path in the error message
     * @throws ProjectException with {@code FILE_NOT_FOUND} for the first path that does not exist
     * @throws IOException if the existence check itself fails
     */
    private void requireAllExist(DistributedFileSystemOps udfso, String commaSeparatedPaths, String missingMessagePrefix) throws IOException, ProjectException {
        if (Strings.isNullOrEmpty(commaSeparatedPaths)) {
            return;
        }
        for (String filePath : commaSeparatedPaths.split(",")) {
            if (Strings.isNullOrEmpty(filePath) || udfso.exists(filePath)) {
                continue;
            }
            throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, missingMessagePrefix + filePath);
        }
    }
}

