package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.exception.GenericException;
import io.hops.hopsworks.common.exception.JobException;
import io.hops.hopsworks.common.exception.RESTCodes;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.UserGroupInformationService;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.jobhistory.JobType;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.util.Settings;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;

/**
 * Stateless EJB that starts and stops Flink jobs through the
 * {@link AsynchronousJobExecutor}, checks that the Flink jar is present in
 * HDFS, and builds a {@link FlinkJobConfiguration} from a job jar's manifest.
 */
@Stateless
public class FlinkController {
    private static final Logger LOGGER = Logger.getLogger(FlinkController.class.getName());

    @EJB
    YarnJobsMonitor jobsMonitor;

    @EJB
    private DistributedFsService fops;

    @EJB
    private AsynchronousJobExecutor submitter;

    @EJB
    private ActivityFacade activityFacade;

    @EJB
    private UserGroupInformationService ugiService;

    @EJB
    private HdfsUsersController hdfsUsersBean;

    @EJB
    private Settings settings;

    /**
     * Creates a {@link FlinkJob} as the HDFS proxy user of {@code users},
     * requests an execution for it, submits it and records the activity.
     *
     * @param jobs the job to run; must be of type {@link JobType#FLINK}
     * @param users the user running the job
     * @param str opaque value forwarded as the last argument of the
     *     {@link FlinkJob} constructor (NOTE(review): presumably a session
     *     identifier - confirm against FlinkJob)
     * @return the execution obtained from {@code FlinkJob.requestExecutionId()}
     * @throws NullPointerException if {@code jobs} or {@code users} is null
     * @throws IllegalArgumentException if the job is not a Flink job
     * @throws IllegalStateException if {@link #isFlinkJarAvailable()} is false
     * @throws GenericException if the job object could not be instantiated
     * @throws JobException if an {@link IOException} occurs while creating or
     *     submitting the job
     */
    public Execution startJob(final Jobs jobs, final Users users, final String str) throws GenericException, JobException {
        if (jobs == null) {
            throw new NullPointerException("Cannot run a null job.");
        }
        if (users == null) {
            throw new NullPointerException("Cannot run a job as a null user.");
        }
        if (jobs.getJobType() != JobType.FLINK) {
            throw new IllegalArgumentException("Job configuration is not a Flink job configuration.");
        }
        if (!isFlinkJarAvailable()) {
            throw new IllegalStateException("Flink is not installed on this system.");
        }
        FlinkJob flinkJob = null;
        try {
            try {
                String proxyUserName = this.hdfsUsersBean.getHdfsUserName(jobs.getProject(), users);
                flinkJob = (FlinkJob) this.ugiService.getProxyUser(proxyUserName).doAs(new PrivilegedExceptionAction<FlinkJob>() {
                    @Override
                    public FlinkJob run() throws Exception {
                        return new FlinkJob(
                            jobs,
                            submitter,
                            users,
                            settings.getHadoopSymbolicLinkDir(),
                            settings.getFlinkDir(),
                            settings.getFlinkConfDir(),
                            settings.getFlinkConfFile(),
                            settings.getFlinkUser(),
                            hdfsUsersBean.getHdfsUserName(jobs.getProject(), jobs.getCreator()),
                            settings.getHopsworksDomainDir(),
                            jobsMonitor,
                            settings,
                            str);
                    }
                });
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can
                // still observe the interruption; flinkJob stays null and is
                // reported through the GenericException below.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Interrupted while instantiating Flink job", e);
            }
            if (flinkJob == null) {
                throw new GenericException(RESTCodes.GenericErrorCode.UNKNOWN_ERROR, Level.WARNING, "Could not instantiate job with name: " + jobs.getName() + " and id: " + jobs.getId(), "flinkjob object was null");
            }
            Execution execution = flinkJob.requestExecutionId();
            this.submitter.startExecution(flinkJob);
            this.activityFacade.persistActivity(ActivityFacade.RAN_JOB, jobs.getProject(), users.asUser());
            return execution;
        } catch (IOException e) {
            throw new JobException(RESTCodes.JobErrorCode.PROXY_ERROR, Level.SEVERE, "job: " + jobs.getId() + ", user:" + users.getUsername(), e.getMessage(), e);
        }
    }

    /**
     * Builds a {@link FlinkJob} for the given job and asks the executor to
     * stop it.
     *
     * @param jobs the job to stop; must be of type {@link JobType#FLINK}
     * @param users the user stopping the job
     * @param str value passed as the second argument of
     *     {@code submitter.stopExecution} (NOTE(review): presumably the YARN
     *     application id - confirm against AsynchronousJobExecutor)
     * @param str2 opaque value forwarded as the last argument of the
     *     {@link FlinkJob} constructor (NOTE(review): presumably a session
     *     identifier - confirm against FlinkJob)
     * @throws NullPointerException if {@code jobs} or {@code users} is null
     * @throws IllegalArgumentException if the job is not a Flink job
     * @throws IllegalStateException if {@link #isFlinkJarAvailable()} is false
     * @throws IOException declared by the original signature
     */
    public void stopJob(Jobs jobs, Users users, String str, String str2) throws IllegalStateException, IOException, NullPointerException, IllegalArgumentException {
        if (jobs == null) {
            throw new NullPointerException("Cannot stop a null job.");
        }
        if (users == null) {
            throw new NullPointerException("Cannot stop a job as a null user.");
        }
        if (jobs.getJobType() != JobType.FLINK) {
            throw new IllegalArgumentException("Job configuration is not a Flink job configuration.");
        }
        if (!isFlinkJarAvailable()) {
            throw new IllegalStateException("Flink is not installed on this system.");
        }
        // The job owner name is assembled here as "<project>__<username>".
        String jobOwner = jobs.getProject().getName() + "__" + users.getUsername();
        FlinkJob flinkJob = new FlinkJob(
            jobs,
            this.submitter,
            users,
            this.settings.getHadoopSymbolicLinkDir(),
            this.settings.getFlinkDir(),
            this.settings.getFlinkConfDir(),
            this.settings.getFlinkConfFile(),
            this.settings.getFlinkUser(),
            jobOwner,
            this.settings.getHopsworksDomainDir(),
            this.jobsMonitor,
            this.settings,
            str2);
        this.submitter.stopExecution(flinkJob, str);
    }

    /**
     * Checks whether the Flink jar exists at the configured HDFS path. If it
     * does not, but exists at the configured local path, it is copied to HDFS.
     *
     * @return {@code true} if the jar is in HDFS (already, or after a
     *     successful copy); {@code false} if it is missing locally or an
     *     {@link IOException} occurred while checking or copying
     */
    public boolean isFlinkJarAvailable() {
        DistributedFileSystemOps dfso = null;
        try {
            dfso = this.fops.getDfsOps();
            try {
                if (dfso.exists(this.settings.getHdfsFlinkJarPath())) {
                    return true;
                }
                if (!new File(this.settings.getLocalFlinkJarPath()).exists()) {
                    LOGGER.log(Level.WARNING, "Cannot find Flink jar file locally: {0}", this.settings.getLocalFlinkJarPath());
                    return false;
                }
                try {
                    dfso.copyToHDFSFromLocal(false, this.settings.getLocalFlinkJarPath(), this.settings.getHdfsFlinkJarPath());
                    return true;
                } catch (IOException e) {
                    // Previously swallowed silently; log so a failed upload is diagnosable.
                    LOGGER.log(Level.WARNING, "Failed to copy Flink jar from " + this.settings.getLocalFlinkJarPath() + " to HDFS path " + this.settings.getHdfsFlinkJarPath(), e);
                    return false;
                }
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Cannot get Flink jar file from HDFS: {0}", this.settings.getHdfsFlinkJarPath());
                return false;
            }
        } finally {
            // Single release point for every return path and for unexpected throwables.
            if (dfso != null) {
                dfso.close();
            }
        }
    }

    /**
     * Opens the jar at {@code str} and builds a {@link FlinkJobConfiguration}
     * from it: the main class is taken from the manifest's
     * {@code Main-Class} attribute when present, the Flink conf dir/file come
     * from {@link Settings}, and the jar path is set to {@code str}.
     *
     * @param str path of the jar, opened through {@code distributedFileSystemOps}
     * @param str2 name used only in the log messages of this method
     * @param distributedFileSystemOps file system handle used to open the jar;
     *     it is not closed by this method
     * @return the populated configuration
     * @throws IllegalArgumentException if {@code str} does not end with ".jar"
     * @throws JobException if the jar cannot be read
     */
    public FlinkJobConfiguration inspectJar(String str, String str2, DistributedFileSystemOps distributedFileSystemOps) throws JobException {
        LOGGER.log(Level.INFO, "Executing Flink job by {0} at path: {1}", new Object[]{str2, str});
        if (!str.endsWith(".jar")) {
            throw new IllegalArgumentException("Path does not point to a jar file.");
        }
        LOGGER.log(Level.INFO, "Really executing Flink job by {0} at path: {1}", new Object[]{str2, str});
        // Two resources: the raw stream is closed even when the JarInputStream
        // constructor (which reads the manifest) throws.
        try (InputStream rawStream = distributedFileSystemOps.open(str);
             JarInputStream jarStream = new JarInputStream(rawStream)) {
            FlinkJobConfiguration config = new FlinkJobConfiguration();
            // getManifest() returns null for a jar without a manifest; treat that
            // the same as a manifest lacking Main-Class.
            Manifest manifest = jarStream.getManifest();
            if (manifest != null) {
                Attributes mainAttributes = manifest.getMainAttributes();
                if (mainAttributes.containsKey(Attributes.Name.MAIN_CLASS)) {
                    config.setMainClass(mainAttributes.getValue(Attributes.Name.MAIN_CLASS));
                }
            }
            config.setFlinkConfDir(this.settings.getFlinkConfDir());
            config.setFlinkConfFile(this.settings.getFlinkConfFile());
            config.setJarPath(str);
            return config;
        } catch (IOException e) {
            throw new JobException(RESTCodes.JobErrorCode.JAR_INSPECTION_ERROR, Level.SEVERE, "Failed to inspect jar at:" + str, e.getMessage(), e);
        }
    }
}
