package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.dao.jobhistory.Execution;
import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.jobs.jobhistory.JobFinalStatus;
import io.hops.hopsworks.common.jobs.jobhistory.JobState;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Timer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
 * Singleton bean that tracks YARN-backed executions and polls their state on a
 * container-managed schedule.
 *
 * <p>On every tick {@link #monitor(Timer)}:
 * <ol>
 *   <li>on the first tick only, loads all not-yet-finished executions that have
 *       an application id from {@link ExecutionFacade};</li>
 *   <li>polls each tracked application and persists progress, state and final
 *       status;</li>
 *   <li>for applications that reached FAILED, FINISHED or KILLED, starts a log
 *       copy and stops tracking them;</li>
 *   <li>finalizes executions whose log copy has completed.</li>
 * </ol>
 *
 * <p>Applications whose status poll has failed more than
 * {@code settings.getMaxStatusPollRetry()} times are cancelled and finalized.
 */
@Singleton
@DependsOn({"Settings"})
public class YarnJobsMonitor {
    private static final Logger LOG = Logger.getLogger(YarnJobsMonitor.class.getName());

    @EJB
    private Settings settings;

    @EJB
    private ExecutionFacade executionFacade;

    @EJB
    private YarnExecutionFinalizer execFinalizer;

    @EJB
    private YarnClientService ycs;

    /** Number of failed status polls tolerated before an application is killed. */
    private int maxStatusPollRetry;

    /** Tracked executions, keyed by YARN application id. */
    Map<String, Execution> executions = new HashMap<>();

    /** Monitor used to poll each tracked application, keyed by application id. */
    Map<String, YarnMonitor> monitors = new HashMap<>();

    /** Count of failed status polls per application id. */
    Map<String, Integer> failures = new HashMap<>();

    /** True until the first scheduled run has loaded the unfinished executions. */
    boolean init = true;

    /** Log copies in flight; drained at the end of every scheduled run. */
    private final List<CopyLogsFutureResult> copyLogsFutures = new ArrayList<>();

    /**
     * Pairs a pending log-copy future with the job state the execution should
     * be finalized with once the copy completes.
     */
    public static class CopyLogsFutureResult {
        private final Future<Execution> execFuture;
        private final JobState jobState;

        private CopyLogsFutureResult(Future<Execution> future, JobState jobState) {
            this.execFuture = future;
            this.jobState = jobState;
        }
    }

    /**
     * Starts the given monitor and registers the execution for periodic polling.
     *
     * @param str         key to track the execution under; {@link #monitor(Timer)}
     *                    parses these keys as YARN application ids when it has to
     *                    create a monitor itself
     * @param execution   the execution to track
     * @param yarnMonitor monitor to start; the instance returned by
     *                    {@code start()} is the one that gets stored
     */
    public void addToMonitor(String str, Execution execution, YarnMonitor yarnMonitor) {
        // NOTE(review): not synchronized itself; presumably relies on the container's
        // default singleton locking to be safe against monitor() - confirm.
        YarnMonitor started = yarnMonitor.start();
        this.executions.put(str, execution);
        this.monitors.put(str, started);
    }

    /**
     * Scheduled entry point: polls all tracked executions and finalizes those
     * whose logs have been copied.
     *
     * @param timer the container timer that fired; not used
     */
    // NOTE(review): the wildcard constants in the schedule look like a decompiler
    // substitution for a literal; confirm the intended schedule expression.
    @Schedule(persistent = false, second = Settings.KAFKA_ACL_WILDCARD, minute = Settings.KAFKA_ACL_WILDCARD, hour = Settings.KAFKA_ACL_WILDCARD)
    public synchronized void monitor(Timer timer) {
        if (this.init) {
            initialize();
        }
        pollExecutions();
        finalizeCopiedLogs();
    }

    /** Loads unfinished executions and the retry limit; runs once, on the first tick. */
    private void initialize() {
        List<Execution> unfinished = this.executionFacade.findAllNotFinished();
        if (unfinished != null) {
            for (Execution execution : unfinished) {
                // Executions without an application id cannot be polled, so skip them.
                if (execution.getAppId() != null) {
                    this.executions.put(execution.getAppId(), execution);
                }
            }
        }
        this.maxStatusPollRetry = this.settings.getMaxStatusPollRetry();
        this.init = false;
    }

    /** Polls every tracked execution once and drops the ones that are done. */
    private void pollExecutions() {
        List<String> finishedAppIds = new ArrayList<>();
        List<Execution> stillTracked = new ArrayList<>();
        for (Map.Entry<String, Execution> entry : this.executions.entrySet()) {
            String appId = entry.getKey();
            YarnMonitor yarnMonitor = this.monitors.get(appId);
            if (yarnMonitor == null) {
                // No monitor registered (e.g. execution loaded during initialization): create one.
                yarnMonitor = new YarnMonitor(ApplicationId.fromString(appId),
                        this.ycs.getYarnClientSuper(this.settings.getConfiguration()), this.ycs);
                this.monitors.put(appId, yarnMonitor);
            }
            Execution updated = internalMonitor(entry.getValue(), yarnMonitor);
            if (updated != null) {
                stillTracked.add(updated);
            } else {
                finishedAppIds.add(appId);
                yarnMonitor.close();
            }
        }
        // Map updates are deferred until after the loop so the map being
        // iterated is never modified mid-iteration.
        for (Execution execution : stillTracked) {
            this.executions.put(execution.getAppId(), execution);
        }
        for (String appId : finishedAppIds) {
            this.executions.remove(appId);
            this.failures.remove(appId);
            this.monitors.remove(appId);
        }
    }

    /** Finalizes every execution whose log copy has completed and forgets its future. */
    private void finalizeCopiedLogs() {
        Iterator<CopyLogsFutureResult> pending = this.copyLogsFutures.iterator();
        while (pending.hasNext()) {
            CopyLogsFutureResult result = pending.next();
            if (!result.execFuture.isDone()) {
                continue;
            }
            try {
                this.execFinalizer.finalize(result.execFuture.get(), result.jobState);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller/container can observe it.
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, e.getMessage(), e);
            } catch (ExecutionException e) {
                // The log copy itself failed; finalize is not called for this execution.
                LOG.log(Level.SEVERE, e.getMessage(), e);
            }
            pending.remove();
        }
    }

    /**
     * Polls one application and persists what was learned.
     *
     * @param execution   the tracked execution
     * @param yarnMonitor monitor used to query (and, if needed, cancel) the application
     * @return the updated execution if it should stay tracked, or {@code null}
     *         if it reached a terminal state or was given up on
     */
    private Execution internalMonitor(Execution execution, YarnMonitor yarnMonitor) {
        try {
            // Read everything from YARN before writing anything, so a failed
            // poll leaves the stored execution untouched.
            YarnApplicationState applicationState = yarnMonitor.getApplicationState();
            JobFinalStatus finalStatus = JobFinalStatus.getJobFinalStatus(yarnMonitor.getFinalApplicationStatus());
            JobState jobState = JobState.getJobState(applicationState);
            float progress = yarnMonitor.getProgress();

            Execution updated = updateProgress(progress, execution);
            updated = updateState(jobState, updated);
            execution = updateFinalStatus(finalStatus, updated);

            if (isTerminal(applicationState)) {
                execution = this.executionFacade.updateState(execution, JobState.AGGREGATING_LOGS);
                this.copyLogsFutures.add(new CopyLogsFutureResult(this.execFinalizer.copyLogs(execution),
                        JobState.getJobState(applicationState)));
                return null;
            }
        } catch (IOException | YarnException e) {
            String appId = execution.getAppId();
            Integer previous = this.failures.get(appId);
            int attempts = previous == null ? 1 : previous.intValue() + 1;
            this.failures.put(appId, Integer.valueOf(attempts));
            // Log this execution's own retry count, not the whole failures map.
            LOG.log(Level.WARNING, "Failed to get application state for execution " + execution + ". Tried "
                    + attempts + " time(s).", e);
        }

        Integer failureCount = this.failures.get(execution.getAppId());
        if (failureCount == null || failureCount.intValue() <= this.maxStatusPollRetry) {
            return execution;
        }

        // Too many failed polls: give up on the application.
        try {
            LOG.log(Level.SEVERE, "Killing application, {0}, because unable to poll for status.", execution);
            yarnMonitor.cancelJob(yarnMonitor.getApplicationId().toString());
            execution = updateProgress(0.0f, updateFinalStatus(JobFinalStatus.KILLED, updateState(JobState.KILLED, execution)));
            this.execFinalizer.finalize(execution, JobState.KILLED);
            return null;
        } catch (YarnException | IOException e2) {
            LOG.log(Level.SEVERE, "Failed to cancel execution, " + execution + " after failing to poll for status.", e2);
            this.execFinalizer.finalize(updateState(JobState.FRAMEWORK_FAILURE, execution), JobState.FRAMEWORK_FAILURE);
            return null;
        }
    }

    /** Returns true for the YARN states after which an application no longer runs. */
    private static boolean isTerminal(YarnApplicationState state) {
        return state == YarnApplicationState.FAILED
                || state == YarnApplicationState.FINISHED
                || state == YarnApplicationState.KILLED;
    }

    /** Persists the progress and returns the execution as returned by the facade. */
    private Execution updateProgress(float f, Execution execution) {
        return this.executionFacade.updateProgress(execution, f);
    }

    /** Persists the job state and returns the execution as returned by the facade. */
    private Execution updateState(JobState jobState, Execution execution) {
        return this.executionFacade.updateState(execution, jobState);
    }

    /** Persists the final status and returns the execution as returned by the facade. */
    private Execution updateFinalStatus(JobFinalStatus jobFinalStatus, Execution execution) {
        return this.executionFacade.updateFinalStatus(execution, jobFinalStatus);
    }
}
