/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.hdfs.command;

import io.hops.hopsworks.common.dao.hdfs.command.HdfsCommandExecutionFacade;
import io.hops.hopsworks.common.dao.jobs.description.JobFacade;
import io.hops.hopsworks.common.hdfs.command.ArchiveFormat;
import io.hops.hopsworks.common.jobs.JobController;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.hdfs.command.Command;
import io.hops.hopsworks.persistence.entity.hdfs.command.HdfsCommandExecution;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobFinalStatus;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.Date;
import java.util.Optional;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import org.apache.hadoop.fs.Path;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class HdfsCommandExecutionController {
    @EJB
    private HdfsCommandExecutionFacade hdfsCommandExecutionFacade;
    @EJB
    private JobController jobController;
    @EJB
    private JobFacade jobFacade;
    @EJB
    private Settings settings;
    @Inject
    private ExecutionController executionController;
    private static final String JOB_NAME = "hdfs_file_operations";

    public HdfsCommandExecution setupAndStartJob(Users user, Project project, Command command, Inode srcInode, Path src, Path dest, ArchiveFormat format, boolean overwrite) throws JobException, ProjectException, ServiceException, GenericException {
        Optional<HdfsCommandExecution> hdfsCommandExecutionOptional = this.hdfsCommandExecutionFacade.findBySrc(srcInode);
        if (hdfsCommandExecutionOptional.isPresent() && JobFinalStatus.UNDEFINED.equals((Object)hdfsCommandExecutionOptional.get().getExecution().getFinalStatus())) {
            throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.FINE, "There is a running " + command.getJobCmd() + " job on this file.");
        }
        String jobArgs = this.getJobArgs(command.getJobCmd(), src, dest, format, overwrite);
        Jobs job = this.configureJob(user, project);
        Execution execution = this.executionController.start(job, jobArgs, user);
        if (hdfsCommandExecutionOptional.isPresent()) {
            HdfsCommandExecution hdfsCommandExecution = hdfsCommandExecutionOptional.get();
            hdfsCommandExecution.setCommand(command);
            hdfsCommandExecution.setExecution(execution);
            hdfsCommandExecution.setSubmitted(new Date());
            this.hdfsCommandExecutionFacade.update(hdfsCommandExecution);
        } else {
            HdfsCommandExecution hdfsCommandExecution = new HdfsCommandExecution(execution, command, srcInode);
            this.hdfsCommandExecutionFacade.save(hdfsCommandExecution);
        }
        return this.hdfsCommandExecutionFacade.findByExecution(execution).orElseThrow(() -> new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.FINE, "Failed to start execution"));
    }

    private Jobs configureJob(Users user, Project project) throws JobException {
        Jobs job = this.jobFacade.findByProjectAndName(project, JOB_NAME);
        if (job != null) {
            if (!this.isJobValid(job)) {
                throw new JobException(RESTCodes.JobErrorCode.JOB_START_FAILED, Level.FINE, "Another job with the same name already exist.");
            }
            return job;
        }
        SparkJobConfiguration sparkJobConfiguration = new SparkJobConfiguration();
        sparkJobConfiguration.setAppName(JOB_NAME);
        sparkJobConfiguration.setMainClass("org.apache.spark.deploy.PythonRunner");
        sparkJobConfiguration.setAppPath(this.settings.getHdfsFileOpJobUtil());
        sparkJobConfiguration.setAmMemory(this.settings.getHdfsFileOpJobDriverMemory());
        sparkJobConfiguration.setExecutorInstances(0);
        sparkJobConfiguration.setDynamicAllocationEnabled(false);
        return this.jobController.putJob(user, project, null, (JobConfiguration)sparkJobConfiguration);
    }

    private String getJobArgs(String op, Path src, Path dest, ArchiveFormat format, boolean overwrite) {
        String destArg = dest == null ? "" : " -dest " + dest;
        String formatArg = format == null ? "" : " -format " + format.getJobFormat();
        String overwriteArg = overwrite ? " -overwrite True" : "";
        return "-op " + op + " -src " + src + destArg + formatArg + overwriteArg;
    }

    private boolean isJobValid(Jobs job) {
        SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration)job.getJobConfig();
        if (JobType.PYSPARK.equals((Object)job.getJobType()) && sparkJobConfiguration.getAppPath() != null) {
            int index = sparkJobConfiguration.getAppPath().indexOf(45);
            String appPath = index > -1 ? sparkJobConfiguration.getAppPath().substring(0, index) : sparkJobConfiguration.getAppPath();
            return this.settings.getHdfsFileOpJobUtil().startsWith(appPath);
        }
        return false;
    }
}

