/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.execution.HopsJob;
import io.hops.hopsworks.common.jobs.yarn.ElasticProperties;
import io.hops.hopsworks.common.jobs.yarn.ServiceProperties;
import io.hops.hopsworks.common.jobs.yarn.YarnJobsMonitor;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.LocalResourceDTO;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.YarnJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.user.Users;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

/**
 * Base class for jobs that are started through a {@link YarnRunner}.
 *
 * <p>It holds the runner, the stdout/stderr destinations, the project local
 * resources and the job system properties. It implements starting the
 * application master ({@link #runJob}) and writing failure messages to a log
 * file ({@link #writeToLogs(String, Exception)}).
 */
public abstract class YarnJob extends HopsJob {
    private static final Logger LOG = Logger.getLogger(YarnJob.class.getName());

    /** Runner used to start the application master; must be set before {@link #runJob} is called. */
    protected YarnRunner runner;
    private String stdOutFinalDestination;
    /**
     * Destination used for the stderr log. Note that {@link #writeLog} appends
     * to this value every time it writes a log file.
     */
    private String stdErrFinalDestination;
    protected List<LocalResourceDTO> projectLocalResources;
    protected Map<String, String> jobSystemProperties;
    /** User name used to obtain the file system client in {@link #writeToLogs(String, Exception)}. */
    protected final String jobUser;
    protected Settings settings;
    protected String kafkaBrokersString;
    /** The application states FINISHED, FAILED and KILLED (presumably the terminal states, per the field name). */
    final EnumSet<YarnApplicationState> finalAppState = EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);

    /**
     * Creates the job and initialises empty system properties and local resources.
     *
     * @param job the job description; its configuration must be a {@link YarnJobConfiguration}
     * @param services passed to the superclass constructor
     * @param user passed to the superclass constructor
     * @param jobUser user name stored in {@link #jobUser}
     * @param hadoopDir passed to the superclass constructor
     * @param jobsMonitor passed to the superclass constructor
     * @param settings stored in {@link #settings}
     * @param kafkaBrokersString stored in {@link #kafkaBrokersString}
     * @throws IllegalArgumentException if the job configuration is {@code null} or is not a
     *     {@link YarnJobConfiguration}
     */
    public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir, YarnJobsMonitor jobsMonitor, Settings settings, String kafkaBrokersString) {
        super(job, services, user, hadoopDir, jobsMonitor);
        // Read the configuration once so a null configuration is reported as an
        // IllegalArgumentException instead of failing with a NullPointerException
        // while the error message is being built.
        Object jobConfig = job.getJobConfig();
        if (!(jobConfig instanceof YarnJobConfiguration)) {
            String received = jobConfig == null ? "null" : String.valueOf(jobConfig.getClass());
            throw new IllegalArgumentException("Job must be a YarnJobConfiguration object. Received class: " + received);
        }
        LOG.log(Level.INFO, "Instantiating Yarn job as user: {0}", this.hdfsUser);
        this.jobSystemProperties = new HashMap<String, String>();
        this.projectLocalResources = new ArrayList<LocalResourceDTO>();
        this.jobUser = jobUser;
        this.settings = settings;
        this.kafkaBrokersString = kafkaBrokersString;
    }

    /**
     * Sets the stdout destination.
     *
     * @param stdOutFinalDestination the new stdout destination
     */
    public final void setStdOutFinalDestination(String stdOutFinalDestination) {
        this.stdOutFinalDestination = stdOutFinalDestination;
    }

    /**
     * Sets the stderr destination. {@link #writeLog} appends the log file
     * location to this value.
     *
     * @param stdErrFinalDestination the new stderr destination
     */
    public final void setStdErrFinalDestination(String stdErrFinalDestination) {
        this.stdErrFinalDestination = stdErrFinalDestination;
    }

    /**
     * @return the current stdout destination
     */
    protected final String getStdOutFinalDestination() {
        return this.stdOutFinalDestination;
    }

    /**
     * @return the current stderr destination; after a failure log has been
     *     written this is the path of that log file
     */
    protected final String getStdErrFinalDestination() {
        return this.stdErrFinalDestination;
    }

    /**
     * Starts the application master through the runner and records the files
     * to remove and the application id on the execution.
     *
     * <p>On failure the job state is set to
     * {@link JobState#APP_MASTER_START_FAILED}. For failures other than
     * {@link AccessControlException} a failure log is also written and cleanup
     * of the execution's files is attempted. The runner is always stopped
     * before this method returns.
     *
     * @param udfso file system client used to write failure logs
     * @param dfso file system client handed to the runner
     * @param args arguments handed to the runner
     * @return {@code true} if the application master was started and the
     *     execution was updated, {@code false} otherwise
     * @throws IllegalArgumentException if {@link #runner} has not been set
     */
    private boolean startApplicationMaster(DistributedFileSystemOps udfso, DistributedFileSystemOps dfso, String args) {
        if (this.runner == null) {
            throw new IllegalArgumentException("The YarnRunner has not been initialized yet.");
        }
        try {
            ApplicationId appId = this.runner.startAppMaster(this.jobs.getProject(), dfso, this.user.getUsername(), args);
            this.execution = this.services.getExecutionFacade().updateFilesToRemove(this.execution, this.runner.getFilesToRemove());
            this.execution = this.services.getExecutionFacade().updateAppId(this.execution, appId.toString());
            return true;
        }
        catch (AccessControlException ex) {
            // Pass the exception itself so the stack trace is not lost.
            LOG.log(Level.SEVERE, "Permission denied:- " + ex.getMessage(), ex);
            this.updateState(JobState.APP_MASTER_START_FAILED);
            return false;
        }
        catch (Exception e) {
            String failureMessage = "Failed to start application master for execution " + this.execution + ". Aborting execution";
            LOG.log(Level.SEVERE, failureMessage, e);
            this.writeLog(failureMessage, e, udfso);
            try {
                this.services.getYarnExecutionFinalizer().removeAllNecessary(this.execution);
            }
            catch (IOException ex) {
                // Cleanup is best effort: record the failure (with its cause) and carry on.
                String cleanupMessage = "Failed to remove files for failed execution " + this.execution;
                LOG.log(Level.WARNING, cleanupMessage, ex);
                this.writeLog(cleanupMessage, ex, udfso);
            }
            this.updateState(JobState.APP_MASTER_START_FAILED);
            return false;
        }
        finally {
            if (this.runner != null) {
                this.runner.stop(this.services.getFsService());
            }
        }
    }

    /**
     * Builds the {@link ServiceProperties} for this job from the project id and
     * name, the REST endpoint, the job name and the Elastic REST endpoint.
     *
     * @param dfso not used by this implementation
     * @param yarnClient not used by this implementation
     * @return always {@code true}
     * @throws JobException declared for overriding implementations; not thrown here
     */
    @Override
    protected boolean setupJob(DistributedFileSystemOps dfso, YarnClient yarnClient) throws JobException {
        this.serviceProps = new ServiceProperties(this.jobs.getProject().getId(), this.jobs.getProject().getName(), this.services.getSettings().getRestEndpoint(), this.jobs.getName(), new ElasticProperties(this.services.getSettings().getElasticRESTEndpoint()));
        return true;
    }

    /**
     * Writes a message and exception to a stderr log file and stores the
     * resulting path on the execution.
     *
     * <p>The log path is the current stderr destination followed by the job
     * name, the current date (spaces replaced by {@code _}, colons by
     * {@code -}) and {@code /stderr.log}.
     *
     * @param message the message to write
     * @param exception the exception to write; may be {@code null} (see {@link #writeToLogs(String)})
     * @param udfso file system client used for writing
     */
    private void writeLog(String message, Exception exception, DistributedFileSystemOps udfso) {
        Date date = new Date();
        String dateString = date.toString();
        dateString = dateString.replace(" ", "_").replace(":", "-");
        // NOTE(review): the suffix is appended to the field itself, so a second
        // call builds on the path produced by the first one (".../stderr.log<job><date>/stderr.log").
        // startApplicationMaster can call this twice in one failure path; confirm
        // whether the compounding path is intended before changing it.
        this.stdErrFinalDestination = this.stdErrFinalDestination + this.jobs.getName() + dateString + "/stderr.log";
        YarnLogUtil.writeLog(udfso, this.stdErrFinalDestination, message, exception);
        this.services.getExecutionFacade().updateStdErrPath(this.execution, this.stdErrFinalDestination);
    }

    /**
     * Obtains a file system client for {@link #jobUser}, writes the message and
     * exception to the stderr log, and closes the client afterwards.
     *
     * @param message the message to write
     * @param e the exception to write; may be {@code null}
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void writeToLogs(String message, Exception e) throws IOException {
        DistributedFileSystemOps udfso = null;
        try {
            udfso = this.services.getFileOperations(this.jobUser);
            this.writeLog(message, e, udfso);
        }
        finally {
            // Close the client even if writing the log failed.
            if (null != udfso) {
                this.services.getFsService().closeDfsClient(udfso);
            }
        }
    }

    /**
     * Writes a message without an exception to the stderr log.
     *
     * @param message the message to write
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void writeToLogs(String message) throws IOException {
        this.writeToLogs(message, null);
    }

    /**
     * Runs the job by starting the application master.
     *
     * @param udfso file system client used to write failure logs
     * @param dfso file system client handed to the runner
     * @param args arguments handed to the runner
     */
    @Override
    protected void runJob(DistributedFileSystemOps udfso, DistributedFileSystemOps dfso, String args) {
        // The start result is not needed here: on failure startApplicationMaster
        // has already updated the job state and written the failure log.
        this.startApplicationMaster(udfso, dfso, args);
    }
}

