/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.jobs.flink;

import io.hops.hopsworks.common.dao.project.Project;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.configuration.JobType;
import io.hops.hopsworks.common.jobs.flink.YarnClusterClient;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.util.HopsUtils;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractYarnClusterDescriptor
implements ClusterDescriptor<YarnClusterClient> {
    // Class logger. NOTE(review): it is registered under YarnClusterDescriptor, not this class — confirm that is intended.
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
    // Name of the Flink configuration file the constructor looks for inside the configuration directory.
    private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
    // Minimum JobManager / TaskManager memory in MB; the memory setters reject anything below 768.
    private static final int MIN_JM_MEMORY = 768;
    private static final int MIN_TM_MEMORY = 768;
    // Hadoop configuration passed to the YARN client and the Hadoop file system.
    // Starts as null; nothing in the visible part of this file assigns it — NOTE(review): confirm where it is set.
    private org.apache.hadoop.conf.Configuration conf = null;
    // "<home directory>/.flink/<application id>/", assigned by deployInternal().
    private Path sessionFilesDir;
    // TaskManager slots; stays -1 until setTaskManagerSlots() is called.
    private int slots = -1;
    // Requested container memory in MB (default 1024 each).
    private int jobManagerMemoryMb = 1024;
    private int taskManagerMemoryMb = 1024;
    // Number of TaskManagers to request (default 1).
    private int taskManagerCount = 1;
    // YARN queue to submit to; null means no queue is set on the submission context.
    private String yarnQueue;
    // Directory searched for flink-conf.yaml, logback.xml and log4j.properties.
    private String configurationDirectory;
    // Location of the Flink configuration file that is uploaded as a local resource.
    private Path flinkConfigurationPath;
    // Location of the Flink jar that is uploaded as "flink.jar".
    private Path flinkJarPath;
    // Dynamic properties as "key=value" pairs separated by "@@".
    private String dynamicPropertiesEncoded;
    // Additional files to upload; addShipFiles() filters out the flink-dist jar.
    protected List<File> shipFiles = new LinkedList<File>();
    // Flink configuration; receives the dynamic properties and the JobManager address during deployment.
    private Configuration flinkConfiguration;
    // Whether the session is started in detached mode.
    private boolean detached;
    // Application name override; when null deployInternal() generates a name.
    private String customName;
    // ZooKeeper namespace; when null or empty deployInternal() derives one from the configuration or application id.
    private String zookeeperNamespace;
    // Hopsworks-specific state. hopsworksResources is merged into the application master's
    // local resources and classpath by deployInternal().
    List<Path> hopsLocalResources = new ArrayList<Path>();
    List<String> hopsworksParams = new ArrayList<String>();
    Map<String, LocalResource> hopsworksResources = new HashMap<String, LocalResource>();
    // Objects needed to materialize user certificates; deploy() refuses to run while any of them is null.
    private AsynchronousJobExecutor services;
    private Project project;
    private String username;
    private List<String> javaOptions;

    /**
     * Gives access to the paths stored in {@code hopsLocalResources}.
     *
     * @return the list held by this descriptor (the live list, not a copy)
     */
    public List<Path> getHopsLocalResources() {
        return hopsLocalResources;
    }

    /**
     * Replaces the list stored in {@code hopsLocalResources}.
     *
     * @param hopsLocalResources the new list; it is stored as-is, without copying
     */
    public void setHopsLocalResources(List<Path> hopsLocalResources) {
        this.hopsLocalResources = hopsLocalResources;
    }

    /**
     * Gives access to the Hopsworks parameters collected so far.
     *
     * @return the list held by this descriptor (the live list, not a copy)
     */
    public List<String> getHopsworksParams() {
        return hopsworksParams;
    }

    /**
     * Replaces the list of Hopsworks parameters.
     *
     * @param hopsworksParams the new list; it is stored as-is, without copying
     */
    public void setHopsworksParams(List<String> hopsworksParams) {
        this.hopsworksParams = hopsworksParams;
    }

    /**
     * Appends one entry to the list of Hopsworks parameters.
     *
     * @param hopsworksParam the parameter to append
     */
    public void addHopsworksParam(String hopsworksParam) {
        hopsworksParams.add(hopsworksParam);
    }

    /**
     * Gives access to the Hopsworks local resources, keyed by resource name.
     *
     * @return the map held by this descriptor (the live map, not a copy)
     */
    public Map<String, LocalResource> getHopsworksResources() {
        return hopsworksResources;
    }

    /**
     * Replaces the map of Hopsworks local resources.
     *
     * @param hopsworksResources the new map; it is stored as-is, without copying
     */
    public void setHopsworksResources(Map<String, LocalResource> hopsworksResources) {
        this.hopsworksResources = hopsworksResources;
    }

    /**
     * Registers one Hopsworks local resource, replacing any entry already stored under the same name.
     *
     * @param name     key under which the resource is stored
     * @param resource the resource to store
     */
    public void addHopsworksResource(String name, LocalResource resource) {
        hopsworksResources.put(name, resource);
    }

    /**
     * Creates a descriptor and tries to pre-load the Flink configuration from the environment.
     *
     * <p>When the {@code IN_TESTS} environment variable is set, {@code yarn-site.xml} from
     * {@code YARN_CONF_DIR} is added to the Hadoop configuration; any failure there is rethrown as a
     * {@link RuntimeException}.
     *
     * <p>Loading the Flink configuration is best-effort: if the configuration directory or
     * {@code flink-conf.yaml} cannot be found, the problem is only logged at debug level and the
     * corresponding fields keep whatever value they had when the failure occurred.
     */
    public AbstractYarnClusterDescriptor() {
        if (System.getenv("IN_TESTS") != null) {
            try {
                // The field initializer leaves conf null, so without this guard the call below
                // could only ever fail with a NullPointerException.
                if (this.conf == null) {
                    this.conf = new org.apache.hadoop.conf.Configuration();
                }
                this.conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
            }
            catch (Throwable t) {
                throw new RuntimeException("Error", t);
            }
        }
        try {
            this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
            GlobalConfiguration.loadConfiguration(this.configurationDirectory);
            this.flinkConfiguration = GlobalConfiguration.getConfiguration();
            File confFile = new File(this.configurationDirectory + File.separator + CONFIG_FILE_NAME);
            if (!confFile.exists()) {
                throw new RuntimeException("Unable to locate configuration file in " + confFile);
            }
            this.flinkConfigurationPath = new Path(confFile.getAbsolutePath());
        }
        catch (Exception e) {
            // Deliberately non-fatal: callers can still supply everything through the setters.
            // The exception is attached so the reason is visible when debug logging is enabled.
            LOG.debug("Config couldn't be loaded from environment variable.", e);
        }
    }

    /**
     * Supplies a class chosen by the concrete descriptor.
     *
     * <p>NOTE(review): no caller is visible in this part of the file; judging by the name this is
     * presumably the YARN application master entry point — confirm against the subclasses.
     *
     * @return the class provided by the subclass
     */
    protected abstract Class<?> getApplicationMasterClass();

    /**
     * Sets the memory requested for the JobManager container.
     *
     * @param memoryMb requested memory in MB; must be at least 768
     * @throws IllegalArgumentException if {@code memoryMb} is below 768
     */
    public void setJobManagerMemory(int memoryMb) {
        final boolean largeEnough = memoryMb >= 768;
        if (largeEnough) {
            jobManagerMemoryMb = memoryMb;
            return;
        }
        throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount of " + 768 + " MB");
    }

    /**
     * Sets the memory requested for each TaskManager container.
     *
     * @param memoryMb requested memory in MB; must be at least 768
     * @throws IllegalArgumentException if {@code memoryMb} is below 768
     */
    public void setTaskManagerMemory(int memoryMb) {
        final boolean largeEnough = memoryMb >= 768;
        if (largeEnough) {
            taskManagerMemoryMb = memoryMb;
            return;
        }
        throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount of " + 768 + " MB");
    }

    /**
     * Replaces the Flink configuration used by this descriptor.
     *
     * @param conf the Flink configuration to use; note that this parameter shadows the Hadoop
     *             {@code conf} field, which is not touched here
     */
    public void setFlinkConfiguration(Configuration conf) {
        flinkConfiguration = conf;
    }

    /**
     * Returns the Flink configuration currently held by this descriptor.
     *
     * @return the Flink configuration, or {@code null} if none was loaded or set
     */
    public Configuration getFlinkConfiguration() {
        return flinkConfiguration;
    }

    /**
     * Sets the number of slots per TaskManager.
     *
     * @param slots number of slots; must be greater than zero
     * @throws IllegalArgumentException if {@code slots} is zero or negative
     */
    public void setTaskManagerSlots(int slots) {
        if (slots > 0) {
            this.slots = slots;
            return;
        }
        throw new IllegalArgumentException("Number of TaskManager slots must be positive");
    }

    /**
     * Returns the configured number of slots per TaskManager.
     *
     * @return the slot count, or -1 if {@code setTaskManagerSlots} was never called
     */
    public int getTaskManagerSlots() {
        return slots;
    }

    /**
     * Selects the YARN queue the application is submitted to.
     *
     * @param queue queue name; {@code null} leaves the queue of the submission context untouched
     */
    public void setQueue(String queue) {
        yarnQueue = queue;
    }

    /**
     * Sets the location of the Flink jar to upload.
     *
     * @param localJarPath path whose textual form must end with "jar"
     * @throws IllegalArgumentException if the path does not end with "jar"
     */
    public void setLocalJarPath(Path localJarPath) {
        final String pathText = localJarPath.toString();
        if (pathText.endsWith("jar")) {
            flinkJarPath = localJarPath;
            return;
        }
        throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    }

    /**
     * Sets the location of the Flink configuration file to upload.
     *
     * @param confPath path of the configuration file
     */
    public void setConfigurationFilePath(Path confPath) {
        flinkConfigurationPath = confPath;
    }

    /**
     * Sets the directory that is searched for logging configuration files during deployment.
     *
     * @param configurationDirectory path of the configuration directory
     */
    public void setConfigurationDirectory(String configurationDirectory) {
        this.configurationDirectory = configurationDirectory;
    }

    /**
     * Sets how many TaskManagers are requested.
     *
     * @param tmCount number of TaskManagers; must be at least 1
     * @throws IllegalArgumentException if {@code tmCount} is below 1
     */
    public void setTaskManagerCount(int tmCount) {
        if (tmCount >= 1) {
            taskManagerCount = tmCount;
            return;
        }
        throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
    }

    /**
     * Returns how many TaskManagers are requested.
     *
     * @return the TaskManager count (1 unless changed)
     */
    public int getTaskManagerCount() {
        return taskManagerCount;
    }

    /**
     * Adds files to the set that is uploaded with the application.
     *
     * <p>Files whose name starts with "flink-dist" and ends with "jar" are left out.
     *
     * @param shipFiles files to add
     */
    public void addShipFiles(List<File> shipFiles) {
        for (File candidate : shipFiles) {
            final String fileName = candidate.getName();
            final boolean isFlinkDistJar = fileName.startsWith("flink-dist") && fileName.endsWith("jar");
            if (!isFlinkDistJar) {
                this.shipFiles.add(candidate);
            }
        }
    }

    /**
     * Sets the encoded dynamic properties.
     *
     * @param dynamicPropertiesEncoded the encoded properties string, stored unchanged
     */
    public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
        this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
    }

    /**
     * Returns the encoded dynamic properties.
     *
     * @return the encoded properties string, possibly {@code null}
     */
    public String getDynamicPropertiesEncoded() {
        return dynamicPropertiesEncoded;
    }

    /**
     * Verifies that everything required for a deployment has been set.
     *
     * <p>The checks run in a fixed order and the first one that fails determines the message.
     * A missing {@code HADOOP_CONF_DIR}/{@code YARN_CONF_DIR} only produces a warning.
     *
     * @throws YarnDeploymentException if a required setting is missing or invalid
     */
    private void isReadyForDeployment() throws YarnDeploymentException {
        final String problem;
        if (taskManagerCount <= 0) {
            problem = "Taskmanager count must be positive";
        } else if (flinkJarPath == null) {
            problem = "The Flink jar path is null";
        } else if (configurationDirectory == null) {
            problem = "Configuration directory not set";
        } else if (flinkConfigurationPath == null) {
            problem = "Configuration path not set";
        } else if (flinkConfiguration == null) {
            problem = "Flink configuration object has not been set";
        } else {
            problem = null;
        }
        if (problem != null) {
            throw new YarnDeploymentException(problem);
        }
        final boolean hadoopConfDirMissing = System.getenv("HADOOP_CONF_DIR") == null;
        if (hadoopConfDirMissing && System.getenv("YARN_CONF_DIR") == null) {
            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
        }
    }

    /**
     * Reserves {@code toAllocate} on the first entry that is large enough.
     *
     * @param nodeManagers free amount per node manager; the chosen entry is reduced in place
     * @param toAllocate   amount to reserve
     * @return {@code true} if an entry could hold the amount, {@code false} if none could
     *         (in which case the array is left unchanged)
     */
    private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
        int slot = 0;
        while (slot < nodeManagers.length) {
            if (nodeManagers[slot] >= toAllocate) {
                nodeManagers[slot] -= toAllocate;
                return true;
            }
            slot++;
        }
        return false;
    }

    /**
     * Switches detached mode on or off.
     *
     * @param detachedMode {@code true} to start the session in detached mode
     */
    public void setDetachedMode(boolean detachedMode) {
        detached = detachedMode;
    }

    /**
     * Tells whether detached mode is enabled.
     *
     * @return {@code true} if the session is started in detached mode
     */
    public boolean isDetachedMode() {
        return detached;
    }

    /**
     * Returns the ZooKeeper namespace.
     *
     * @return the namespace, or {@code null} if none has been set yet
     */
    public String getZookeeperNamespace() {
        return zookeeperNamespace;
    }

    /**
     * Sets the ZooKeeper namespace.
     *
     * @param zookeeperNamespace the namespace to use
     */
    public void setZookeeperNamespace(String zookeeperNamespace) {
        this.zookeeperNamespace = zookeeperNamespace;
    }

    /**
     * Creates a new YARN client, initializes it with the Hadoop configuration of this descriptor
     * and starts it.
     *
     * @return the started client; the caller is responsible for stopping it
     */
    protected YarnClient getYarnClient() {
        final YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        return client;
    }

    /**
     * Connects to an already running Flink session on YARN.
     *
     * <p>Looks up the application report for the given id, writes the reported JobManager host and
     * RPC port into the Flink configuration and creates a cluster client from it.
     *
     * @param applicationID textual YARN application id
     * @return a client for the running cluster
     * @throws RuntimeException if the id cannot be parsed, the application has already finished,
     *                          or YARN cannot be queried; the original failure is attached as cause
     */
    public YarnClusterClient retrieve(String applicationID) {
        YarnClient yarnClient = null;
        try {
            if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
                LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
            }
            ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
            yarnClient = this.getYarnClient();
            ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
            // Any final status other than UNDEFINED means the application has already terminated.
            if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", (Object)applicationID, (Object)appReport.getFinalApplicationStatus());
                throw new RuntimeException("The Yarn application " + applicationID + " doesn't run anymore.");
            }
            LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'", new Object[]{appReport.getHost(), appReport.getRpcPort(), applicationID});
            this.flinkConfiguration.setString("jobmanager.rpc.address", appReport.getHost());
            this.flinkConfiguration.setInteger("jobmanager.rpc.port", appReport.getRpcPort());
            return this.createYarnClusterClient(this, yarnClient, appReport, this.flinkConfiguration, this.sessionFilesDir, false);
        }
        catch (Exception e) {
            // The client was started by getYarnClient(); nobody else holds a reference to it when
            // retrieval fails, so stop it here instead of leaking it.
            if (yarnClient != null) {
                try {
                    yarnClient.stop();
                }
                catch (Exception stopFailure) {
                    LOG.debug("Error while stopping the YARN client", (Throwable)stopFailure);
                }
            }
            throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
        }
    }

    /**
     * Stores the objects used to materialize user certificates during deployment.
     *
     * <p>{@code deploy()} refuses to run while any of the four is {@code null}.
     *
     * @param services    executor giving access to settings and the certificate materializer
     * @param project     project the job belongs to
     * @param username    name of the user the certificates are copied for
     * @param javaOptions list that receives the generated {@code -D} options during deployment
     */
    public void setCertsObjects(AsynchronousJobExecutor services, Project project, String username, List<String> javaOptions) {
        this.javaOptions = javaOptions;
        this.username = username;
        this.project = project;
        this.services = services;
    }

    /**
     * Deploys the cluster after checking that the certificate objects have been provided.
     *
     * @return a client for the started cluster
     * @throws RuntimeException wrapping any failure, including the missing-objects check
     */
    public YarnClusterClient deploy() {
        try {
            final boolean certObjectsMissing = services == null
                    || project == null
                    || username == null
                    || javaOptions == null;
            if (certObjectsMissing) {
                throw new InvalidObjectException("Necessary objects for materializing user certificates are null");
            }
            return deployInternal();
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't deploy Yarn cluster", e);
        }
    }

    /**
     * Submits the Flink application master to YARN and blocks until the application is running.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>validate the descriptor and warn if the requested queue is unknown;</li>
     *   <li>request a new YARN application and, unless Hops RPC TLS is enabled, copy the project
     *       user's certificates; the resulting system properties are added to the Java options,
     *       the Hopsworks parameters and the encoded dynamic properties;</li>
     *   <li>apply the dynamic properties to the Flink configuration;</li>
     *   <li>compare the requested memory with the cluster limits (hard failure when a single
     *       container exceeds the maximum capability, warnings otherwise);</li>
     *   <li>upload ship files, the Flink jar and the configuration file, and build the application
     *       master's local resources, classpath and environment;</li>
     *   <li>submit the application and poll its state every 250 ms until it is running.</li>
     * </ol>
     *
     * @return a client for the started cluster
     * @throws YarnDeploymentException if the descriptor is incomplete, a container request exceeds
     *                                 the cluster maximum, or the application fails, finishes or
     *                                 is killed while deploying
     * @throws Exception               any other failure from YARN, the file system or while waiting
     */
    protected YarnClusterClient deployInternal() throws Exception {
        this.isReadyForDeployment();
        LOG.info("Using values:");
        LOG.info("\tTaskManager count = {}", (Object)this.taskManagerCount);
        LOG.info("\tJobManager memory = {}", (Object)this.jobManagerMemoryMb);
        LOG.info("\tTaskManager memory = {}", (Object)this.taskManagerMemoryMb);

        YarnClient yarnClient = this.getYarnClient();

        // Queue check is advisory only: problems are logged and never abort the deployment.
        try {
            List<QueueInfo> queues = yarnClient.getAllQueues();
            if (queues.size() > 0 && this.yarnQueue != null) {
                boolean queueFound = false;
                for (QueueInfo queue : queues) {
                    if (queue.getQueueName().equals(this.yarnQueue)) {
                        queueFound = true;
                        break;
                    }
                }
                if (!queueFound) {
                    StringBuilder queueNames = new StringBuilder();
                    for (QueueInfo queue : queues) {
                        queueNames.append(queue.getQueueName()).append(", ");
                    }
                    LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. Available queues: " + queueNames);
                }
            } else {
                LOG.debug("The YARN cluster does not have any queues configured");
            }
        }
        catch (Throwable e) {
            LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", e);
            }
        }

        YarnClientApplication yarnApplication = yarnClient.createApplication();
        GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        // Certificate materialization fills jobSystemProperties with the properties the job needs.
        HashMap<String, String> jobSystemProperties = new HashMap<String, String>(3);
        if (!this.services.getSettings().getHopsRpcTls()) {
            HopsUtils.copyProjectUserCerts(this.project, this.username, this.services.getSettings().getHopsworksTmpCertDir(), this.services.getSettings().getHdfsTmpCertDir(), JobType.FLINK, null, null, jobSystemProperties, this.services.getSettings().getFlinkKafkaCertDir(), appResponse.getApplicationId().toString(), this.services.getCertificateMaterializer(), this.services.getSettings().getHopsRpcTls());
        }
        final String propertySeparator = "@@";
        StringBuilder encodedSystemProperties = new StringBuilder();
        for (Map.Entry<String, String> property : jobSystemProperties.entrySet()) {
            String javaOption = YarnRunner.escapeForShell("-D" + property.getKey() + "=" + property.getValue());
            this.javaOptions.add(javaOption);
            this.addHopsworksParam(javaOption);
            encodedSystemProperties.append(property.getKey()).append("=").append(property.getValue()).append(propertySeparator);
        }
        // Append the new pairs without corrupting what is already there: a null field must not
        // become the text "null", and an existing value that lacks a trailing separator must not
        // fuse its last pair with the first appended one.
        if (encodedSystemProperties.length() > 0) {
            StringBuilder merged = new StringBuilder();
            if (this.dynamicPropertiesEncoded != null && !this.dynamicPropertiesEncoded.isEmpty()) {
                merged.append(this.dynamicPropertiesEncoded);
                if (!this.dynamicPropertiesEncoded.endsWith(propertySeparator)) {
                    merged.append(propertySeparator);
                }
            }
            this.dynamicPropertiesEncoded = merged.append(encodedSystemProperties).toString();
        }
        Map<String, String> dynProperties = FlinkYarnSessionCli.getDynamicProperties(this.dynamicPropertiesEncoded);
        for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
            this.flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
        }

        try {
            FileSystem.setDefaultScheme(this.flinkConfiguration);
        }
        catch (IOException e) {
            throw new IOException("Error while setting the default filesystem scheme from configuration.", e);
        }

        org.apache.hadoop.fs.FileSystem fileSystem = org.apache.hadoop.fs.FileSystem.get(this.conf);
        if (!fileSystem.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && fileSystem.getScheme().startsWith("file")) {
            LOG.warn("The file system scheme is '" + fileSystem.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. The Flink YARN client needs to store its files in a distributed file system");
        }

        // Requests below the scheduler minimum are raised to it after warning.
        int yarnMinAllocationMb = this.conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
        if (this.jobManagerMemoryMb < yarnMinAllocationMb || this.taskManagerMemoryMb < yarnMinAllocationMb) {
            LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is " + yarnMinAllocationMb + "'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.");
        }
        if (this.jobManagerMemoryMb < yarnMinAllocationMb) {
            this.jobManagerMemoryMb = yarnMinAllocationMb;
        }
        if (this.taskManagerMemoryMb < yarnMinAllocationMb) {
            this.taskManagerMemoryMb = yarnMinAllocationMb;
        }

        // A single container larger than the cluster maximum can never be granted: abort.
        Resource maxRes = appResponse.getMaximumResourceCapability();
        final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
        if (this.jobManagerMemoryMb > maxRes.getMemory()) {
            this.failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maxRes.getMemory() + "MB Requested: " + this.jobManagerMemoryMb + "MB. " + note);
        }
        if (this.taskManagerMemoryMb > maxRes.getMemory()) {
            this.failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maxRes.getMemory() + " Requested: " + this.taskManagerMemoryMb + "MB. " + note);
        }

        // Everything below is a warning only: the resources may become available later.
        final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.";
        int totalMemoryRequired = this.jobManagerMemoryMb + this.taskManagerMemoryMb * this.taskManagerCount;
        ClusterResourceDescription freeClusterMem = this.getCurrentFreeClusterResources(yarnClient);
        if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
            LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
        }
        if (this.taskManagerMemoryMb > freeClusterMem.containerLimit) {
            LOG.warn("The requested amount of memory for the TaskManagers (" + this.taskManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
        }
        if (this.jobManagerMemoryMb > freeClusterMem.containerLimit) {
            LOG.warn("The requested amount of memory for the JobManager (" + this.jobManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
        }

        // Simulate the allocation on a copy of the per-node free memory.
        int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
        if (!AbstractYarnClusterDescriptor.allocateResource(nmFree, this.jobManagerMemoryMb)) {
            LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. The JobManager requires " + this.jobManagerMemoryMb + "MB. NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
        }
        for (int i = 0; i < this.taskManagerCount; ++i) {
            if (!AbstractYarnClusterDescriptor.allocateResource(nmFree, this.taskManagerMemoryMb)) {
                LOG.warn("There is not enough memory available in the YARN cluster. The TaskManager(s) require " + this.taskManagerMemoryMb + "MB each. NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\nAfter allocating the JobManager (" + this.jobManagerMemoryMb + "MB) and (" + i + "/" + this.taskManagerCount + ") TaskManagers, the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
            }
        }

        // Collect the files to upload: user ship files plus any logging configuration found.
        HashSet<File> effectiveShipFiles = new HashSet<File>(this.shipFiles.size());
        for (File shipFile : this.shipFiles) {
            effectiveShipFiles.add(shipFile.getAbsoluteFile());
        }
        File logbackFile = new File(this.configurationDirectory + File.separator + "logback.xml");
        boolean hasLogback = logbackFile.exists();
        if (hasLogback) {
            effectiveShipFiles.add(logbackFile);
        }
        File log4jFile = new File(this.configurationDirectory + File.separator + "log4j.properties");
        boolean hasLog4j = log4jFile.exists();
        if (hasLog4j) {
            effectiveShipFiles.add(log4jFile);
            if (hasLogback) {
                LOG.warn("The configuration directory ('" + this.configurationDirectory + "') contains both LOG4J and Logback configuration files. Please delete or rename one of them.");
            }
        }
        this.addLibFolderToShipFiles(effectiveShipFiles);

        ContainerLaunchContext amContainer = this.setupApplicationMasterContainer(hasLogback, hasLog4j);
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
        ApplicationId appId = appContext.getApplicationId();

        // Fall back to the configured namespace, and then to the application id.
        String zkNamespace = this.getZookeeperNamespace();
        if (zkNamespace == null || zkNamespace.isEmpty()) {
            zkNamespace = this.flinkConfiguration.getString("recovery.zookeeper.path.namespace", String.valueOf(appId));
            this.setZookeeperNamespace(zkNamespace);
        }
        this.flinkConfiguration.setString("recovery.zookeeper.path.namespace", zkNamespace);
        if (RecoveryMode.isHighAvailabilityModeActivated(this.flinkConfiguration)) {
            appContext.setMaxAppAttempts(this.flinkConfiguration.getInteger("yarn.application-attempts", 2));
            this.activateHighAvailabilitySupport(appContext);
        } else {
            appContext.setMaxAppAttempts(this.flinkConfiguration.getInteger("yarn.application-attempts", 1));
        }

        // Upload everything and register it as a local resource of the application master.
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>(2 + effectiveShipFiles.size());
        ArrayList<Path> paths = new ArrayList<Path>(2 + effectiveShipFiles.size());
        StringBuilder classPathBuilder = new StringBuilder();
        StringBuilder envShipFileList = new StringBuilder();
        for (File shipFile : effectiveShipFiles) {
            LocalResource shipResources = Records.newRecord(LocalResource.class);
            Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
            Path remotePath = Utils.setupLocalResource(fileSystem, appId.toString(), shipLocalPath, shipResources, fileSystem.getHomeDirectory());
            paths.add(remotePath);
            localResources.put(shipFile.getName(), shipResources);
            classPathBuilder.append(shipFile.getName());
            if (shipFile.isDirectory()) {
                classPathBuilder.append(File.separator).append("*");
            }
            classPathBuilder.append(File.pathSeparator);
            envShipFileList.append(remotePath).append(",");
        }
        for (Map.Entry<String, LocalResource> hopsworksResource : this.hopsworksResources.entrySet()) {
            localResources.put(hopsworksResource.getKey(), hopsworksResource.getValue());
            classPathBuilder.append(hopsworksResource.getKey());
            classPathBuilder.append(File.pathSeparator);
        }
        LocalResource appMasterJar = Records.newRecord(LocalResource.class);
        LocalResource flinkConf = Records.newRecord(LocalResource.class);
        Path remotePathJar = Utils.setupLocalResource(fileSystem, appId.toString(), this.flinkJarPath, appMasterJar, fileSystem.getHomeDirectory());
        Path remotePathConf = Utils.setupLocalResource(fileSystem, appId.toString(), this.flinkConfigurationPath, flinkConf, fileSystem.getHomeDirectory());
        localResources.put("flink.jar", appMasterJar);
        localResources.put(CONFIG_FILE_NAME, flinkConf);
        paths.add(remotePathJar);
        classPathBuilder.append("flink.jar").append(File.pathSeparator);
        paths.add(remotePathConf);
        classPathBuilder.append(CONFIG_FILE_NAME).append(File.pathSeparator);

        // Session directory is made accessible to the owner only.
        this.sessionFilesDir = new Path(fileSystem.getHomeDirectory(), ".flink/" + appId.toString() + "/");
        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
        fileSystem.setPermission(this.sessionFilesDir, permission);
        Utils.setTokensFor(amContainer, paths, this.conf);
        amContainer.setLocalResources(localResources);
        fileSystem.close();

        // Environment handed to the application master.
        HashMap<String, String> appMasterEnv = new HashMap<String, String>();
        appMasterEnv.putAll(Utils.getEnvironmentVariables("yarn.application-master.env.", this.flinkConfiguration));
        appMasterEnv.put("_FLINK_CLASSPATH", classPathBuilder.toString());
        appMasterEnv.put("_CLIENT_TM_COUNT", String.valueOf(this.taskManagerCount));
        appMasterEnv.put("_CLIENT_TM_MEMORY", String.valueOf(this.taskManagerMemoryMb));
        appMasterEnv.put("_FLINK_JAR_PATH", remotePathJar.toString());
        appMasterEnv.put("_APP_ID", appId.toString());
        appMasterEnv.put("_CLIENT_HOME_DIR", fileSystem.getHomeDirectory().toString());
        appMasterEnv.put("_CLIENT_SHIP_FILES", envShipFileList.toString());
        appMasterEnv.put("_CLIENT_USERNAME", UserGroupInformation.getCurrentUser().getShortUserName());
        appMasterEnv.put("_SLOTS", String.valueOf(this.slots));
        appMasterEnv.put("_DETACHED", String.valueOf(this.detached));
        appMasterEnv.put("_ZOOKEEPER_NAMESPACE", this.getZookeeperNamespace());
        if (this.dynamicPropertiesEncoded != null) {
            appMasterEnv.put("_DYNAMIC_PROPERTIES", this.dynamicPropertiesEncoded);
        }
        Utils.setupYarnClassPath(this.conf, appMasterEnv);
        amContainer.setEnvironment(appMasterEnv);

        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(this.jobManagerMemoryMb);
        capability.setVirtualCores(1);

        String name;
        if (this.customName == null) {
            name = "Flink session with " + this.taskManagerCount + " TaskManagers";
            if (this.detached) {
                name = name + " (detached)";
            }
        } else {
            name = this.customName;
        }
        appContext.setApplicationName(name);
        appContext.setApplicationType("Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        if (this.yarnQueue != null) {
            appContext.setQueue(this.yarnQueue);
        }

        // The hook stays registered only while the deployment is in flight.
        DeploymentFailureHook deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        deploymentLoop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            }
            catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            switch (appState) {
                case FAILED:
                case FINISHED:
                case KILLED:
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \nDiagnostics from YARN: " + report.getDiagnostics() + "\nIf log aggregation is enabled on your cluster, use this command to further investigate the issue:\nyarn logs -applicationId " + appId);
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break deploymentLoop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000L) {
                        LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
                    lastAppState = appState;
                    Thread.sleep(250L);
            }
        }

        if (this.isDetachedMode()) {
            LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + appId + "\nPlease also note that the temporary files of the YARN session in the home directoy will not be removed.");
        }
        try {
            Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
        }
        catch (IllegalStateException ignored) {
            // The JVM is already shutting down, so the hook can no longer be removed; nothing to do.
        }

        String host = report.getHost();
        int port = report.getRpcPort();
        this.flinkConfiguration.setString("jobmanager.rpc.address", host);
        this.flinkConfiguration.setInteger("jobmanager.rpc.port", port);
        return this.createYarnClusterClient(this, yarnClient, report, this.flinkConfiguration, this.sessionFilesDir, true);
    }

    /**
     * Best-effort teardown of a deployment: asks YARN to kill the application and then
     * stops the client.
     *
     * @param yarnClient client used to issue the kill request; it is stopped before returning
     * @param yarnApplication application handle whose new-application response supplies the id to kill
     */
    private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
        LOG.info("Killing YARN application");
        try {
            ApplicationId doomedAppId = yarnApplication.getNewApplicationResponse().getApplicationId();
            yarnClient.killApplication(doomedAppId);
        } catch (Exception killFailure) {
            // A failed kill is only logged, so the client below is still stopped.
            LOG.debug("Error while killing YARN application", killFailure);
        }
        yarnClient.stop();
    }

    /**
     * Summarises the free memory of all nodes YARN reports as {@code RUNNING}.
     *
     * <p>Free memory of a node is its capability minus its used memory; a node that reports
     * no usage is treated as using 0.
     *
     * @param yarnClient client used to fetch the node reports
     * @return total free memory, the largest free amount on a single node (never below 0),
     *         and the per-node free amounts in report order
     * @throws YarnException if fetching the node reports fails
     * @throws IOException if fetching the node reports fails
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        List<NodeReport> runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);
        int[] freePerNode = new int[runningNodes.size()];
        int freeTotal = 0;
        int largestFree = 0;
        int slot = 0;
        for (NodeReport node : runningNodes) {
            int capacity = node.getCapability().getMemory();
            Resource inUse = node.getUsed();
            int used = inUse == null ? 0 : inUse.getMemory();
            int freeOnNode = capacity - used;
            freePerNode[slot++] = freeOnNode;
            freeTotal += freeOnNode;
            largestFree = Math.max(largestFree, freeOnNode);
        }
        return new ClusterResourceDescription(freeTotal, largestFree, freePerNode);
    }

    /**
     * Builds a human-readable report of the YARN cluster: the NodeManager count, a table with
     * one section per running node (id, memory, vcores, health report, containers), a totals
     * line, and one line per queue.
     *
     * <p>The client obtained from {@code getYarnClient()} is stopped before this method
     * returns, whether the report was built or one of the queries failed.
     *
     * @return the report text
     * @throws RuntimeException wrapping any exception raised while building the report
     */
    public String getClusterDescription() {
        try {
            YarnClient yarnClient = this.getYarnClient();
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                PrintStream ps = new PrintStream(baos);
                YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
                // NOTE(review): append() emits no line break, so the table header below starts on
                // the same line as this count. Kept as-is to leave the output unchanged - confirm intent.
                ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
                List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
                final String format = "|%-16s |%-16s %n";
                ps.printf("|Property         |Value          %n");
                ps.println("+---------------------------------------+");
                int totalMemory = 0;
                int totalCores = 0;
                for (NodeReport rep : nodes) {
                    Resource res = rep.getCapability();
                    totalMemory += res.getMemory();
                    totalCores += res.getVirtualCores();
                    ps.format(format, "NodeID", rep.getNodeId());
                    ps.format(format, "Memory", res.getMemory() + " MB");
                    ps.format(format, "vCores", res.getVirtualCores());
                    ps.format(format, "HealthReport", rep.getHealthReport());
                    ps.format(format, "Containers", rep.getNumContainers());
                    ps.println("+---------------------------------------+");
                }
                ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
                List<QueueInfo> qInfo = yarnClient.getAllQueues();
                for (QueueInfo q : qInfo) {
                    ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
                }
                return baos.toString();
            } finally {
                // Stop the client on the failure path too; previously a failed query leaked it.
                yarnClient.stop();
            }
        } catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }

    /**
     * Returns the session files directory rendered as a string.
     *
     * @return {@code toString()} of the session files directory
     */
    public String getSessionFilesDir() {
        final Path dir = this.sessionFilesDir;
        return dir.toString();
    }

    /**
     * Sets the custom name stored on this descriptor.
     *
     * @param name the custom name; must not be {@code null}
     * @throws IllegalArgumentException if {@code name} is {@code null}
     */
    public void setName(String name) {
        if (name != null) {
            this.customName = name;
            return;
        }
        throw new IllegalArgumentException("The passed name is null");
    }

    /**
     * Applies two settings to the submission context through the reflective helper: keep
     * containers across application attempts, and an attempt-failures validity interval
     * equal to the Akka timeout of the Flink configuration, in milliseconds.
     *
     * @param appContext submission context to configure
     * @throws InvocationTargetException if a reflectively invoked setter throws
     * @throws IllegalAccessException if a reflectively invoked setter is not accessible
     */
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
        // Computed only after the first setter has run, matching the original evaluation order.
        long validityIntervalMillis = AkkaUtils.getTimeout((Configuration)this.flinkConfiguration).toMillis();
        contextReflector.setAttemptFailuresValidityInterval(appContext, validityIntervalMillis);
    }

    /**
     * Adds the directory named by the {@code FLINK_LIB_DIR} environment variable to the
     * given set of ship files.
     *
     * <p>If the variable is not set, nothing is added; a warning is logged when, in addition,
     * no ship files have been configured on this descriptor.
     *
     * @param effectiveShipFiles set that receives the library directory
     * @throws YarnDeploymentException if the variable is set but does not name a directory
     */
    protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) {
        String libDir = System.getenv().get("FLINK_LIB_DIR");
        if (libDir == null) {
            if (this.shipFiles.isEmpty()) {
                LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. Not shipping any library files.", "FLINK_LIB_DIR");
            }
            return;
        }
        File libDirFile = new File(libDir);
        if (!libDirFile.isDirectory()) {
            throw new YarnDeploymentException("The environment variable 'FLINK_LIB_DIR' is set to '" + libDir + "' but the directory doesn't exist.");
        }
        effectiveShipFiles.add(libDirFile);
    }

    /**
     * Creates the launch context for the application master and sets its single start command.
     *
     * <p>The command is assembled, in order, from: the Java binary with a {@code -Xmx} heap
     * size, the value of {@code env.java.opts}, optional logging system properties, every
     * entry of {@code hopsworksParams} with single quotes removed, the application master
     * class name, and stdout/stderr redirections into the log directory.
     *
     * @param hasLogback whether to add the logback configuration file property
     * @param hasLog4j whether to add the log4j configuration file property
     * @return the launch context carrying the start command
     */
    protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
        String javaOpts = this.flinkConfiguration.getString("env.java.opts", "");
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        StringBuilder command = new StringBuilder("$JAVA_HOME/bin/java -Xmx");
        command.append(Utils.calculateHeapSize((int)this.jobManagerMemoryMb, (Configuration)this.flinkConfiguration));
        command.append("M ").append(javaOpts);

        // The log file property is only needed when at least one logging backend is configured.
        if (hasLogback || hasLog4j) {
            command.append(" -Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
        }
        if (hasLogback) {
            command.append(" -Dlogback.configurationFile=file:logback.xml");
        }
        if (hasLog4j) {
            command.append(" -Dlog4j.configuration=file:log4j.properties");
        }

        for (String envProperty : this.hopsworksParams) {
            command.append(" ").append(envProperty.replace("'", ""));
        }

        command.append(" ").append(this.getApplicationMasterClass().getName());
        command.append("  1>").append("<LOG_DIR>").append("/jobmanager.out 2>").append("<LOG_DIR>").append("/jobmanager.err");

        String amCommand = command.toString();
        amContainer.setCommands(Collections.singletonList(amCommand));
        LOG.debug("Application Master start command: " + amCommand);
        return amContainer;
    }

    /**
     * Factory hook that constructs the {@link YarnClusterClient}, passing all arguments
     * through to its constructor unchanged.
     *
     * @param descriptor cluster descriptor handed to the client
     * @param yarnClient YARN client handed to the client
     * @param report application report handed to the client
     * @param flinkConfiguration Flink configuration handed to the client
     * @param sessionFilesDir session files directory handed to the client
     * @param perJobCluster flag handed to the client
     * @return the newly constructed client
     * @throws IOException if the client constructor throws it
     * @throws YarnException if the client constructor throws it
     */
    protected YarnClusterClient createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, YarnClient yarnClient, ApplicationReport report, Configuration flinkConfiguration, Path sessionFilesDir, boolean perJobCluster) throws IOException, YarnException {
        YarnClusterClient clusterClient = new YarnClusterClient(descriptor, yarnClient, report, flinkConfiguration, sessionFilesDir, perJobCluster);
        return clusterClient;
    }

    /**
     * Stores the Hadoop configuration on this descriptor.
     *
     * @param conf the Hadoop configuration to store
     */
    public void setConf(final org.apache.hadoop.conf.Configuration conf) {
        this.conf = conf;
    }

    private class DeploymentFailureHook
    extends Thread {
        private YarnClient yarnClient;
        private YarnClientApplication yarnApplication;

        DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
            this.yarnClient = yarnClient;
            this.yarnApplication = yarnApplication;
        }

        @Override
        public void run() {
            LOG.info("Cancelling deployment from Deployment Failure Hook");
            AbstractYarnClusterDescriptor.this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            LOG.info("Deleting files in " + AbstractYarnClusterDescriptor.this.sessionFilesDir);
            try {
                org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get((org.apache.hadoop.conf.Configuration)AbstractYarnClusterDescriptor.this.conf);
                fs.delete(AbstractYarnClusterDescriptor.this.sessionFilesDir, true);
                fs.close();
            }
            catch (IOException e) {
                LOG.error("Failed to delete Flink Jar and conf files in HDFS", (Throwable)e);
            }
        }
    }

    /**
     * Unchecked exception thrown by this descriptor when a YARN deployment cannot proceed.
     */
    private static class YarnDeploymentException extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /**
         * @param message description of the failure
         */
        public YarnDeploymentException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause underlying exception
         */
        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Singleton that looks up two optional setters on {@link ApplicationSubmissionContext}
     * by name and invokes them reflectively when they were found. When a setter is not
     * found, the corresponding call only logs at debug level.
     */
    private static class ApplicationSubmissionContextReflector {
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
        private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
        private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
        private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
        private final Method keepContainersMethod;
        private final Method attemptFailuresValidityIntervalMethod;

        /**
         * @return the shared reflector instance
         */
        public static ApplicationSubmissionContextReflector getInstance() {
            return instance;
        }

        /**
         * Resolves both setters on the given class; a setter that is not found is stored as {@code null}.
         */
        private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
            this.keepContainersMethod = lookupSetter(clazz, keepContainersMethodName, Boolean.TYPE);
            this.attemptFailuresValidityIntervalMethod = lookupSetter(clazz, attemptsFailuresValidityIntervalMethodName, Long.TYPE);
        }

        /**
         * Looks up a public single-argument method, logging at debug level whether it exists.
         *
         * @return the method, or {@code null} if the class has no such method
         */
        private static Method lookupSetter(Class<ApplicationSubmissionContext> clazz, String methodName, Class<?> argumentType) {
            try {
                Method found = clazz.getMethod(methodName, argumentType);
                LOG.debug("{} supports method {}.", clazz.getCanonicalName(), methodName);
                return found;
            } catch (NoSuchMethodException e) {
                LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), methodName);
                return null;
            }
        }

        /**
         * Invokes the keep-containers setter on the context if it was found; otherwise only logs.
         *
         * @param appContext context to configure
         * @param keepContainers value passed to the setter
         * @throws InvocationTargetException if the invoked setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setKeepContainersAcrossApplicationAttempts(ApplicationSubmissionContext appContext, boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
            if (this.keepContainersMethod == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", appContext.getClass().getCanonicalName(), keepContainersMethodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", this.keepContainersMethod.getName(), appContext.getClass().getCanonicalName());
            this.keepContainersMethod.invoke(appContext, keepContainers);
        }

        /**
         * Invokes the attempt-failures validity interval setter on the context if it was found;
         * otherwise only logs.
         *
         * @param appContext context to configure
         * @param validityInterval value passed to the setter
         * @throws InvocationTargetException if the invoked setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setAttemptFailuresValidityInterval(ApplicationSubmissionContext appContext, long validityInterval) throws InvocationTargetException, IllegalAccessException {
            if (this.attemptFailuresValidityIntervalMethod == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", appContext.getClass().getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", this.attemptFailuresValidityIntervalMethod.getName(), appContext.getClass().getCanonicalName());
            this.attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
        }
    }

    /**
     * Immutable holder for a snapshot of free cluster memory. The array is stored by
     * reference, not copied.
     */
    private static class ClusterResourceDescription {
        /** Sum of the free memory of all nodes. */
        public final int totalFreeMemory;
        /** Largest free memory amount found on a single node. */
        public final int containerLimit;
        /** Free memory per node. */
        public final int[] nodeManagersFree;

        /**
         * @param totalFreeMemory sum of the free memory of all nodes
         * @param containerLimit largest free memory amount on a single node
         * @param nodeManagersFree free memory per node; kept by reference
         */
        public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
            this.nodeManagersFree = nodeManagersFree;
            this.containerLimit = containerLimit;
            this.totalFreeMemory = totalFreeMemory;
        }
    }
}

