/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import io.hops.hopsworks.common.jobs.flink.AbstractYarnClusterDescriptor;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.ApplicationClient;
import org.apache.flink.yarn.YarnMessages;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * {@link ClusterClient} for a Flink cluster that runs as a YARN application.
 *
 * <p>The client keeps the {@link ApplicationReport} it was created with, polls YARN for fresh
 * reports on a background thread, and talks to the cluster through Akka actors. A JVM shutdown
 * hook is registered on construction so that the client is shut down when the JVM exits.
 */
public class YarnClusterClient
extends ClusterClient {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
    // NOTE(review): presumably the pause between two application-report polls of the
    // PollingThread (which sleeps 1000 ms per iteration) -- confirm against the original source.
    private static final int POLLING_THREAD_INTERVAL_MS = 1000;
    // Not final: set to null at the end of shutdownCluster() after the client has been stopped.
    private YarnClient yarnClient;
    // Registered with the Runtime in the constructor, removed again by disconnect()/shutdownCluster().
    private Thread clientShutdownHook = new ClientShutdownHook();
    // Background thread that periodically fetches the YARN application report.
    private PollingThread pollingRunner;
    // Hadoop configuration taken from the YarnClient; used to obtain the FileSystem on shutdown.
    private final org.apache.hadoop.conf.Configuration hadoopConfig;
    // Directory that is deleted recursively by shutdownCluster(); may be null (then nothing is deleted).
    private final Path sessionFilesDir;
    private final AbstractYarnClusterDescriptor clusterDescriptor;
    // Lazily creates the "applicationClient" actor on first use.
    private final LazApplicationClientLoader applicationClient;
    // Timeout used for every Akka ask/await in this class.
    private final FiniteDuration akkaDuration;
    // Report handed to the constructor; appId and trackingURL are derived from it.
    private final ApplicationReport appReport;
    private final ApplicationId appId;
    private final String trackingURL;
    // Starts out true; only disconnect() sets it to false.
    private boolean isConnected = true;
    // True when this client created the cluster itself; decides between disconnect and shutdown.
    private final boolean newlyCreatedCluster;
    // Set by the first call to disconnect() or shutdownCluster(); later calls return immediately.
    private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);

    public YarnClusterClient(AbstractYarnClusterDescriptor clusterDescriptor, YarnClient yarnClient, ApplicationReport appReport, Configuration flinkConfig, Path sessionFilesDir, boolean newlyCreatedCluster) throws IOException, YarnException {
        super(flinkConfig);
        this.akkaDuration = AkkaUtils.getTimeout((Configuration)flinkConfig);
        this.clusterDescriptor = clusterDescriptor;
        this.yarnClient = yarnClient;
        this.hadoopConfig = yarnClient.getConfig();
        this.sessionFilesDir = sessionFilesDir;
        this.appReport = appReport;
        this.appId = appReport.getApplicationId();
        this.trackingURL = appReport.getTrackingUrl();
        this.newlyCreatedCluster = newlyCreatedCluster;
        this.applicationClient = new LazApplicationClientLoader(flinkConfig, this.actorSystemLoader);
        this.pollingRunner = new PollingThread(yarnClient, this.appId);
        this.pollingRunner.setDaemon(true);
        this.pollingRunner.start();
        Runtime.getRuntime().addShutdownHook(this.clientShutdownHook);
    }

    /**
     * Detaches this client from the cluster without stopping the cluster.
     *
     * <p>Only the first call to either this method or {@link #shutdownCluster()} has an effect.
     * The shutdown hook is unregistered and the polling thread is asked to stop (waiting at
     * most one second for it to finish).
     *
     * @throws IllegalStateException if the client is not connected
     */
    public void disconnect() {
        final boolean alreadyShutDown = hasBeenShutDown.getAndSet(true);
        if (alreadyShutDown) {
            return;
        }
        if (!isConnected) {
            throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
        }

        LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");

        try {
            Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
        } catch (IllegalStateException ignored) {
            // Thrown when the JVM is already shutting down; the hook cannot be removed then
            // and there is nothing left to do about it.
        }

        pollingRunner.stopRunner();
        try {
            pollingRunner.join(1000L);
        } catch (InterruptedException e) {
            LOG.warn("Shutdown of the polling runner was interrupted", e);
            Thread.currentThread().interrupt();
        }

        isConnected = false;
    }

    /**
     * Asks the JobManager to shut the cluster down once the given job has finished.
     *
     * <p>The request is sent through the JobManager gateway and this method blocks until the
     * reply future completes or the Akka timeout elapses.
     *
     * @param jobID id of the job after which the cluster should stop; must not be {@code null}
     * @throws NullPointerException if {@code jobID} is {@code null}
     * @throws RuntimeException if the request could not be sent or was not answered in time
     */
    public void stopAfterJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "The job id must not be null");
        try {
            Future<Object> replyFuture =
                getJobManagerGateway().ask(new ShutdownClusterAfterJob(jobID), akkaDuration);
            Await.ready(replyFuture, akkaDuration);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers even though it is wrapped below.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(
                "Unable to tell application master to stop once the specified job has been finished", e);
        }
    }

    /**
     * Returns the Flink configuration this client was created with.
     *
     * @return the configuration held by the {@code flinkConfig} field
     */
    public Configuration getFlinkConfiguration() {
        return flinkConfig;
    }

    /**
     * Computes the total number of slots as TaskManager count times slots per TaskManager.
     *
     * @return the product if it is positive, otherwise {@code -1}
     */
    public int getMaxSlots() {
        final int taskManagers = clusterDescriptor.getTaskManagerCount();
        final int slotsPerTaskManager = clusterDescriptor.getTaskManagerSlots();
        final int totalSlots = taskManagers * slotsPerTaskManager;
        if (totalSlots > 0) {
            return totalSlots;
        }
        return -1;
    }

    /**
     * Submits a job graph, either attached or detached depending on {@link #isDetached()}.
     *
     * <p>In detached mode on a cluster created by this client, the cluster is first told to
     * stop after the job via {@link #stopAfterJob(JobID)}.
     *
     * @param jobGraph the job to submit
     * @param classLoader class loader passed on to the superclass submission methods
     * @return the result returned by the superclass {@code run} / {@code runDetached}
     * @throws ProgramInvocationException if the superclass submission fails
     */
    protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (!isDetached()) {
            return super.run(jobGraph, classLoader);
        }

        if (newlyCreatedCluster) {
            stopAfterJob(jobGraph.getJobID());
        }
        return super.runDetached(jobGraph, classLoader);
    }

    /**
     * Returns the tracking URL of the YARN application as a URL with a scheme.
     *
     * <p>A tracking URL that already starts with {@code http://} or {@code https://} is
     * returned unchanged; previously an {@code https://} URL was turned into the invalid
     * {@code http://https://...}. Any other value gets {@code http://} prepended.
     *
     * @return the tracking URL including a scheme
     */
    public String getWebInterfaceURL() {
        if (trackingURL.startsWith("http://") || trackingURL.startsWith("https://")) {
            return trackingURL;
        }
        return "http://" + trackingURL;
    }

    /**
     * Builds a human-readable identifier containing the YARN application id.
     *
     * @return a string of the form {@code "Yarn cluster with application id <id>"}
     */
    public String getClusterIdentifier() {
        final ApplicationId reportedId = appReport.getApplicationId();
        return "Yarn cluster with application id " + reportedId;
    }

    /**
     * Requests the current cluster status from the JobManager and waits for the answer.
     *
     * @return the status response sent back by the JobManager
     * @throws IllegalStateException if the client is not connected or has been shut down
     * @throws RuntimeException if the request fails or is not answered within the Akka timeout
     */
    public GetClusterStatusResponse getClusterStatus() {
        if (!isConnected) {
            throw new IllegalStateException("The client is not connected to the cluster.");
        }
        if (hasBeenShutdown()) {
            throw new IllegalStateException("The cluster has already been shutdown.");
        }

        try {
            Future<Object> clusterStatusFuture =
                getJobManagerGateway().ask(GetClusterStatus.getInstance(), akkaDuration);
            return (GetClusterStatusResponse) Await.result(clusterStatusFuture, akkaDuration);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers even though it is wrapped below.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
        }
    }

    /**
     * Derives the application status from the most recent report fetched by the polling thread.
     *
     * <p>YARN states {@code FAILED} and {@code KILLED} map to {@link ApplicationStatus#FAILED};
     * every other state maps to {@link ApplicationStatus#SUCCEEDED}. When no report is
     * available yet, {@link ApplicationStatus#UNKNOWN} is returned.
     *
     * @return the derived application status
     * @throws IllegalStateException if the client is not connected
     */
    public ApplicationStatus getApplicationStatus() {
        if (!isConnected) {
            // The original message claimed the opposite of the condition being checked.
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }

        ApplicationReport lastReport = null;
        if (pollingRunner == null) {
            LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster.The system might be in an erroneous state");
        } else {
            lastReport = pollingRunner.getLastReport();
        }

        if (lastReport == null) {
            LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far.The system might be in an erroneous state");
            return ApplicationStatus.UNKNOWN;
        }

        final YarnApplicationState appState = lastReport.getYarnApplicationState();
        final boolean failed =
            appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED;
        if (failed) {
            LOG.warn("YARN reported application state {}", appState);
            LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
            return ApplicationStatus.FAILED;
        }
        return ApplicationStatus.SUCCEEDED;
    }

    /**
     * Drains the messages buffered by the local application client actor.
     *
     * <p>The actor is asked repeatedly until it answers with an empty {@code Option} or a
     * request fails. Each {@link InfoMessage} is rendered as {@code "[<date>] <message>"};
     * payloads of any other type are logged and skipped.
     *
     * @return the messages received so far, possibly empty, never {@code null}
     * @throws IllegalStateException if the client is not connected
     * @throws RuntimeException if the client has been shut down, or if the actor answers with
     *     something that is not an {@code Option}
     */
    public List<String> getNewMessages() {
        if (hasBeenShutdown()) {
            throw new RuntimeException("The YarnClusterClient has already been stopped");
        }
        if (!isConnected) {
            // The original message claimed the opposite of the condition being checked.
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }

        final List<String> messages = new ArrayList<String>();
        while (true) {
            final Object result;
            try {
                Future<Object> response = Patterns.ask(
                    applicationClient.get(),
                    YarnMessages.getLocalGetYarnMessage(),
                    new Timeout(akkaDuration));
                result = Await.result(response, akkaDuration);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not lose the interruption; the caller decides how to react to it.
                    Thread.currentThread().interrupt();
                }
                LOG.warn("Error retrieving the YARN messages locally", e);
                break;
            }

            if (!(result instanceof Option)) {
                // A null reply also ends up here; avoid an NPE while building the message.
                final String actualType = result == null ? "null" : result.getClass().toString();
                throw new RuntimeException("LocalGetYarnMessage requires a response of type Option. Instead the response is of type " + actualType + ".");
            }

            final Option<?> messageOption = (Option<?>) result;
            LOG.debug("Received message option {}", messageOption);
            if (messageOption.isEmpty()) {
                break;
            }

            final Object payload = messageOption.get();
            if (payload instanceof InfoMessage) {
                final InfoMessage msg = (InfoMessage) payload;
                messages.add("[" + msg.date() + "] " + msg.message());
            } else {
                LOG.warn("LocalGetYarnMessage returned unexpected type: {}", messageOption);
            }
        }
        return messages;
    }

    /**
     * Ends this client's use of the cluster.
     *
     * <p>A cluster that was created by this client and is not run in detached mode is shut
     * down; in every other case the client merely disconnects.
     */
    public void finalizeCluster() {
        final boolean shutDownRequired = !isDetached() && newlyCreatedCluster;
        if (shutDownRequired) {
            shutdownCluster();
            return;
        }
        disconnect();
    }

    /**
     * Shuts the YARN cluster down and releases all client-side resources.
     *
     * <p>Only the first call to either this method or {@link #disconnect()} has an effect.
     * The steps are, in order: unregister the shutdown hook, ask the application master to
     * stop, delete the local YARN properties file, delete the session files, stop the polling
     * thread, log the final application report and stop the {@link YarnClient}. Every step
     * except the last is best effort: failures are logged and the shutdown continues.
     *
     * @throws IllegalStateException if the client is not connected
     */
    public void shutdownCluster() {
        if (hasBeenShutDown.getAndSet(true)) {
            return;
        }
        if (!isConnected) {
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }

        try {
            Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
        } catch (IllegalStateException ignored) {
            // Thrown when the JVM is already shutting down (e.g. we are running inside the
            // hook itself); the hook cannot be removed then and that is fine.
        }

        requestApplicationMasterShutdown();
        deleteYarnPropertiesFile();
        deleteSessionFiles();
        stopPollingRunner();
        logFinalApplicationReport();

        LOG.info("YARN Client is shutting down");
        yarnClient.stop();
        yarnClient = null;
    }

    /** Sends the stop request to the application master and waits for the reply (best effort). */
    private void requestApplicationMasterShutdown() {
        LOG.info("Sending shutdown request to the Application Master");
        try {
            Future<Object> response = Patterns.ask(
                applicationClient.get(),
                new YarnMessages.LocalStopYarnSession(getApplicationStatus(), "Flink YARN Client requested shutdown"),
                new Timeout(akkaDuration));
            Await.ready(response, akkaDuration);
        } catch (Exception e) {
            LOG.warn("Error while stopping YARN cluster.", e);
        }
    }

    /** Deletes the local YARN properties file if it exists (best effort). */
    private void deleteYarnPropertiesFile() {
        try {
            File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig);
            if (!propertiesFile.isFile()) {
                return;
            }
            final String location = propertiesFile.getAbsoluteFile().toString();
            if (propertiesFile.delete()) {
                LOG.info("Deleted Yarn properties file at {}", location);
            } else {
                LOG.warn("Couldn't delete Yarn properties file at {}", location);
            }
        } catch (Exception e) {
            LOG.warn("Exception while deleting the JobManager address file", e);
        }
    }

    /** Recursively deletes the session file directory, if one was configured (best effort). */
    private void deleteSessionFiles() {
        if (sessionFilesDir == null) {
            LOG.warn("Session file directory not set. Not deleting session files");
            return;
        }

        LOG.info("Deleting files in {}", sessionFilesDir);
        // FileSystem.get() hands out a JVM-wide cached instance; closing it would break every
        // other user of that instance. Use a private instance instead, and let
        // try-with-resources close it even when delete() fails.
        try (FileSystem sessionFs = FileSystem.newInstance(hadoopConfig)) {
            sessionFs.delete(sessionFilesDir, true);
        } catch (IOException e) {
            LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
        }
    }

    /** Asks the polling thread to stop and waits at most one second for it to finish. */
    private void stopPollingRunner() {
        pollingRunner.stopRunner();
        try {
            pollingRunner.join(1000L);
        } catch (InterruptedException e) {
            LOG.warn("Shutdown of the polling runner was interrupted", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Fetches and logs the final application report, with a hint on how to get the logs. */
    private void logFinalApplicationReport() {
        try {
            ApplicationReport finalReport = yarnClient.getApplicationReport(appId);
            final YarnApplicationState finalState = finalReport.getYarnApplicationState();

            LOG.info("Application " + appId + " finished with state " + finalState
                + " and final state " + finalReport.getFinalApplicationStatus()
                + " at " + finalReport.getFinishTime());

            if (finalState == YarnApplicationState.FAILED || finalState == YarnApplicationState.KILLED) {
                LOG.warn("Application failed. Diagnostics " + finalReport.getDiagnostics());
                // The yarn CLI selects the application with -applicationId; the previously
                // printed "-appReport" is not a valid option.
                LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:\n\tyarn logs -applicationId " + finalReport.getApplicationId() + "\n(It sometimes takes a few seconds until the logs are aggregated)");
            }
        } catch (Exception e) {
            LOG.warn("Couldn't get final report", e);
        }
    }

    /**
     * Tells whether {@link #disconnect()} or {@link #shutdownCluster()} has been called.
     *
     * @return the current value of the shut-down flag
     */
    public boolean hasBeenShutdown() {
        return hasBeenShutDown.get();
    }

    /**
     * Tells whether jobs are submitted in detached mode.
     *
     * @return {@code true} if either the superclass or the cluster descriptor reports
     *     detached mode
     */
    public boolean isDetached() {
        if (super.isDetached()) {
            return true;
        }
        return clusterDescriptor.isDetachedMode();
    }

    /**
     * Blocks until the number of registered TaskManagers reaches the count requested by the
     * cluster descriptor.
     *
     * <p>The cluster status is polled every 250 ms; progress is reported through
     * {@code logAndSysout} whenever the status changes.
     *
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *     restored before the exception is thrown
     */
    public void waitForClusterToBeReady() {
        logAndSysout("Waiting until all TaskManagers have connected");

        GetClusterStatusResponse lastStatus = null;
        while (true) {
            final GetClusterStatusResponse currentStatus = getClusterStatus();
            if (currentStatus != null && !currentStatus.equals(lastStatus)) {
                logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" + clusterDescriptor.getTaskManagerCount() + ")");
                if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
                    break;
                }
            } else if (lastStatus == null) {
                logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
            }

            try {
                Thread.sleep(250L);
            } catch (InterruptedException e) {
                // Restore the flag so that code further up the stack can still observe the
                // interruption after the checked exception has been wrapped.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
            }
            lastStatus = currentStatus;
        }

        logAndSysout("All TaskManagers are connected");
    }

    /**
     * Returns the id of the YARN application this client belongs to.
     *
     * @return the application id taken from the report passed to the constructor
     */
    public ApplicationId getApplicationId() {
        return appId;
    }

    /**
     * Creates the {@code applicationClient} actor on first access and hands out the same
     * reference on every later call.
     */
    private static class LazApplicationClientLoader {
        private final Configuration flinkConfig;
        private final ClusterClient.LazyActorSystemLoader actorSystemLoader;
        // Stays null until get() is called for the first time.
        private ActorRef applicationClient;

        private LazApplicationClientLoader(Configuration flinkConfig, ClusterClient.LazyActorSystemLoader actorSystemLoader) {
            this.flinkConfig = flinkConfig;
            this.actorSystemLoader = actorSystemLoader;
        }

        /**
         * Returns the application client actor, starting it if that has not happened yet.
         *
         * @return reference to the application client actor
         * @throws RuntimeException if the leader retrieval service cannot be created
         */
        public ActorRef get() {
            if (applicationClient != null) {
                return applicationClient;
            }

            final LeaderRetrievalService leaderRetrievalService;
            try {
                leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
            } catch (Exception e) {
                throw new RuntimeException("Could not create the leader retrieval service.", e);
            }

            LOG.info("Start application client.");
            final Props clientProps =
                Props.create(ApplicationClient.class, flinkConfig, leaderRetrievalService);
            applicationClient = actorSystemLoader.get().actorOf(clientProps, "applicationClient");
            return applicationClient;
        }
    }

    private static class PollingThread
    extends Thread {
        AtomicBoolean running = new AtomicBoolean(true);
        private YarnClient yarnClient;
        private ApplicationId appId;
        private final Object lock = new Object();
        private ApplicationReport lastReport;

        public PollingThread(YarnClient yarnClient, ApplicationId appId) {
            this.yarnClient = yarnClient;
            this.appId = appId;
        }

        public void stopRunner() {
            if (!this.running.get()) {
                LOG.warn("Polling thread was already stopped");
            }
            this.running.set(false);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ApplicationReport getLastReport() {
            Object object = this.lock;
            synchronized (object) {
                return this.lastReport;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (this.running.get() && this.yarnClient.isInState(Service.STATE.STARTED)) {
                try {
                    ApplicationReport report = this.yarnClient.getApplicationReport(this.appId);
                    Object object = this.lock;
                    synchronized (object) {
                        this.lastReport = report;
                    }
                }
                catch (Exception e) {
                    LOG.warn("Error while getting application report", (Throwable)e);
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    LOG.error("Polling thread got interrupted", (Throwable)e);
                    Thread.currentThread().interrupt();
                    this.stopRunner();
                }
            }
            if (this.running.get() && !this.yarnClient.isInState(Service.STATE.STARTED)) {
                LOG.warn("YARN client is unexpected in state " + this.yarnClient.getServiceState());
            }
        }
    }

    /**
     * JVM shutdown hook that shuts down the enclosing {@link YarnClusterClient}.
     *
     * <p>It has to be an inner (non-static) class because it calls {@code shutdown()} on the
     * enclosing client instance.
     */
    private class ClientShutdownHook extends Thread {

        @Override
        public void run() {
            LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
            YarnClusterClient.this.shutdown();
        }
    }
}

