package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jupyter.JupyterController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.security.UserGroupInformation;

@TransactionAttribute(TransactionAttributeType.NEVER)
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/jobs/spark/SparkController.class */
public class SparkController {
    private static final Logger LOGGER = Logger.getLogger(SparkController.class.getName());

    @EJB
    private JupyterController jupyterController;

    @EJB
    private AsynchronousJobExecutor submitter;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private Settings settings;

    @EJB
    private DistributedFsService dfs;

    @EJB
    private KafkaBrokers kafkaBrokers;

    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;

    @Inject
    private ServingConfig servingConfig;

    public Execution startJob(Jobs jobs, String str, Users users) throws ServiceException, GenericException, JobException, ProjectException {
        sanityCheck(jobs, users);
        String hdfsUserName = this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users);
        SparkJobConfiguration jobConfig = jobs.getJobConfig();
        String appPath = jobConfig.getAppPath();
        if (jobs.getJobType().equals(JobType.PYSPARK) && jobs.getProject().getPythonEnvironment() == null) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.SEVERE, "PySpark job needs to have Python Anaconda environment enabled");
        }
        SparkJob createSparkJob = createSparkJob(hdfsUserName, jobs, users);
        Execution requestExecutionId = createSparkJob.requestExecutionId(str);
        if (jobs.getJobType().equals(JobType.PYSPARK) && appPath.endsWith(".ipynb")) {
            this.submitter.getExecutionFacade().updateState(requestExecutionId, JobState.CONVERTING_NOTEBOOK);
            String prepJupyterNotebookConversion = HopsUtils.prepJupyterNotebookConversion(requestExecutionId, hdfsUserName, this.dfs);
            jobConfig.setAppPath(prepJupyterNotebookConversion);
            this.jupyterController.convertIPythonNotebook(jobs.getProject(), users, appPath, prepJupyterNotebookConversion, JupyterController.NotebookConversion.PY);
        }
        this.submitter.startExecution(createSparkJob, str);
        this.activityFacade.persistActivity(ActivityFacade.RAN_JOB + jobs.getName(), jobs.getProject(), users.asUser(), ActivityFlag.JOB);
        return requestExecutionId;
    }

    private void sanityCheck(Jobs jobs, Users users) throws GenericException, ProjectException {
        if (jobs == null) {
            throw new IllegalArgumentException("Trying to start job but job is not provided");
        }
        if (users == null) {
            throw new IllegalArgumentException("Trying to start job but user is not provided");
        }
        if (jobs.getJobType() != JobType.SPARK && jobs.getJobType() != JobType.PYSPARK) {
            throw new IllegalArgumentException("Job configuration is not a Spark job configuration. Type: " + jobs.getJobType());
        }
        SparkJobConfiguration jobConfig = jobs.getJobConfig();
        if (jobConfig == null) {
            throw new IllegalArgumentException("Trying to start job but JobConfiguration is null");
        }
        String appPath = jobConfig.getAppPath();
        if (Strings.isNullOrEmpty(appPath) || !(appPath.endsWith(".jar") || appPath.endsWith(".py") || appPath.endsWith(".ipynb"))) {
            throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
        }
        inspectDependencies(jobs.getProject(), users, (SparkJobConfiguration) jobs.getJobConfig());
    }

    /* JADX WARN: Failed to calculate best type for var: r13v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0095: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:37:0x0095 */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x009a: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:39:0x009a */
    /* JADX WARN: Type inference failed for: r13v1, types: [java.util.jar.JarInputStream] */
    /* JADX WARN: Type inference failed for: r14v0, types: [java.lang.Throwable] */
    public SparkJobConfiguration inspectProgram(SparkJobConfiguration sparkJobConfiguration, String str, DistributedFileSystemOps distributedFileSystemOps) throws JobException {
        SparkJobConfiguration sparkJobConfiguration2 = sparkJobConfiguration == null ? new SparkJobConfiguration() : sparkJobConfiguration;
        if (str.endsWith(".jar")) {
            try {
                try {
                    JarInputStream jarInputStream = new JarInputStream(distributedFileSystemOps.open(str));
                    Throwable th = null;
                    Manifest manifest = jarInputStream.getManifest();
                    if (manifest != null) {
                        Attributes mainAttributes = manifest.getMainAttributes();
                        if (mainAttributes.containsKey(Attributes.Name.MAIN_CLASS)) {
                            sparkJobConfiguration2.setMainClass(mainAttributes.getValue(Attributes.Name.MAIN_CLASS));
                        } else {
                            sparkJobConfiguration2.setMainClass((String) null);
                        }
                    }
                    if (jarInputStream != null) {
                        if (0 != 0) {
                            try {
                                jarInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            jarInputStream.close();
                        }
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + str, e.getMessage(), e);
            }
        } else {
            if (sparkJobConfiguration == null) {
                sparkJobConfiguration2.setExperimentType(ExperimentType.EXPERIMENT);
            }
            sparkJobConfiguration2.setMainClass(Settings.SPARK_PY_MAINCLASS);
        }
        sparkJobConfiguration2.setAppPath(str);
        return sparkJobConfiguration2;
    }

    public void inspectDependencies(Project project, Users users, SparkJobConfiguration sparkJobConfiguration) throws ProjectException, GenericException {
        DistributedFileSystemOps distributedFileSystemOps = null;
        try {
            try {
                if (!Strings.isNullOrEmpty(sparkJobConfiguration.getArchives()) || !Strings.isNullOrEmpty(sparkJobConfiguration.getFiles()) || !Strings.isNullOrEmpty(sparkJobConfiguration.getJars()) || !Strings.isNullOrEmpty(sparkJobConfiguration.getPyFiles())) {
                    distributedFileSystemOps = this.dfs.getDfsOps(this.hdfsUsersBean.getHdfsUserName(project, users));
                    if (!Strings.isNullOrEmpty(sparkJobConfiguration.getArchives())) {
                        for (String str : sparkJobConfiguration.getArchives().split(",")) {
                            if (!Strings.isNullOrEmpty(str) && !distributedFileSystemOps.exists(str)) {
                                throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, "Attached archive does not exist: " + str);
                            }
                        }
                    }
                    if (!Strings.isNullOrEmpty(sparkJobConfiguration.getFiles())) {
                        for (String str2 : sparkJobConfiguration.getFiles().split(",")) {
                            if (!Strings.isNullOrEmpty(str2) && !distributedFileSystemOps.exists(str2)) {
                                throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, "Attached file does not exist: " + str2);
                            }
                        }
                    }
                    if (!Strings.isNullOrEmpty(sparkJobConfiguration.getJars())) {
                        for (String str3 : sparkJobConfiguration.getJars().split(",")) {
                            if (!Strings.isNullOrEmpty(str3) && !distributedFileSystemOps.exists(str3)) {
                                throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, "Attached JAR file does not exist: " + str3);
                            }
                        }
                    }
                    if (!Strings.isNullOrEmpty(sparkJobConfiguration.getPyFiles())) {
                        for (String str4 : sparkJobConfiguration.getPyFiles().split(",")) {
                            if (!Strings.isNullOrEmpty(str4) && !distributedFileSystemOps.exists(str4)) {
                                throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, "Attached Python file does not exist: " + str4);
                            }
                        }
                    }
                }
                if (distributedFileSystemOps != null) {
                    this.dfs.closeDfsClient(distributedFileSystemOps);
                }
            } catch (IOException e) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.INFO, (String) null, (String) null, e);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                this.dfs.closeDfsClient(null);
            }
            throw th;
        }
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    private SparkJob createSparkJob(String str, Jobs jobs, Users users) throws JobException, GenericException, ServiceException {
        SparkJob sparkJob = null;
        try {
            String str2 = "https://" + this.serviceDiscoveryController.constructServiceFQDNWithPort(ServiceDiscoveryController.HopsworksService.HOPSWORKS_APP);
            try {
                sparkJob = (SparkJob) UserGroupInformation.createProxyUser(str, UserGroupInformation.getLoginUser()).doAs(() -> {
                    return new SparkJob(jobs, this.submitter, users, this.settings.getHadoopSymbolicLinkDir(), this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users), this.settings, this.kafkaBrokers.getKafkaBrokersString(), str2, this.servingConfig, this.serviceDiscoveryController);
                });
            } catch (InterruptedException e) {
                LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
            }
            if (sparkJob == null) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + jobs.getName() + " and id: " + jobs.getId(), "sparkjob object was null");
            }
            return sparkJob;
        } catch (ServiceDiscoveryException e2) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.SERVICE_NOT_FOUND, Level.SEVERE, "job: " + jobs.getId() + ", user:" + users.getUsername(), e2.getMessage(), e2);
        } catch (IOException e3) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + jobs.getId() + ", user:" + users.getUsername(), e3.getMessage(), e3);
        }
    }
}
