package io.hops.hopsworks.common.jobs.execution;

import io.hops.hopsworks.common.dao.hdfs.inode.InodeFacade;
import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.dao.jobs.JobsHistoryFacade;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.exception.GenericException;
import io.hops.hopsworks.common.exception.JobException;
import io.hops.hopsworks.common.exception.RESTCodes;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.jobs.flink.FlinkController;
import io.hops.hopsworks.common.jobs.spark.SparkController;
import io.hops.hopsworks.common.jobs.spark.SparkJobConfiguration;
import io.hops.hopsworks.common.util.Settings;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.Path;

@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/jobs/execution/ExecutionController.class */
public class ExecutionController {

    @EJB
    private SparkController sparkController;

    @EJB
    private FlinkController flinkController;

    @EJB
    private InodeFacade inodes;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private JobsHistoryFacade jobHistoryFac;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private DistributedFsService dfs;

    @EJB
    private Settings settings;

    @EJB
    private ExecutionFacade execFacade;
    private static final Logger LOGGER = Logger.getLogger(ExecutionController.class.getName());

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public Execution start(Jobs jobs, Users users) throws GenericException, JobException {
        Execution startJob;
        switch (jobs.getJobType()) {
            case FLINK:
                return this.flinkController.startJob(jobs, users, null);
            case SPARK:
                startJob = this.sparkController.startJob(jobs, users);
                if (startJob != null) {
                    int intValue = startJob.getId().intValue();
                    String appPath = ((SparkJobConfiguration) jobs.getJobConfig()).getAppPath();
                    Pattern.compile("hdfs://(.*)\\s").matcher(appPath);
                    String name = this.inodes.getInodeAtPath(appPath.replace("hdfs://" + appPath.split("/")[2], "")).getInodePK().getName();
                    this.jobHistoryFac.persist(users, jobs, intValue, startJob.getAppId());
                    this.activityFacade.persistActivity(ActivityFacade.EXECUTED_JOB + name, jobs.getProject(), users);
                    break;
                } else {
                    throw new IllegalArgumentException("Problem getting execution object for: " + jobs.getJobType());
                }
            case PYSPARK:
                startJob = this.sparkController.startJob(jobs, users);
                if (startJob == null) {
                    throw new IllegalArgumentException("Error while getting execution object for: " + jobs.getJobType());
                }
                break;
            default:
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ACTION, Level.FINE, "Unsupported job type: " + jobs.getJobType());
        }
        return startJob;
    }

    public void kill(Jobs jobs, Users users) {
        List<Execution> findForJob = this.execFacade.findForJob(jobs);
        Collections.sort(findForJob, new Comparator<Execution>() { // from class: io.hops.hopsworks.common.jobs.execution.ExecutionController.1
            @Override // java.util.Comparator
            public int compare(Execution execution, Execution execution2) {
                if (execution.getId().intValue() > execution2.getId().intValue()) {
                    return -1;
                }
                return execution.getId().intValue() < execution2.getId().intValue() ? 1 : 0;
            }
        });
        String appId = findForJob.get(0).getAppId();
        try {
            try {
                DistributedFileSystemOps dfsOps = this.dfs.getDfsOps(this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users));
                String jobMarkerFile = this.settings.getJobMarkerFile(jobs, appId);
                if (dfsOps.exists(jobMarkerFile)) {
                    dfsOps.rm(new Path(jobMarkerFile), false);
                } else {
                    Runtime.getRuntime().exec(this.settings.getHadoopSymbolicLinkDir() + "/bin/yarn application -kill " + appId);
                }
                if (dfsOps != null) {
                    this.dfs.closeDfsClient(dfsOps);
                }
            } catch (IOException e) {
                LOGGER.log(Level.SEVERE, "Could not remove marker file for job:" + jobs.getName() + "with appId:" + appId, (Throwable) e);
                if (0 != 0) {
                    this.dfs.closeDfsClient(null);
                }
            }
        } catch (Throwable th) {
            if (0 != 0) {
                this.dfs.closeDfsClient(null);
            }
            throw th;
        }
    }

    public void stop(Jobs jobs, Users users, String str) throws IOException {
        switch (jobs.getJobType()) {
            case FLINK:
                this.flinkController.stopJob(jobs, users, str, null);
                return;
            case SPARK:
                this.sparkController.stopJob(jobs, users, str);
                return;
            default:
                throw new IllegalArgumentException("Unsupported job type: " + jobs.getJobType());
        }
    }
}
