/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobs.description.Jobs;
import io.hops.hopsworks.common.dao.project.service.ProjectServiceEnum;
import io.hops.hopsworks.common.dao.project.service.ProjectServices;
import io.hops.hopsworks.common.dao.user.Users;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.flink.FlinkJobConfiguration;
import io.hops.hopsworks.common.jobs.flink.FlinkYarnRunnerBuilder;
import io.hops.hopsworks.common.jobs.yarn.YarnJob;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.JobException;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class FlinkJob
extends YarnJob {
    private static final Logger LOG = Logger.getLogger(FlinkJob.class.getName());
    private final FlinkJobConfiguration jobconfig;
    private final String flinkDir;
    private final String glassfishDomainDir;
    private final String flinkConfDir;
    private final String flinkConfFile;
    private final String flinkUser;
    private final String JOBTYPE_STREAMING = "Streaming";

    public FlinkJob(Jobs job, AsynchronousJobExecutor services, Users user, String hadoopDir, String flinkDir, String flinkConfDir, String flinkConfFile, String flinkUser, String jobUser, String glassfishDomainsDir, YarnJobsMonitor jobsMonitor, Settings settings, String sessionId) {
        super(job, services, user, jobUser, hadoopDir, jobsMonitor, settings, sessionId);
        if (!(job.getJobConfig() instanceof FlinkJobConfiguration)) {
            throw new IllegalArgumentException("Job must contain a FlinkJobConfiguration object. Received: " + job.getJobConfig().getClass());
        }
        this.jobconfig = (FlinkJobConfiguration)job.getJobConfig();
        this.jobconfig.setFlinkConfDir(flinkConfDir);
        this.jobconfig.setFlinkConfFile(flinkConfFile);
        this.flinkDir = flinkDir;
        this.glassfishDomainDir = glassfishDomainsDir;
        this.flinkConfDir = flinkConfDir;
        this.flinkConfFile = flinkConfFile;
        this.flinkUser = flinkUser;
    }

    @Override
    protected boolean setupJob(DistributedFileSystemOps dfso, YarnClient yarnClient) throws JobException {
        super.setupJob(dfso, yarnClient);
        if (this.jobconfig.getAppName() == null || this.jobconfig.getAppName().isEmpty()) {
            this.jobconfig.setAppName("Untitled Flink Job");
        }
        FlinkYarnRunnerBuilder flinkBuilder = new FlinkYarnRunnerBuilder(this.jobconfig.getJarPath(), this.jobconfig.getMainClass());
        flinkBuilder.setDetachedMode(true);
        flinkBuilder.setName(this.jobconfig.getAppName());
        flinkBuilder.setConfigurationDirectory(this.jobconfig.getFlinkConfDir());
        flinkBuilder.setConfigurationFilePath(new Path(this.jobconfig.getFlinkConfFile()));
        flinkBuilder.setFlinkLoggingConfigurationPath(new Path(this.jobconfig.getFlinkConfDir()));
        flinkBuilder.setTaskManagerMemory(this.jobconfig.getTaskManagerMemory());
        flinkBuilder.setTaskManagerSlots(this.jobconfig.getSlots());
        flinkBuilder.setTaskManagerCount(this.jobconfig.getNumberOfTaskManagers());
        if (this.jobconfig.getFlinkjobtype().equals("Streaming")) {
            flinkBuilder.setStreamingMode(true);
        }
        flinkBuilder.setParallelism(this.jobconfig.getParallelism());
        flinkBuilder.setJobManagerMemory(this.jobconfig.getAmMemory());
        flinkBuilder.setJobManagerQueue(this.jobconfig.getAmQueue());
        flinkBuilder.setAppJarPath(this.jobconfig.getJarPath());
        flinkBuilder.setServiceProps(this.serviceProps);
        if (this.jobconfig.getLocalResources() != null) {
            flinkBuilder.addExtraFiles(Arrays.asList(this.jobconfig.getLocalResources()));
        }
        flinkBuilder.addExtraFiles(this.projectLocalResources);
        if (this.jobconfig.getArgs() != null && !this.jobconfig.getArgs().isEmpty()) {
            String[] jobArgs = this.jobconfig.getArgs().trim().split(" ");
            flinkBuilder.addAllJobArgs(jobArgs);
        }
        if (this.jobSystemProperties != null && !this.jobSystemProperties.isEmpty()) {
            for (Map.Entry entry : this.jobSystemProperties.entrySet()) {
                flinkBuilder.addSystemProperty((String)entry.getKey(), (String)entry.getValue());
            }
        }
        try {
            this.runner = flinkBuilder.getYarnRunner(this.jobs.getProject().getName(), this.flinkUser, this.jobUser, this.hadoopDir, this.flinkDir, this.flinkConfDir, this.flinkConfFile, this.services.getFileOperations(this.hdfsUser.getUserName()), yarnClient, this.glassfishDomainDir + "/domain1/config/", this.services);
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Failed to create YarnRunner.", e);
            try {
                this.writeToLogs("Failed to start Yarn client.");
            }
            catch (IOException iOException) {
                LOG.log(Level.SEVERE, "Failed to write logs for failed application.", e);
            }
            return false;
        }
        String stdOutFinalDestination = Utils.getProjectPath(this.jobs.getProject().getName()) + "Logs/Flink/";
        String string = Utils.getProjectPath(this.jobs.getProject().getName()) + "Logs/Flink/";
        this.setStdOutFinalDestination(stdOutFinalDestination);
        this.setStdErrFinalDestination(string);
        return true;
    }

    @Override
    protected void cleanup() {
        LOG.log(Level.INFO, "Job finished performing cleanup...");
        Collection<ProjectServices> projectServices = this.jobs.getProject().getProjectServicesCollection();
        Iterator<ProjectServices> iter = projectServices.iterator();
        boolean removeKafkaCerts = true;
        block0: while (iter.hasNext()) {
            ProjectServices projectService = iter.next();
            if (projectService.getProjectServicesPK().getService() != ProjectServiceEnum.KAFKA) continue;
            List<Execution> execs = this.services.getExecutionFacade().findByProjectAndType(this.jobs.getProject(), JobType.FLINK);
            if (execs != null) {
                execs.addAll(this.services.getExecutionFacade().findByProjectAndType(this.jobs.getProject(), JobType.SPARK));
            }
            if (execs == null || execs.isEmpty()) continue;
            for (Execution exec : execs) {
                if (exec.getState().isFinalState()) continue;
                removeKafkaCerts = false;
                continue block0;
            }
        }
        if (removeKafkaCerts) {
            String k_certName = this.jobs.getProject().getName() + "__" + this.jobs.getProject().getOwner().getUsername() + "__kstore.jks";
            String t_certName = this.jobs.getProject().getName() + "__" + this.jobs.getProject().getOwner().getUsername() + "__tstore.jks";
            File k_cert = new File(this.glassfishDomainDir + "/domain1/config/" + k_certName);
            File t_cert = new File(this.glassfishDomainDir + "/domain1/config/" + t_certName);
            if (k_cert.exists()) {
                k_cert.delete();
            }
            if (t_cert.exists()) {
                t_cert.delete();
            }
        }
    }
}

