package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobs.description.JobFacade;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.dao.user.activity.ActivityFlag;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.jupyter.JupyterController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;

@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/jobs/spark/SparkController.class */
public class SparkController {
    private static final Logger LOGGER = Logger.getLogger(SparkController.class.getName());

    @EJB
    private YarnJobsMonitor jobsMonitor;

    @EJB
    private AsynchronousJobExecutor submitter;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private UserGroupInformationService ugiService;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private Settings settings;

    @EJB
    private ExecutionController executionController;

    @EJB
    private JupyterController jupyterController;

    @EJB
    private JobFacade jobFacade;

    public Execution startJob(final Jobs jobs, final Users users) throws ServiceException, GenericException, JobException {
        sanityCheck(jobs, users);
        String hdfsUserName = this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users);
        SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) jobs.getJobConfig();
        String appPath = sparkJobConfiguration.getAppPath();
        if (appPath.endsWith(".ipynb")) {
            String str = ("hdfs://" + Utils.getProjectPath(jobs.getProject().getName()) + "Resources") + "/job_tmp_" + jobs.getName() + ".py";
            sparkJobConfiguration.setAppPath(str);
            this.jupyterController.convertIPythonNotebook(hdfsUserName, appPath, jobs.getProject(), str);
        }
        SparkJob sparkJob = null;
        try {
            try {
                sparkJob = (SparkJob) this.ugiService.getProxyUser(hdfsUserName).doAs(new PrivilegedExceptionAction<SparkJob>() { // from class: io.hops.hopsworks.common.jobs.spark.SparkController.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.security.PrivilegedExceptionAction
                    public SparkJob run() {
                        return new SparkJob(jobs, SparkController.this.submitter, users, SparkController.this.settings.getHadoopSymbolicLinkDir(), jobs.getProject().getName() + "__" + users.getUsername(), SparkController.this.jobsMonitor, SparkController.this.settings);
                    }
                });
            } catch (InterruptedException e) {
                LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
            }
            if (sparkJob == null) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + jobs.getName() + " and id: " + jobs.getId(), "sparkjob object was null");
            }
            Execution requestExecutionId = sparkJob.requestExecutionId();
            this.submitter.startExecution(sparkJob);
            this.activityFacade.persistActivity(ActivityFacade.RAN_JOB + jobs.getName(), jobs.getProject(), users.asUser(), ActivityFlag.JOB);
            return requestExecutionId;
        } catch (IOException e2) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + jobs.getId() + ", user:" + users.getUsername(), e2.getMessage(), e2);
        }
    }

    private void sanityCheck(Jobs jobs, Users users) {
        if (jobs == null) {
            throw new IllegalArgumentException("Trying to start job but job is not provided");
        }
        if (users == null) {
            throw new IllegalArgumentException("Trying to start job but user is not provided");
        }
        if (jobs.getJobType() != JobType.SPARK && jobs.getJobType() != JobType.PYSPARK) {
            throw new IllegalArgumentException("Job configuration is not a Spark job configuration. Type: " + jobs.getJobType());
        }
        SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) jobs.getJobConfig();
        if (sparkJobConfiguration == null) {
            throw new IllegalArgumentException("Trying to start job but JobConfiguration is null");
        }
        String appPath = sparkJobConfiguration.getAppPath();
        if (Strings.isNullOrEmpty(appPath) || !(appPath.endsWith(".jar") || appPath.endsWith(".py") || appPath.endsWith(".ipynb"))) {
            throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
        }
    }

    public SparkJobConfiguration inspectProgram(String str, DistributedFileSystemOps distributedFileSystemOps) throws JobException {
        SparkJobConfiguration sparkJobConfiguration;
        if (str.endsWith(".jar")) {
            sparkJobConfiguration = new SparkJobConfiguration();
            try {
                JarInputStream jarInputStream = new JarInputStream(distributedFileSystemOps.open(str));
                Throwable th = null;
                try {
                    try {
                        Manifest manifest = jarInputStream.getManifest();
                        if (manifest != null) {
                            Attributes mainAttributes = manifest.getMainAttributes();
                            if (mainAttributes.containsKey(Attributes.Name.MAIN_CLASS)) {
                                sparkJobConfiguration.setMainClass(mainAttributes.getValue(Attributes.Name.MAIN_CLASS));
                            }
                        }
                        if (jarInputStream != null) {
                            if (0 != 0) {
                                try {
                                    jarInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                jarInputStream.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + str, e.getMessage(), e);
            }
        } else {
            sparkJobConfiguration = new SparkJobConfiguration(ExperimentType.EXPERIMENT);
            sparkJobConfiguration.setMainClass("org.apache.spark.deploy.PythonRunner");
        }
        sparkJobConfiguration.setAppPath(str);
        return sparkJobConfiguration;
    }
}
