package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.dao.jobs.JobsHistoryFacade;
import io.hops.hopsworks.common.dao.project.service.ProjectServiceEnum;
import io.hops.hopsworks.common.dao.project.service.ProjectServices;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.jobs.jobhistory.JobState;
import io.hops.hopsworks.common.jobs.jobhistory.JobType;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.AsyncResult;
import javax.ejb.Asynchronous;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;

@DependsOn({"Settings"})
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/jobs/yarn/YarnExecutionFinalizer.class */
public class YarnExecutionFinalizer {
    private static final Logger LOG = Logger.getLogger(YarnExecutionFinalizer.class.getName());

    @EJB
    private ExecutionFacade executionFacade;

    @EJB
    private JobsHistoryFacade jobsHistoryFacade;

    @EJB
    private Settings settings;

    @EJB
    private DistributedFsService dfs;

    @EJB
    private YarnClientService ycs;

    private Execution updateState(JobState jobState, Execution execution) {
        return this.executionFacade.updateState(execution, jobState);
    }

    @Asynchronous
    public Future<Execution> copyLogs(Execution execution) {
        String str;
        DistributedFileSystemOps dfsOps = this.dfs.getDfsOps(execution.getHdfsUser());
        YarnMonitor yarnMonitor = new YarnMonitor(ApplicationId.fromString(execution.getAppId()), this.ycs.getYarnClientSuper(this.settings.getConfiguration()), this.ycs);
        try {
            switch (execution.getJob().getJobType()) {
                case SPARK:
                case PYSPARK:
                    str = Settings.SPARK_DEFAULT_OUTPUT_PATH;
                    break;
                case FLINK:
                    str = Settings.FLINK_DEFAULT_OUTPUT_PATH;
                    break;
                case YARN:
                default:
                    str = "Logs/";
                    break;
            }
            String str2 = Utils.getHdfsRootPath(execution.getJob().getProject().getName()) + str;
            String str3 = Utils.getHdfsRootPath(execution.getJob().getProject().getName()) + str;
            String aggregatedLogPath = this.settings.getAggregatedLogPath(execution.getHdfsUser(), execution.getAppId());
            if (str2 != null) {
                try {
                    if (!str2.isEmpty()) {
                        str2 = str2 + execution.getAppId() + File.separator + "stdout.log";
                        YarnLogUtil.copyAggregatedYarnLogs(dfsOps, aggregatedLogPath, str2, new String[]{"out"}, yarnMonitor);
                    }
                } catch (IOException | InterruptedException | YarnException e) {
                    LOG.severe("error while aggregation logs" + e.toString());
                }
            }
            if (str3 != null && !str3.isEmpty()) {
                str3 = str3 + execution.getAppId() + File.separator + "stderr.log";
                YarnLogUtil.copyAggregatedYarnLogs(dfsOps, aggregatedLogPath, str3, new String[]{"err", ".log"}, yarnMonitor);
            }
            AsyncResult asyncResult = new AsyncResult(updateExecutionSTDPaths(str2, str3, execution));
            this.dfs.closeDfsClient(dfsOps);
            yarnMonitor.close();
            return asyncResult;
        } catch (Throwable th) {
            this.dfs.closeDfsClient(dfsOps);
            yarnMonitor.close();
            throw th;
        }
    }

    private void removeMarkerFile(Execution execution, DistributedFileSystemOps distributedFileSystemOps) {
        String jobMarkerFile = this.settings.getJobMarkerFile(execution.getJob(), execution.getAppId());
        try {
            if (distributedFileSystemOps.exists(jobMarkerFile)) {
                distributedFileSystemOps.rm(new Path(jobMarkerFile), false);
            }
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Could not remove marker file for job:{0}, with appId:{1}, {2}", new Object[]{execution.getJob().getName(), execution.getAppId(), e.getMessage()});
        }
    }

    @Asynchronous
    public void finalize(Execution execution, JobState jobState) {
        Execution updateExecutionStop = this.executionFacade.updateExecutionStop(execution, System.currentTimeMillis());
        updateJobHistoryApp(updateExecutionStop.getExecutionDuration(), updateExecutionStop);
        try {
            removeAllNecessary(updateExecutionStop);
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Exception while cleaning after job:{0}, with appId:{1}, some cleanning is probably needed {2}", new Object[]{updateExecutionStop.getJob().getName(), updateExecutionStop.getAppId(), e.getMessage()});
        }
        if (updateExecutionStop.getJob().getJobType().equals(JobType.FLINK)) {
            cleanCerts(updateExecutionStop);
        }
        updateState(jobState, updateExecutionStop);
    }

    private void updateJobHistoryApp(long j, Execution execution) {
        this.jobsHistoryFacade.updateJobHistory(execution, j);
    }

    private Execution updateExecutionSTDPaths(String str, String str2, Execution execution) {
        return this.executionFacade.updateStdOutPath(this.executionFacade.updateStdErrPath(execution, str2), str);
    }

    public void removeAllNecessary(Execution execution) throws IOException {
        List<String> filesToRemove = execution.getFilesToRemove();
        filesToRemove.add("hdfs://" + this.settings.getHdfsTmpCertDir() + "/" + execution.getHdfsUser() + File.separator + execution.getAppId());
        filesToRemove.add("hdfs://" + File.separator + Settings.DIR_ROOT + File.separator + execution.getJob().getProject().getName() + File.separator + Settings.PROJECT_STAGING_DIR + File.separator + ".tensorboard." + execution.getAppId());
        filesToRemove.add(Paths.get(this.settings.getFlinkKafkaCertDir(), execution.getAppId()).toString());
        DistributedFileSystemOps dfsOps = this.dfs.getDfsOps();
        try {
            for (String str : filesToRemove) {
                if (str.startsWith("hdfs:") && dfsOps.getFilesystem().exists(new Path(str))) {
                    dfsOps.getFilesystem().delete(new Path(str), true);
                } else {
                    FileUtils.deleteQuietly(new File(str));
                }
            }
            removeMarkerFile(execution, dfsOps);
            this.dfs.closeDfsClient(dfsOps);
        } catch (Throwable th) {
            this.dfs.closeDfsClient(dfsOps);
            throw th;
        }
    }

    private void cleanCerts(Execution execution) {
        Iterator<ProjectServices> it = execution.getJob().getProject().getProjectServicesCollection().iterator();
        boolean z = true;
        while (it.hasNext()) {
            if (it.next().getProjectServicesPK().getService() == ProjectServiceEnum.KAFKA) {
                List<Execution> findForProjectByType = this.executionFacade.findForProjectByType(execution.getJob().getProject(), JobType.FLINK);
                if (findForProjectByType != null) {
                    findForProjectByType.addAll(this.executionFacade.findForProjectByType(execution.getJob().getProject(), JobType.SPARK));
                }
                if (findForProjectByType != null && !findForProjectByType.isEmpty()) {
                    Iterator<Execution> it2 = findForProjectByType.iterator();
                    while (true) {
                        if (!it2.hasNext()) {
                            break;
                        } else if (!it2.next().getState().isFinalState()) {
                            z = false;
                            break;
                        }
                    }
                }
            }
        }
        if (z) {
            String str = execution.getHdfsUser() + Settings.KEYSTORE_SUFFIX;
            String str2 = execution.getHdfsUser() + Settings.TRUSTSTORE_SUFFIX;
            File file = new File(this.settings.getHopsworksDomainDir() + "/domain1/config/" + str);
            File file2 = new File(this.settings.getHopsworksDomainDir() + "/domain1/config/" + str2);
            if (file.exists()) {
                file.delete();
            }
            if (file2.exists()) {
                file2.delete();
            }
        }
    }
}
