/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.dao.jobhistory.ExecutionFacade;
import io.hops.hopsworks.common.jobs.JobsMonitor;
import io.hops.hopsworks.common.jobs.execution.ExecutionUpdateController;
import io.hops.hopsworks.common.jobs.yarn.YarnExecutionFinalizer;
import io.hops.hopsworks.common.jobs.yarn.YarnMonitor;
import io.hops.hopsworks.common.util.PayaraClusterManager;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.yarn.YarnClientService;
import io.hops.hopsworks.common.yarn.YarnClientWrapper;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobFinalStatus;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;

@Startup
@Singleton
@DependsOn(value={"Settings"})
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class YarnJobsMonitor
implements JobsMonitor {
    private static final Logger LOGGER = Logger.getLogger(YarnJobsMonitor.class.getName());
    @EJB
    private Settings settings;
    @EJB
    private ExecutionFacade executionFacade;
    @EJB
    private ExecutionUpdateController executionUpdateController;
    @EJB
    private YarnExecutionFinalizer execFinalizer;
    @EJB
    private YarnClientService ycs;
    @EJB
    private PayaraClusterManager payaraClusterManager;
    @Resource
    private TimerService timerService;
    private Timer timer;
    private int maxStatusPollRetry;
    Map<String, YarnMonitor> monitors = new HashMap<String, YarnMonitor>();
    Map<String, Integer> failures = new HashMap<String, Integer>();
    private final Map<ApplicationId, Future<Execution>> copyLogsFutures = new HashMap<ApplicationId, Future<Execution>>();

    @PostConstruct
    public void init() {
        long intervalDuration = 5000L;
        this.timer = this.timerService.createIntervalTimer(0L, intervalDuration, new TimerConfig((Serializable)((Object)"Yarn job monitor timer"), false));
    }

    @PreDestroy
    public void destroy() {
        if (this.timer != null) {
            this.timer.cancel();
        }
    }

    @Timeout
    public synchronized void yarnJobMonitor(Timer timer) {
        if (!this.payaraClusterManager.amIThePrimary()) {
            return;
        }
        try {
            HashMap<String, Execution> executions = new HashMap<String, Execution>();
            List<Execution> execs = this.executionFacade.findNotFinished();
            if (execs != null && !execs.isEmpty()) {
                for (Execution exec : execs) {
                    if (exec.getAppId() == null) continue;
                    executions.put(exec.getAppId(), exec);
                }
                Iterator<Map.Entry<String, YarnMonitor>> monitorsIter = this.monitors.entrySet().iterator();
                while (monitorsIter.hasNext()) {
                    Map.Entry<String, YarnMonitor> entry = monitorsIter.next();
                    if (executions.keySet().contains(entry.getKey())) continue;
                    entry.getValue().close();
                    monitorsIter.remove();
                }
                this.maxStatusPollRetry = this.settings.getMaxStatusPollRetry();
                ArrayList toRemove = new ArrayList();
                for (Map.Entry entry : executions.entrySet()) {
                    Execution exec;
                    YarnMonitor monitor = this.monitors.get(entry.getKey());
                    if (monitor == null) {
                        ApplicationId appId = ApplicationId.fromString((String)((String)entry.getKey()));
                        YarnClientWrapper newYarnclientWrapper = this.ycs.getYarnClientSuper(this.settings.getConfiguration());
                        monitor = new YarnMonitor(appId, newYarnclientWrapper, this.ycs);
                        this.monitors.put((String)entry.getKey(), monitor);
                    }
                    if ((exec = this.internalMonitor((Execution)executions.get(entry.getKey()), monitor)) != null) continue;
                    toRemove.add(entry.getKey());
                    monitor.close();
                }
                for (String appID : toRemove) {
                    this.failures.remove(appID);
                    this.monitors.remove(appID);
                }
                this.copyLogsFutures.entrySet().removeIf(futureResult -> ((Future)futureResult.getValue()).isDone());
            }
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, "Error while monitoring jobs", ex);
        }
    }

    private Execution internalMonitor(Execution exec, YarnMonitor monitor) {
        try {
            YarnApplicationState appState = monitor.getApplicationState();
            FinalApplicationStatus finalAppStatus = monitor.getFinalApplicationStatus();
            float progress = monitor.getProgress();
            exec = this.updateProgress(progress, exec);
            exec = this.updateState(JobState.getJobState((YarnApplicationState)appState), exec);
            exec = this.updateFinalStatus(JobFinalStatus.getJobFinalStatus((FinalApplicationStatus)finalAppStatus), exec);
            if (!(appState != YarnApplicationState.FAILED && appState != YarnApplicationState.FINISHED && appState != YarnApplicationState.KILLED || this.copyLogsFutures.containsKey(monitor.getApplicationId()))) {
                exec = this.executionFacade.updateState(exec, JobState.AGGREGATING_LOGS);
                Future<Execution> futureResult = this.execFinalizer.copyLogs(exec);
                this.copyLogsFutures.put(monitor.getApplicationId(), futureResult);
                return null;
            }
        }
        catch (IOException | YarnException ex) {
            Integer failure = this.failures.get(exec.getAppId());
            if (failure == null) {
                failure = 1;
            } else {
                Integer n = failure;
                Integer n2 = failure = Integer.valueOf(failure + 1);
            }
            this.failures.put(exec.getAppId(), failure);
            LOGGER.log(Level.WARNING, "Failed to get application state for execution " + exec + ". Tried " + this.failures + " time(s).", ex);
        }
        if (this.failures.get(exec.getAppId()) != null && this.failures.get(exec.getAppId()) > this.maxStatusPollRetry) {
            try {
                LOGGER.log(Level.SEVERE, "Killing application, {0}, because unable to poll for status.", exec);
                monitor.cancelJob(monitor.getApplicationId().toString());
                exec = this.updateFinalStatus(JobFinalStatus.KILLED, exec);
                exec = this.updateProgress(0.0f, exec);
                this.execFinalizer.finalize(exec, JobState.KILLED);
            }
            catch (IOException | YarnException ex) {
                LOGGER.log(Level.SEVERE, "Failed to cancel execution, " + exec + " after failing to poll for status.", ex);
                this.execFinalizer.finalize(exec, JobState.FRAMEWORK_FAILURE);
            }
            return null;
        }
        return exec;
    }

    @Override
    public Execution updateProgress(float progress, Execution execution) {
        return this.executionUpdateController.updateProgress(progress, execution);
    }

    @Override
    public Execution updateState(JobState newState, Execution execution) {
        return this.executionUpdateController.updateState(newState, execution);
    }

    private Execution updateFinalStatus(JobFinalStatus finalStatus, Execution execution) {
        return this.executionUpdateController.updateFinalStatusAndSendAlert(finalStatus, execution);
    }
}

