package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.exception.GenericException;
import io.hops.hopsworks.common.exception.JobException;
import io.hops.hopsworks.common.exception.RESTCodes;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.jobhistory.JobType;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.util.Settings;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;

@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/jobs/spark/SparkController.class */
public class SparkController {
    private static final Logger LOGGER = Logger.getLogger(SparkController.class.getName());

    @EJB
    private YarnJobsMonitor jobsMonitor;

    @EJB
    private AsynchronousJobExecutor submitter;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private UserGroupInformationService ugiService;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private Settings settings;

    public Execution startJob(final Jobs jobs, final Users users) throws GenericException, JobException {
        sanityCheck(jobs, users);
        SparkJob sparkJob = null;
        try {
            try {
                sparkJob = (SparkJob) this.ugiService.getProxyUser(this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users)).doAs(new PrivilegedExceptionAction<SparkJob>() { // from class: io.hops.hopsworks.common.jobs.spark.SparkController.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.security.PrivilegedExceptionAction
                    public SparkJob run() {
                        return new SparkJob(jobs, SparkController.this.submitter, users, SparkController.this.settings.getHadoopSymbolicLinkDir(), jobs.getProject().getName() + "__" + users.getUsername(), SparkController.this.jobsMonitor, SparkController.this.settings);
                    }
                });
            } catch (InterruptedException e) {
                LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
            }
            if (sparkJob == null) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + jobs.getName() + " and id: " + jobs.getId(), "sparkjob object was null");
            }
            Execution requestExecutionId = sparkJob.requestExecutionId();
            this.submitter.startExecution(sparkJob);
            this.activityFacade.persistActivity(ActivityFacade.RAN_JOB + jobs.getName(), jobs.getProject(), users.asUser());
            return requestExecutionId;
        } catch (IOException e2) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + jobs.getId() + ", user:" + users.getUsername(), e2.getMessage(), e2);
        }
    }

    public void stopJob(Jobs jobs, Users users, String str) {
        sanityCheck(jobs, users);
        this.submitter.stopExecution(new SparkJob(jobs, this.submitter, users, this.settings.getHadoopSymbolicLinkDir(), this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), jobs.getCreator()), this.jobsMonitor, this.settings), str);
    }

    public SparkJobConfiguration inspectProgram(String str, String str2, DistributedFileSystemOps distributedFileSystemOps) throws JobException {
        LOGGER.log(Level.INFO, "Executing Spark job by {0} at path: {1}", new Object[]{str2, str});
        SparkJobConfiguration sparkJobConfiguration = new SparkJobConfiguration();
        if (str.endsWith(".jar")) {
            try {
                JarInputStream jarInputStream = new JarInputStream(distributedFileSystemOps.open(str));
                Throwable th = null;
                try {
                    try {
                        Manifest manifest = jarInputStream.getManifest();
                        if (manifest != null) {
                            Attributes mainAttributes = manifest.getMainAttributes();
                            if (mainAttributes.containsKey(Attributes.Name.MAIN_CLASS)) {
                                sparkJobConfiguration.setMainClass(mainAttributes.getValue(Attributes.Name.MAIN_CLASS));
                            }
                        }
                        if (jarInputStream != null) {
                            if (0 != 0) {
                                try {
                                    jarInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                jarInputStream.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new JobException(RESTCodes.JobErrorCode.JAR_INSEPCTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + str, e.getMessage(), e);
            }
        } else {
            sparkJobConfiguration.setMainClass(Settings.SPARK_PY_MAINCLASS);
        }
        sparkJobConfiguration.setAppPath(str);
        sparkJobConfiguration.setHistoryServerIp(this.settings.getSparkHistoryServerIp());
        return sparkJobConfiguration;
    }

    private void sanityCheck(Jobs jobs, Users users) {
        sanityCheck(jobs, users, null);
    }

    private void sanityCheck(Jobs jobs, Users users, String str) {
        if (jobs == null) {
            throw new IllegalArgumentException("Trying to start job but job is not provided");
        }
        if (users == null) {
            throw new IllegalArgumentException("Trying to start job but user is not provided");
        }
        if (jobs.getJobType() != JobType.SPARK && jobs.getJobType() != JobType.PYSPARK) {
            throw new IllegalArgumentException("Job configuration is not a Spark job configuration. Type: " + jobs.getJobType());
        }
        if (!Strings.isNullOrEmpty(str) && !str.endsWith(".jar") && !str.endsWith(".py")) {
            throw new IllegalArgumentException("Path does not point to a jar or .py file.");
        }
    }
}
