/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.execution.HopsJob;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.LocalResourceDTO;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.YarnJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.user.Users;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public abstract class YarnJob
extends HopsJob {
    private static final Logger LOG = Logger.getLogger(YarnJob.class.getName());
    protected YarnRunner runner;
    protected YarnClient yarnClient;
    private String stdOutFinalDestination;
    private String stdErrFinalDestination;
    protected List<LocalResourceDTO> projectLocalResources;
    protected Map<String, String> jobSystemProperties;
    protected final String jobUser;
    protected Settings settings;
    protected String kafkaBrokersString;
    protected String hopsworksRestEndpoint;
    protected ServingConfig servingConfig;
    protected ServiceDiscoveryController serviceDiscoveryController;
    final EnumSet<YarnApplicationState> finalAppState = EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);

    public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir, Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) {
        super(job, services, user, hadoopDir);
        if (!(job.getJobConfig() instanceof YarnJobConfiguration)) {
            throw new IllegalArgumentException("Job must be a YarnJobConfiguration object. Received class: " + job.getJobConfig().getClass());
        }
        LOG.log(Level.INFO, "Instantiating Yarn job as user: {0}", this.hdfsUser);
        this.jobSystemProperties = new HashMap<String, String>();
        this.projectLocalResources = new ArrayList<LocalResourceDTO>();
        this.jobUser = jobUser;
        this.settings = settings;
        this.kafkaBrokersString = kafkaBrokersString;
        this.hopsworksRestEndpoint = hopsworksRestEndpoint;
        this.servingConfig = servingConfig;
        this.serviceDiscoveryController = serviceDiscoveryController;
    }

    public final void setStdOutFinalDestination(String stdOutFinalDestination) {
        this.stdOutFinalDestination = stdOutFinalDestination;
    }

    public final void setStdErrFinalDestination(String stdErrFinalDestination) {
        this.stdErrFinalDestination = stdErrFinalDestination;
    }

    protected final String getStdOutFinalDestination() {
        return this.stdOutFinalDestination;
    }

    protected final String getStdErrFinalDestination() {
        return this.stdErrFinalDestination;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean startApplicationMaster(DistributedFileSystemOps udfso, DistributedFileSystemOps dfso, String args) {
        if (this.runner == null) {
            throw new IllegalArgumentException("The YarnRunner has not been initialized yet.");
        }
        try {
            ApplicationId appId = this.runner.startAppMaster(this.jobs.getProject(), dfso, this.user.getUsername(), args);
            this.execution = this.services.getExecutionFacade().updateFilesToRemove(this.execution, this.runner.getFilesToRemove());
            this.execution = this.services.getExecutionFacade().updateAppId(this.execution, appId.toString());
            boolean bl = true;
            return bl;
        }
        catch (AccessControlException ex) {
            LOG.log(Level.SEVERE, "Permission denied:- {0}", ex.getMessage());
            this.updateState(JobState.APP_MASTER_START_FAILED);
            boolean bl = false;
            return bl;
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "Failed to start application master for execution " + this.execution + ". Aborting execution", e);
            this.writeLog("Failed to start application master for execution " + this.execution + ". Aborting execution", e, udfso);
            try {
                this.services.getYarnExecutionFinalizer().removeAllNecessary(this.execution);
            }
            catch (IOException ex) {
                LOG.log(Level.WARNING, "Failed to remove files for failed execution {0}", this.execution);
                this.writeLog("Failed to remove files for failed execution " + this.execution, ex, udfso);
            }
            this.updateState(JobState.APP_MASTER_START_FAILED);
            boolean bl = false;
            return bl;
        }
        finally {
            if (this.runner != null) {
                this.runner.stop(this.services.getFsService());
            }
        }
    }

    @Override
    protected abstract boolean setupJob() throws JobException;

    private void writeLog(String message, Exception exception, DistributedFileSystemOps udfso) {
        Date date = new Date();
        String dateString = date.toString();
        dateString = dateString.replace(" ", "_").replace(":", "-");
        this.stdErrFinalDestination = this.stdErrFinalDestination + this.jobs.getName() + dateString + "/stderr.log";
        YarnLogUtil.writeLog(udfso, this.stdErrFinalDestination, message, exception);
        this.services.getExecutionFacade().updateStdErrPath(this.execution, this.stdErrFinalDestination);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void writeToLogs(String message, Exception e) throws IOException {
        DistributedFileSystemOps udfso = null;
        try {
            udfso = this.services.getFileOperations(this.jobUser);
            this.writeLog(message, e, udfso);
        }
        finally {
            if (null != udfso) {
                this.services.getFsService().closeDfsClient(udfso);
            }
        }
    }

    @Override
    protected void writeToLogs(String message) throws IOException {
        this.writeToLogs(message, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void runJob(String args) {
        boolean proceed;
        DistributedFileSystemOps dfso = null;
        DistributedFileSystemOps udfso = null;
        try {
            dfso = this.services.getFsService().getDfsOps();
            udfso = this.services.getFileOperations(this.hdfsUser.getUserName());
            proceed = this.startApplicationMaster(udfso, dfso, args);
        }
        finally {
            if (dfso != null) {
                dfso.close();
            }
            if (null != udfso) {
                this.services.getFsService().closeDfsClient(udfso);
            }
        }
        if (!proceed) {
            return;
        }
    }

    @Override
    public final void execute(final String args) {
        if (!this.initialized) {
            throw new IllegalStateException("Cannot execute before acquiring an Execution id.");
        }
        try {
            super.execute(args);
            this.hdfsUser.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public Void run() {
                    YarnClientWrapper yarnClientWrapper;
                    block8: {
                        yarnClientWrapper = null;
                        try {
                            boolean proceed = false;
                            try {
                                yarnClientWrapper = YarnJob.this.services.getYarnClientService().getYarnClient(YarnJob.this.hdfsUser.getUserName());
                                YarnJob.this.yarnClient = yarnClientWrapper.getYarnClient();
                                proceed = YarnJob.this.setupJob();
                            }
                            catch (Exception ex) {
                                LOG.log(Level.SEVERE, "Job Initialization Failed", ex);
                            }
                            if (proceed) break block8;
                            YarnJob.this.execution = YarnJob.this.services.getExecutionFacade().updateExecutionStop(YarnJob.this.execution, System.currentTimeMillis());
                            YarnJob.this.services.getExecutionFacade().updateState(YarnJob.this.execution, JobState.INITIALIZATION_FAILED);
                            YarnJob.this.cleanup();
                            Void void_ = null;
                            if (yarnClientWrapper != null) {
                                YarnJob.this.services.getYarnClientService().closeYarnClient(yarnClientWrapper);
                            }
                            return void_;
                        }
                        catch (Throwable throwable) {
                            if (yarnClientWrapper != null) {
                                YarnJob.this.services.getYarnClientService().closeYarnClient(yarnClientWrapper);
                            }
                            throw throwable;
                        }
                    }
                    YarnJob.this.runJob(args);
                    Void void_ = null;
                    if (yarnClientWrapper != null) {
                        YarnJob.this.services.getYarnClientService().closeYarnClient(yarnClientWrapper);
                    }
                    return void_;
                }
            });
        }
        catch (IOException | InterruptedException ex) {
            LOG.log(Level.SEVERE, null, ex);
        }
    }
}

