/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil;
import io.hops.hopsworks.common.jobs.yarn.YarnMonitor;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServices;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.AsyncResult;
import javax.ejb.Asynchronous;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
 * Stateless session bean that wraps up a YARN execution once it has ended.
 *
 * <p>It copies the aggregated YARN logs to per-application {@code stdout.log} /
 * {@code stderr.log} files below the project's {@code Logs/} directory, records
 * those paths, the stop time and the final state on the {@link Execution},
 * removes the files registered for deletion and, for Flink jobs, removes the
 * user's Kafka key/trust stores from the domain configuration directory when
 * they are no longer needed.
 */
@Stateless
@DependsOn(value={"Settings"})
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class YarnExecutionFinalizer {
    private static final Logger LOGGER = Logger.getLogger(YarnExecutionFinalizer.class.getName());
    @EJB
    private ExecutionFacade executionFacade;
    @EJB
    private Settings settings;
    @EJB
    private DistributedFsService dfs;
    @EJB
    private YarnClientService ycs;

    /**
     * Copies the aggregated YARN logs of {@code exec} to their final location,
     * stores the resulting paths on the execution and then finalizes it.
     *
     * <p>A failure while copying the logs is logged and does not prevent the
     * paths from being recorded nor the execution from being finalized.
     *
     * @param exec the execution whose logs are collected
     * @return a completed future holding the execution as returned by the
     *         facade after its stdout/stderr paths were updated
     */
    @Asynchronous
    public Future<Execution> copyLogs(Execution exec) {
        // Parse the id before any client is acquired so a malformed id cannot leak one.
        ApplicationId applicationId = ApplicationId.fromString(exec.getAppId());
        DistributedFileSystemOps udfso = null;
        YarnMonitor monitor = null;
        boolean interrupted = false;
        try {
            udfso = this.dfs.getDfsOps(exec.getHdfsUser());
            YarnClientWrapper yarnClientWrapper = this.ycs.getYarnClientSuper(this.settings.getConfiguration());
            monitor = new YarnMonitor(applicationId, yarnClientWrapper, this.ycs);

            // Both destinations are computed up front so that the recorded paths
            // always name the log files, even when the copy below fails half-way.
            String logDir = Utils.getProjectPath(exec.getJob().getProject().getName())
                + YarnExecutionFinalizer.defaultLogDir(exec.getJob().getJobType())
                + exec.getAppId() + File.separator;
            String stdOutFinalDestination = logDir + "stdout.log";
            String stdErrFinalDestination = logDir + "stderr.log";
            String aggregatedLogPath = this.settings.getAggregatedLogPath(exec.getHdfsUser(), exec.getAppId());
            try {
                YarnLogUtil.copyAggregatedYarnLogs(udfso, aggregatedLogPath, stdOutFinalDestination,
                    new String[]{"out"}, monitor);
                YarnLogUtil.copyAggregatedYarnLogs(udfso, aggregatedLogPath, stdErrFinalDestination,
                    new String[]{"err", ".log"}, monitor);
            }
            catch (IOException | InterruptedException | YarnException ex) {
                if (ex instanceof InterruptedException) {
                    // Restored at the very end so the clean-up calls below are not disturbed.
                    interrupted = true;
                }
                LOGGER.log(Level.SEVERE, "error while aggregation logs" + ex.toString(), ex);
            }
            Execution execution = this.updateExecutionSTDPaths(stdOutFinalDestination, stdErrFinalDestination, exec);
            // NOTE(review): finalize receives the caller's instance rather than the one
            // returned by the facade; presumably it still carries the files-to-remove
            // list needed for clean-up - confirm before changing.
            this.finalize(exec, exec.getState());
            return new AsyncResult<>(execution);
        }
        finally {
            try {
                if (udfso != null) {
                    this.dfs.closeDfsClient(udfso);
                }
            }
            finally {
                // Runs even if closing the file system client throws.
                if (monitor != null) {
                    monitor.close();
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Records stop time and state of an execution and cleans up after it.
     *
     * <p>The execution is only updated if it can still be found by id; the
     * clean-up is attempted either way. An {@link IOException} during clean-up
     * is logged as a warning and not propagated. For Flink jobs the Kafka
     * certificate clean-up is run afterwards.
     *
     * <p>This overloads, and does not override, {@link Object#finalize()}.
     * NOTE(review): {@link #copyLogs(Execution)} calls this method directly on
     * {@code this}, so the {@code @Asynchronous} behaviour presumably only
     * applies to calls made through an injected reference - confirm.
     *
     * @param exec     the execution to finalize
     * @param jobState the state to record on the execution
     */
    @Asynchronous
    public void finalize(Execution exec, JobState jobState) {
        if (this.executionFacade.findById(exec.getId()) != null) {
            exec = this.executionFacade.updateExecutionStop(exec, System.currentTimeMillis());
            this.executionFacade.updateState(exec, jobState);
        }
        try {
            this.removeAllNecessary(exec);
        }
        catch (IOException ex) {
            LOGGER.log(Level.WARNING, "Exception while cleaning after job:{0}, with appId:{1}, some cleaning is probably needed {2}", new Object[]{exec.getJob().getName(), exec.getAppId(), ex.getMessage()});
        }
        // Identity comparison is the idiomatic enum check and tolerates a null job type.
        if (exec.getJob().getJobType() == JobType.FLINK) {
            this.cleanCerts(exec);
        }
    }

    /**
     * Stores the stderr and then the stdout path on the execution.
     *
     * @return the execution as returned by the last facade update
     */
    private Execution updateExecutionSTDPaths(String stdoutPath, String stderrPath, Execution exec) {
        exec = this.executionFacade.updateStdErrPath(exec, stderrPath);
        return this.executionFacade.updateStdOutPath(exec, stdoutPath);
    }

    /**
     * Deletes every file registered on the execution plus the application's
     * directory below the HDFS temporary certificate directory.
     *
     * <p>Paths whose scheme matches the distributed file system (or its
     * alternative scheme) are deleted recursively there if they exist; all
     * other paths are treated as local files and deleted quietly.
     *
     * @param exec the execution to clean up after
     * @throws IOException if a distributed file system operation fails
     */
    public void removeAllNecessary(Execution exec) throws IOException {
        // Work on a copy: the registered list may be absent, and the entity's own
        // list should not grow every time this method runs.
        List<String> filesToRemove = new ArrayList<>();
        List<String> registered = exec.getFilesToRemove();
        if (registered != null) {
            filesToRemove.addAll(registered);
        }
        filesToRemove.add("hdfs://" + this.settings.getHdfsTmpCertDir() + "/" + exec.getHdfsUser() + File.separator + exec.getAppId());

        DistributedFileSystemOps dfso = this.dfs.getDfsOps();
        try {
            for (String file : filesToRemove) {
                Path path = new Path(file);
                String scheme = path.toUri().getScheme();
                boolean onDfs = scheme != null
                    && (scheme.equals(dfso.getFilesystem().getScheme())
                        || scheme.equals(dfso.getFilesystem().getAlternativeScheme()));
                if (onDfs) {
                    // A missing DFS path needs no work; in particular it must not be
                    // re-interpreted as a local file name.
                    if (dfso.getFilesystem().exists(path)) {
                        dfso.getFilesystem().delete(path, true);
                    }
                } else {
                    FileUtils.deleteQuietly(new File(file));
                }
            }
        }
        finally {
            this.dfs.closeDfsClient(dfso);
        }
    }

    /**
     * Removes the user's {@code __kstore.jks} and {@code __tstore.jks} files from
     * the domain configuration directory, unless the project has the Kafka
     * service and still has a Flink or Spark execution that is not in a final
     * state.
     */
    private void cleanCerts(Execution exec) {
        boolean stillNeeded = YarnExecutionFinalizer.usesKafka(exec)
            && (this.hasUnfinishedExecution(exec, JobType.FLINK)
                || this.hasUnfinishedExecution(exec, JobType.SPARK));
        if (stillNeeded) {
            return;
        }
        String configDir = this.settings.getHopsworksDomainDir() + "/domain1/config/";
        YarnExecutionFinalizer.deleteCertificate(new File(configDir + exec.getHdfsUser() + "__kstore.jks"));
        YarnExecutionFinalizer.deleteCertificate(new File(configDir + exec.getHdfsUser() + "__tstore.jks"));
    }

    /** Maps a job type to its log directory (with trailing slash) relative to the project path. */
    private static String defaultLogDir(JobType jobType) {
        switch (jobType) {
            case SPARK:
            case PYSPARK:
                return "Logs/Spark/";
            case FLINK:
                return "Logs/Flink/";
            case YARN:
                return "Logs/Yarn/";
            default:
                return "Logs/";
        }
    }

    /** Tells whether the project owning {@code exec} lists the Kafka service. */
    private static boolean usesKafka(Execution exec) {
        Collection<ProjectServices> services = exec.getJob().getProject().getProjectServicesCollection();
        for (ProjectServices service : services) {
            if (service.getProjectServicesPK().getService() == ProjectServiceEnum.KAFKA) {
                return true;
            }
        }
        return false;
    }

    /** Tells whether the project owning {@code exec} has an execution of {@code jobType} that is not in a final state. */
    private boolean hasUnfinishedExecution(Execution exec, JobType jobType) {
        List<Execution> executions = this.executionFacade.findByProjectAndType(exec.getJob().getProject(), jobType);
        if (executions == null) {
            return false;
        }
        for (Execution candidate : executions) {
            if (!candidate.getState().isFinalState()) {
                return true;
            }
        }
        return false;
    }

    /** Deletes {@code cert} if it exists and logs a warning when the deletion fails. */
    private static void deleteCertificate(File cert) {
        if (cert.exists() && !cert.delete()) {
            LOGGER.log(Level.WARNING, "Could not delete certificate file {0}", cert.getAbsolutePath());
        }
    }
}

