/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.spark;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.spark.SparkJob;
import io.hops.hopsworks.common.jupyter.JupyterController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Stateless bean that validates Spark/PySpark job definitions, inspects their
 * application artifacts and hands executions over to the asynchronous job
 * executor.
 */
@Stateless
@TransactionAttribute(TransactionAttributeType.NEVER)
public class SparkController {
    private static final Logger LOGGER = Logger.getLogger(SparkController.class.getName());

    private static final String JAR_SUFFIX = ".jar";
    private static final String PY_SUFFIX = ".py";
    private static final String NOTEBOOK_SUFFIX = ".ipynb";

    @EJB
    private JupyterController jupyterController;
    @EJB
    private AsynchronousJobExecutor submitter;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private UserGroupInformationService ugiService;
    @EJB
    private HdfsUsersController hdfsUsersBean;
    @EJB
    private Settings settings;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    @Inject
    private ServingConfig servingConfig;

    /**
     * Validates the job, creates a {@link SparkJob} for it, requests an execution
     * and submits it. When a PySpark job points at an {@code .ipynb} notebook, the
     * execution is moved to {@link JobState#CONVERTING_NOTEBOOK}, the job
     * configuration's app path is switched to the converted Python file and the
     * notebook is converted before submission.
     *
     * @param job  the job to run; must be of type SPARK or PYSPARK
     * @param args arguments forwarded to the execution
     * @param user the user starting the job
     * @return the execution that was requested for this run
     * @throws IllegalArgumentException if the job, user or job configuration is
     *                                  missing, or the job is not a Spark job
     * @throws JobException             if a PySpark job's project has no Python
     *                                  environment, or the proxy user cannot be set up
     * @throws ProjectException         if an attached dependency does not exist
     * @throws GenericException         if dependency inspection fails with an I/O
     *                                  error or the Spark job could not be instantiated
     * @throws ServiceException         if the Hopsworks endpoint cannot be resolved
     */
    public Execution startJob(Jobs job, String args, Users user) throws ServiceException, GenericException, JobException, ProjectException {
        sanityCheck(job, user);
        String username = hdfsUsersBean.getHdfsUserName(job.getProject(), user);
        SparkJobConfiguration sparkConfig = (SparkJobConfiguration) job.getJobConfig();
        String appPath = sparkConfig.getAppPath();
        // sanityCheck guarantees the type is SPARK or PYSPARK, so identity comparison is safe.
        boolean isPySpark = job.getJobType() == JobType.PYSPARK;
        if (isPySpark && job.getProject().getPythonEnvironment() == null) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.SEVERE, "PySpark job needs to have Python Anaconda environment enabled");
        }
        SparkJob sparkjob = createSparkJob(username, job, user);
        Execution exec = sparkjob.requestExecutionId(args);
        if (isPySpark && appPath.endsWith(NOTEBOOK_SUFFIX)) {
            submitter.getExecutionFacade().updateState(exec, JobState.CONVERTING_NOTEBOOK);
            String pyAppPath = HopsUtils.prepJupyterNotebookConversion(exec, username, dfs);
            // The job now runs the converted file; the original notebook path is only the conversion input.
            sparkConfig.setAppPath(pyAppPath);
            jupyterController.convertIPythonNotebook(username, appPath, job.getProject(), pyAppPath, JupyterController.NotebookConversion.PY);
        }
        submitter.startExecution(sparkjob, args);
        activityFacade.persistActivity(" ran a job named " + job.getName(), job.getProject(), user.asUser(), ActivityFlag.JOB);
        return exec;
    }

    /**
     * Rejects jobs that cannot be started: missing job/user/configuration, a job
     * type other than SPARK or PYSPARK, an app path that is not a
     * {@code .jar}, {@code .py} or {@code .ipynb} file, or attached dependencies
     * that do not exist.
     */
    private void sanityCheck(Jobs job, Users user) throws GenericException, ProjectException {
        if (job == null) {
            throw new IllegalArgumentException("Trying to start job but job is not provided");
        }
        if (user == null) {
            throw new IllegalArgumentException("Trying to start job but user is not provided");
        }
        if (job.getJobType() != JobType.SPARK && job.getJobType() != JobType.PYSPARK) {
            throw new IllegalArgumentException("Job configuration is not a Spark job configuration. Type: " + job.getJobType());
        }
        SparkJobConfiguration jobConf = (SparkJobConfiguration) job.getJobConfig();
        if (jobConf == null) {
            throw new IllegalArgumentException("Trying to start job but JobConfiguration is null");
        }
        String path = jobConf.getAppPath();
        boolean supportedExtension = !Strings.isNullOrEmpty(path)
            && (path.endsWith(JAR_SUFFIX) || path.endsWith(PY_SUFFIX) || path.endsWith(NOTEBOOK_SUFFIX));
        if (!supportedExtension) {
            throw new IllegalArgumentException("Path does not point to a .jar, .py or .ipynb file.");
        }
        inspectDependencies(job.getProject(), user, jobConf);
    }

    /**
     * Fills in a Spark job configuration from the application at {@code path}.
     * <p>
     * For a {@code .jar}, the main class is taken from the jar manifest's
     * {@code Main-Class} attribute (set to {@code null} when the manifest has no
     * such attribute, left untouched when the jar has no manifest). For any other
     * path the main class is set to the Spark Python runner, and a freshly created
     * configuration additionally gets {@link ExperimentType#EXPERIMENT}. The app
     * path is always set to {@code path}.
     *
     * @param existingConfig configuration to update, or {@code null} to create a new one
     * @param path           path of the application file; must not be {@code null}
     * @param udfso          file system handle used to open the jar
     * @return {@code existingConfig} if it was non-null, otherwise the new configuration
     * @throws JobException if the jar cannot be read
     */
    public SparkJobConfiguration inspectProgram(SparkJobConfiguration existingConfig, String path, DistributedFileSystemOps udfso) throws JobException {
        SparkJobConfiguration sparkConfig = existingConfig == null ? new SparkJobConfiguration() : existingConfig;
        if (path.endsWith(JAR_SUFFIX)) {
            try (JarInputStream jis = new JarInputStream(udfso.open(path))) {
                Manifest manifest = jis.getManifest();
                if (manifest != null) {
                    // getValue yields null when Main-Class is absent, which clears any previous main class.
                    sparkConfig.setMainClass(manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS));
                }
            } catch (IOException ex) {
                throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + path, ex.getMessage(), ex);
            }
        } else {
            if (existingConfig == null) {
                sparkConfig.setExperimentType(ExperimentType.EXPERIMENT);
            }
            sparkConfig.setMainClass("org.apache.spark.deploy.PythonRunner");
        }
        sparkConfig.setAppPath(path);
        return sparkConfig;
    }

    /**
     * Verifies that every archive, file, jar and Python file attached to the job
     * configuration exists on the file system, as seen by the project user.
     * No file system client is opened when the configuration has no attachments.
     *
     * @param project project used to derive the HDFS user name
     * @param user    user used to derive the HDFS user name
     * @param jobConf configuration whose comma-separated attachment lists are checked
     * @throws ProjectException if an attached path does not exist
     * @throws GenericException if the existence check fails with an I/O error
     */
    public void inspectDependencies(Project project, Users user, SparkJobConfiguration jobConf) throws ProjectException, GenericException {
        boolean nothingAttached = Strings.isNullOrEmpty(jobConf.getArchives())
            && Strings.isNullOrEmpty(jobConf.getFiles())
            && Strings.isNullOrEmpty(jobConf.getJars())
            && Strings.isNullOrEmpty(jobConf.getPyFiles());
        if (nothingAttached) {
            return;
        }
        DistributedFileSystemOps udfso = null;
        try {
            udfso = dfs.getDfsOps(hdfsUsersBean.getHdfsUserName(project, user));
            requireAllExist(udfso, jobConf.getArchives(), "Attached archive does not exist: ");
            requireAllExist(udfso, jobConf.getFiles(), "Attached file does not exist: ");
            requireAllExist(udfso, jobConf.getJars(), "Attached JAR file does not exist: ");
            requireAllExist(udfso, jobConf.getPyFiles(), "Attached Python file does not exist: ");
        } catch (IOException ex) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.INFO, null, null, (Throwable) ex);
        } finally {
            // Release the client on every exit path, including the ProjectException
            // raised for a missing file, which previously leaked it.
            if (udfso != null) {
                dfs.closeDfsClient(udfso);
            }
        }
    }

    /**
     * Checks each non-empty entry of a comma-separated path list and fails on the
     * first one that does not exist. A null or empty list is accepted as-is.
     */
    private void requireAllExist(DistributedFileSystemOps udfso, String commaSeparatedPaths, String notFoundPrefix) throws IOException, ProjectException {
        if (Strings.isNullOrEmpty(commaSeparatedPaths)) {
            return;
        }
        for (String filePath : commaSeparatedPaths.split(",")) {
            if (!Strings.isNullOrEmpty(filePath) && !udfso.exists(filePath)) {
                throw new ProjectException(RESTCodes.ProjectErrorCode.FILE_NOT_FOUND, Level.FINEST, notFoundPrefix + filePath);
            }
        }
    }

    /**
     * Instantiates the {@link SparkJob} as the proxy user for {@code username}.
     *
     * @throws JobException     if the proxy user cannot be obtained or used
     * @throws ServiceException if the Hopsworks service address cannot be resolved
     * @throws GenericException if no job instance was produced (for example because
     *                          the thread was interrupted)
     */
    // NOTE(review): this annotation sits on a private method that is only invoked
    // from within this class; container-managed transaction attributes may not be
    // applied to such calls - confirm whether REQUIRES_NEW is actually in effect.
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    private SparkJob createSparkJob(String username, Jobs job, Users user) throws JobException, GenericException, ServiceException {
        SparkJob sparkjob = null;
        try {
            String hopsworksRestEndpoint = "https://" + serviceDiscoveryController.constructServiceFQDNWithPort(ServiceDiscoveryController.HopsworksService.HOPSWORKS_APP);
            UserGroupInformation proxyUser = ugiService.getProxyUser(username);
            // The explicit target type selects the PrivilegedExceptionAction overload of
            // doAs; a bare lambda is ambiguous against the PrivilegedAction overload.
            sparkjob = proxyUser.doAs((PrivilegedExceptionAction<SparkJob>) () ->
                new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(),
                    hdfsUsersBean.getHdfsUserName(job.getProject(), user), settings,
                    kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig,
                    serviceDiscoveryController));
        } catch (InterruptedException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (IOException ex) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + job.getId() + ", user:" + user.getUsername(), ex.getMessage(), ex);
        } catch (ServiceDiscoveryException ex) {
            throw new ServiceException(RESTCodes.ServiceErrorCode.SERVICE_NOT_FOUND, Level.SEVERE, "job: " + job.getId() + ", user:" + user.getUsername(), ex.getMessage(), ex);
        }
        if (sparkjob == null) {
            throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + job.getName() + " and id: " + job.getId(), "sparkjob object was null");
        }
        return sparkjob;
    }
}

