/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractYarnClusterDescriptor
implements ClusterDescriptor<ApplicationId> {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterDescriptor.class);
    // Hadoop/YARN configuration; read for the minimum container size and to obtain the file system.
    private final YarnConfiguration yarnConfiguration;
    // Client used for every call to the YARN ResourceManager.
    private final YarnClient yarnClient;
    // When true, close() leaves yarnClient running instead of stopping it.
    private final boolean sharedYarnClient;
    // Requested queue name; checkYarnQueues() warns when it is not among the cluster's queues.
    private String yarnQueue;
    // Directory probed for logback.xml and log4j.properties when the application master is started.
    private String configurationDirectory;
    // Location of the Flink jar that is registered as the "flink.jar" local resource.
    private org.apache.hadoop.fs.Path flinkJarPath;
    // Encoded dynamic properties; decoded and written into flinkConfiguration by deployInternal().
    private String dynamicPropertiesEncoded;
    // Additional files to ship with the application.
    protected List<File> shipFiles = new LinkedList<File>();
    // Flink configuration; mutated during deployment and retrieval (addresses, ports, execution mode).
    private final Configuration flinkConfiguration;
    // Value exposed through the deprecated setDetachedMode()/isDetachedMode() accessors.
    private boolean detached;
    // NOTE(review): not referenced in the visible part of this file - presumably a custom application name; confirm.
    private String customName;
    // Used as the high-availability cluster id; defaulted when the application master is started if unset or empty.
    private String zookeeperNamespace;
    // NOTE(review): only accessed through its getter/setter in the visible part of this file - confirm usage.
    private String nodeLabel;
    // User jars collected from the job graph before upload.
    private final Set<File> userJarFiles = new HashSet<File>();
    // Decides where user jars are placed on the class path relative to the system class path.
    private YarnConfigOptions.UserJarInclusion userJarInclusion;
    // Optional override for the file system home directory used as the upload base.
    private org.apache.hadoop.fs.Path stagingDir;
    // Optional pre-created YARN application; deployInternal() creates one when this is null.
    private YarnClientApplication yarnApplication;
    // Response belonging to yarnApplication; supplies the maximum resource capability.
    private GetNewApplicationResponse appResponse;

    /**
     * Sets the directory used as the base for uploaded files.
     *
     * <p>When this is {@code null}, the file system's home directory is used instead.
     *
     * @param stagingDir the staging directory, or {@code null} to fall back to the home directory
     */
    public void setStagingDir(org.apache.hadoop.fs.Path stagingDir) {
        this.stagingDir = stagingDir;
    }

    /**
     * Supplies a pre-created YARN application to deploy into.
     *
     * <p>When no application has been supplied, {@code deployInternal} creates a new one through
     * the YARN client.
     *
     * @param yarnApplication the application to reuse
     */
    public void setYarnApplication(YarnClientApplication yarnApplication) {
        this.yarnApplication = yarnApplication;
    }

    /**
     * Supplies the "new application" response from which {@code deployInternal} reads the
     * maximum resource capability.
     *
     * @param appResponse the response belonging to the supplied YARN application
     */
    public void setAppResponse(GetNewApplicationResponse appResponse) {
        this.appResponse = appResponse;
    }

    public AbstractYarnClusterDescriptor(Configuration flinkConfiguration, YarnConfiguration yarnConfiguration, String configurationDirectory, YarnClient yarnClient, boolean sharedYarnClient) {
        this.yarnConfiguration = (YarnConfiguration)Preconditions.checkNotNull((Object)yarnConfiguration);
        if (System.getenv("IN_TESTS") != null) {
            try {
                yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
            }
            catch (Throwable t) {
                throw new RuntimeException("Error", t);
            }
        }
        this.yarnClient = (YarnClient)Preconditions.checkNotNull((Object)yarnClient);
        this.sharedYarnClient = sharedYarnClient;
        this.flinkConfiguration = (Configuration)Preconditions.checkNotNull((Object)flinkConfiguration);
        this.userJarInclusion = AbstractYarnClusterDescriptor.getUserJarInclusionMode(flinkConfiguration);
        this.configurationDirectory = (String)Preconditions.checkNotNull((Object)configurationDirectory);
    }

    /**
     * Returns the YARN client this descriptor was constructed with.
     *
     * @return the YARN client
     */
    public YarnClient getYarnClient() {
        return yarnClient;
    }

    /**
     * Returns the entrypoint that {@code deploySessionCluster} hands to {@code deployInternal}
     * as the YARN cluster entrypoint.
     *
     * @return the session cluster entrypoint (presumably a fully qualified class name - confirm
     *     against implementations)
     */
    protected abstract String getYarnSessionClusterEntrypoint();

    /**
     * Returns the entrypoint for a per-job cluster.
     *
     * <p>NOTE(review): not called in the visible part of this file; presumably the job-cluster
     * counterpart of {@link #getYarnSessionClusterEntrypoint()} - confirm against callers.
     *
     * @return the job cluster entrypoint
     */
    protected abstract String getYarnJobClusterEntrypoint();

    /**
     * Returns the Flink configuration held by this descriptor.
     *
     * <p>This is the live instance, not a copy; deployment and retrieval write into it.
     *
     * @return the Flink configuration
     */
    public Configuration getFlinkConfiguration() {
        return flinkConfiguration;
    }

    /**
     * Sets the name of the YARN queue to use.
     *
     * <p>During deployment the name is compared against the queues reported by the cluster and
     * a warning is logged if it is not among them.
     *
     * @param queue the queue name, or {@code null} for none
     */
    public void setQueue(String queue) {
        this.yarnQueue = queue;
    }

    /**
     * Sets the path of the Flink jar to ship.
     *
     * @param localJarPath path whose string form must end with {@code "jar"}
     * @throws IllegalArgumentException if the path does not end with {@code "jar"}
     */
    public void setLocalJarPath(org.apache.hadoop.fs.Path localJarPath) {
        final boolean looksLikeJar = localJarPath.toString().endsWith("jar");
        if (looksLikeJar) {
            this.flinkJarPath = localJarPath;
            return;
        }
        throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    }

    /**
     * Appends the given files to the list of files shipped with the application.
     *
     * @param shipFiles files to add to the existing ship list
     */
    public void addShipFiles(List<File> shipFiles) {
        this.shipFiles.addAll(shipFiles);
    }

    /**
     * Sets the encoded dynamic properties.
     *
     * <p>During deployment the string is decoded with
     * {@code FlinkYarnSessionCli.getDynamicProperties} and each entry is written into the Flink
     * configuration.
     *
     * @param dynamicPropertiesEncoded the encoded properties string
     */
    public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
        this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
    }

    /**
     * Returns the encoded dynamic properties string as it was last set.
     *
     * @return the encoded properties, or {@code null} if none were set
     */
    public String getDynamicPropertiesEncoded() {
        return dynamicPropertiesEncoded;
    }

    /**
     * Verifies that everything required for a deployment has been provided.
     *
     * <p>Checks the TaskManager count, the Flink jar path, the configuration directory and the
     * Flink configuration, then compares the configured number of vcores (defaulting to the
     * slots per TaskManager) with the largest vcore capacity among the running nodes. Finally
     * warns if neither {@code HADOOP_CONF_DIR} nor {@code YARN_CONF_DIR} is set.
     *
     * @param clusterSpecification the requested cluster layout
     * @throws YarnDeploymentException if a prerequisite is missing or the node reports cannot be
     *     fetched
     * @throws IllegalConfigurationException if more vcores are requested than any running node
     *     offers
     */
    private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
        if (clusterSpecification.getNumberTaskManagers() <= 0) {
            throw new YarnDeploymentException("Taskmanager count must be positive");
        }
        if (this.flinkJarPath == null) {
            throw new YarnDeploymentException("The Flink jar path is null");
        }
        if (this.configurationDirectory == null) {
            throw new YarnDeploymentException("Configuration directory not set");
        }
        if (this.flinkConfiguration == null) {
            throw new YarnDeploymentException("Flink configuration object has not been set");
        }

        int numYarnMaxVcores;
        try {
            List<NodeReport> runningNodes = this.yarnClient.getNodeReports(NodeState.RUNNING);
            numYarnMaxVcores = runningNodes.stream()
                    .mapToInt(node -> node.getCapability().getVirtualCores())
                    .max()
                    .orElse(0);
        } catch (Exception e) {
            throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
        }

        // The number of slots per TaskManager is the default when no vcores are configured.
        int defaultVcores = clusterSpecification.getSlotsPerTaskManager();
        int configuredVcores = this.flinkConfiguration.getInteger(YarnConfigOptions.VCORES, defaultVcores);
        if (configuredVcores > numYarnMaxVcores) {
            throw new IllegalConfigurationException(String.format("The number of requested virtual cores per node %d exceeds the maximum number of virtual cores %d available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with '%s.'", configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
        }

        boolean hadoopConfMissing = System.getenv("HADOOP_CONF_DIR") == null;
        boolean yarnConfMissing = System.getenv("YARN_CONF_DIR") == null;
        if (hadoopConfMissing && yarnConfMissing) {
            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
        }
    }

    /**
     * First-fit allocation: subtracts {@code toAllocate} from the first entry that is at least
     * that large.
     *
     * @param nodeManagers free capacity per node manager; modified in place on success
     * @param toAllocate amount to reserve
     * @return {@code true} if some entry could hold the request, {@code false} otherwise (the
     *     array is then left untouched)
     */
    private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
        int index = 0;
        while (index < nodeManagers.length) {
            if (nodeManagers[index] >= toAllocate) {
                nodeManagers[index] -= toAllocate;
                return true;
            }
            index++;
        }
        return false;
    }

    /**
     * Stores the detached-mode flag returned by {@link #isDetachedMode()}.
     *
     * <p>Note that {@code deployInternal} takes its own {@code detached} argument and does not
     * read this flag.
     *
     * @param detachedMode the flag value to store
     */
    @Deprecated
    public void setDetachedMode(boolean detachedMode) {
        this.detached = detachedMode;
    }

    /**
     * Returns the detached-mode flag last stored with {@link #setDetachedMode(boolean)}.
     *
     * @return the stored flag; {@code false} if it was never set
     */
    @Deprecated
    public boolean isDetachedMode() {
        return detached;
    }

    /**
     * Returns the ZooKeeper namespace currently held by this descriptor.
     *
     * @return the namespace, or {@code null} if none has been set yet
     */
    public String getZookeeperNamespace() {
        return zookeeperNamespace;
    }

    /**
     * Sets the ZooKeeper namespace.
     *
     * <p>When the application master is started, the namespace is written into the configuration
     * as the high-availability cluster id; if it is {@code null} or empty at that point, it is
     * replaced by the configured cluster id or the application id.
     *
     * @param zookeeperNamespace the namespace to use
     */
    public void setZookeeperNamespace(String zookeeperNamespace) {
        this.zookeeperNamespace = zookeeperNamespace;
    }

    /**
     * Returns the node label last stored with {@link #setNodeLabel(String)}.
     *
     * @return the node label, or {@code null} if none was set
     */
    public String getNodeLabel() {
        return nodeLabel;
    }

    /**
     * Stores the node label returned by {@link #getNodeLabel()}.
     *
     * <p>NOTE(review): the label is not read elsewhere in the visible part of this file;
     * presumably it restricts containers to labelled nodes - confirm against its use.
     *
     * @param nodeLabel the node label to store
     */
    public void setNodeLabel(String nodeLabel) {
        this.nodeLabel = nodeLabel;
    }

    /**
     * Stops the YARN client unless it is shared with (and therefore owned by) the caller.
     */
    public void close() {
        if (this.sharedYarnClient) {
            // A shared client is left running.
            return;
        }
        this.yarnClient.stop();
    }

    /**
     * Connects to an already running Flink application on YARN.
     *
     * <p>Looks up the application report, rejects applications whose final status is no longer
     * {@code UNDEFINED}, writes the reported host and RPC port into the Flink configuration
     * (JobManager and REST address/port) and creates a cluster client from it.
     *
     * @param applicationId id of the YARN application to connect to
     * @return a client for the running cluster
     * @throws ClusterRetrieveException if the report cannot be fetched, the application has
     *     already finished, or the client cannot be created; the original failure is the cause
     */
    public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
        try {
            if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
                LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
            }

            final ApplicationReport appReport = this.yarnClient.getApplicationReport(applicationId);
            final FinalApplicationStatus finalStatus = appReport.getFinalApplicationStatus();
            if (finalStatus != FinalApplicationStatus.UNDEFINED) {
                // Any final status other than UNDEFINED means the application has terminated.
                LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", applicationId, finalStatus);
                throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
            }

            final String host = appReport.getHost();
            final int rpcPort = appReport.getRpcPort();
            LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'", host, rpcPort, applicationId);

            this.flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
            this.flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
            this.flinkConfiguration.setString(RestOptions.ADDRESS, host);
            this.flinkConfiguration.setInteger(RestOptions.PORT, rpcPort);

            // -1 / -1: the TaskManager count and slots per TaskManager are not known here.
            return this.createYarnClusterClient(this, -1, -1, appReport, this.flinkConfiguration, false);
        } catch (Exception e) {
            throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
        }
    }

    /**
     * Deploys a Flink session cluster (no job graph attached, not detached) on YARN.
     *
     * @param clusterSpecification the requested cluster layout
     * @return a client for the newly started cluster
     * @throws ClusterDeploymentException if any step of the deployment fails; the original
     *     failure is attached as the cause
     */
    public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        try {
            final String sessionEntrypoint = this.getYarnSessionClusterEntrypoint();
            return this.deployInternal(clusterSpecification, "Flink session cluster", sessionEntrypoint, null, false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
        }
    }

    /**
     * Kills the given YARN application and then deletes the files uploaded for it.
     *
     * @param applicationId id of the application to kill
     * @throws FlinkException if killing the application or deleting its files fails
     */
    public void killCluster(ApplicationId applicationId) throws FlinkException {
        try {
            this.yarnClient.killApplication(applicationId);

            // Clean-up happens only after the kill request went through.
            final String yarnFilesUri = this.getYarnFilesDir(applicationId).toUri().toString();
            final Map<String, String> cleanupEnv = Collections.singletonMap("_FLINK_YARN_FILES", yarnFilesUri);
            Utils.deleteApplicationFiles(cleanupEnv);
        } catch (IOException | YarnException e) {
            throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
        }
    }

    /**
     * Checks that the requested TaskManager memory is sufficient.
     *
     * <p>Runs the cutoff and heap size calculations purely for their argument validation; the
     * computed values are discarded.
     *
     * @param clusterSpecification the requested cluster layout
     * @throws FlinkException if either calculation rejects the memory settings with an
     *     {@link IllegalArgumentException}
     */
    private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
        try {
            final long totalMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
            final long cutoffMb = ContaineredTaskManagerParameters.calculateCutoffMB(this.flinkConfiguration, totalMemoryMb);
            final long memoryAfterCutoffMb = totalMemoryMb - cutoffMb;
            TaskManagerServices.calculateHeapSizeMB(memoryAfterCutoffMb, this.flinkConfiguration);
        } catch (IllegalArgumentException iae) {
            throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster.", iae);
        }
    }

    /**
     * Deploys a Flink cluster on YARN and returns a client connected to it.
     *
     * <p>Steps, in order: validate the memory settings, check Kerberos credentials when Hadoop
     * security is enabled, verify the deployment prerequisites, check the configured queue,
     * apply the dynamic properties to the Flink configuration, obtain (or reuse) a YARN
     * application, validate the request against the cluster resources, start the application
     * master and finally write the reported host/port into the Flink configuration.
     *
     * <p>A YARN application supplied via {@code setYarnApplication} is reused. If no matching
     * response was supplied via {@code setAppResponse}, it is taken from that application
     * instead of failing with a {@link NullPointerException}.
     *
     * @param clusterSpecification the requested cluster layout
     * @param applicationName name under which the application is submitted
     * @param yarnClusterEntrypoint entrypoint used for the application master
     * @param jobGraph job to ship with the cluster, or {@code null} for a session cluster
     * @param detached whether the cluster entrypoint runs in detached execution mode
     * @return a client for the deployed cluster
     * @throws Exception if validation, resource lookup or the application master start fails
     */
    protected ClusterClient<ApplicationId> deployInternal(ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached) throws Exception {
        this.validateClusterSpecification(clusterSpecification);

        if (UserGroupInformation.isSecurityEnabled()) {
            boolean useTicketCache = this.flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
            if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && useTicketCache && !loginUser.hasKerberosCredentials()) {
                LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
            }
        }

        this.isReadyForDeployment(clusterSpecification);
        this.checkYarnQueues(this.yarnClient);

        // Dynamic properties override whatever is already in the Flink configuration.
        Map<String, String> dynProperties = FlinkYarnSessionCli.getDynamicProperties(this.dynamicPropertiesEncoded);
        for (Map.Entry<String, String> entry : dynProperties.entrySet()) {
            this.flinkConfiguration.setString(entry.getKey(), entry.getValue());
        }

        if (this.yarnApplication == null) {
            this.yarnApplication = this.yarnClient.createApplication();
            this.appResponse = this.yarnApplication.getNewApplicationResponse();
        } else if (this.appResponse == null) {
            // An application was injected without its response: derive it from the application.
            this.appResponse = this.yarnApplication.getNewApplicationResponse();
        }
        Resource maxRes = this.appResponse.getMaximumResourceCapability();

        ClusterResourceDescription clusterResourceDescription;
        try {
            clusterResourceDescription = this.getCurrentFreeClusterResources(this.yarnClient);
        } catch (IOException | YarnException e) {
            this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        int yarnMinAllocationMB = this.yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", 0);

        ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = this.validateClusterResources(clusterSpecification, yarnMinAllocationMB, maxRes, clusterResourceDescription);
        } catch (YarnDeploymentException yde) {
            this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            throw yde;
        }
        LOG.info("Cluster specification: {}", validClusterSpecification);

        ClusterEntrypoint.ExecutionMode executionMode = detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL;
        this.flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report = this.startAppMaster(this.flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, this.yarnClient, this.yarnApplication, validClusterSpecification);

        // Point the client-side configuration at the freshly started application master.
        String host = report.getHost();
        int port = report.getRpcPort();
        this.flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
        this.flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
        this.flinkConfiguration.setString(RestOptions.ADDRESS, host);
        this.flinkConfiguration.setInteger(RestOptions.PORT, port);

        return this.createYarnClusterClient(this, validClusterSpecification.getNumberTaskManagers(), validClusterSpecification.getSlotsPerTaskManager(), report, this.flinkConfiguration, true);
    }

    /**
     * Validates the requested memory against what the YARN cluster can provide and returns an
     * adjusted specification.
     *
     * <p>JobManager and TaskManager memory are raised to {@code yarnMinAllocationMB} when they
     * are below it. Requests exceeding the maximum container capability are rejected. Shortfalls
     * in currently free resources only produce warnings, because the missing resources may
     * become available later.
     *
     * @param clusterSpecification the requested cluster layout
     * @param yarnMinAllocationMB smallest container size the scheduler accounts for, in MB
     * @param maximumResourceCapability largest container the cluster can allocate
     * @param freeClusterResources snapshot of the currently free cluster resources
     * @return a specification with the (possibly raised) memory values and the original
     *     TaskManager count and slots per TaskManager
     * @throws YarnDeploymentException if the JobManager or TaskManager memory exceeds the
     *     maximum container capability
     */
    protected ClusterSpecification validateClusterResources(ClusterSpecification clusterSpecification, int yarnMinAllocationMB, Resource maximumResourceCapability, ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
        int taskManagerCount = clusterSpecification.getNumberTaskManagers();
        int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
        int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();

        if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
            LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.");
        }

        // Never request less than the scheduler accounts for.
        jobManagerMemoryMb = Math.max(jobManagerMemoryMb, yarnMinAllocationMB);
        taskManagerMemoryMb = Math.max(taskManagerMemoryMb, yarnMinAllocationMB);

        final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
        final int maxContainerMemoryMb = maximumResourceCapability.getMemory();
        if (jobManagerMemoryMb > maxContainerMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maxContainerMemoryMb + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
        }
        if (taskManagerMemoryMb > maxContainerMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maxContainerMemoryMb + "MB Requested: " + taskManagerMemoryMb + "MB. " + note);
        }

        final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.";

        // Computed as long: memory per TaskManager times the TaskManager count can exceed the int range.
        final long totalMemoryRequired = jobManagerMemoryMb + (long) taskManagerMemoryMb * taskManagerCount;
        if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
            LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
        }
        if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }
        if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }

        // Simulate the allocation on a copy so the caller's snapshot stays untouched.
        final int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
        if (!AbstractYarnClusterDescriptor.allocateResource(nmFree, jobManagerMemoryMb)) {
            LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
        }
        for (int i = 0; i < taskManagerCount; ++i) {
            if (!AbstractYarnClusterDescriptor.allocateResource(nmFree, taskManagerMemoryMb)) {
                LOG.warn("There is not enough memory available in the YARN cluster. The TaskManager(s) require " + taskManagerMemoryMb + "MB each. NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\nAfter allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
            }
        }

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMb)
                .setTaskManagerMemoryMB(taskManagerMemoryMb)
                .setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
                .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
                .createClusterSpecification();
    }

    /**
     * Best-effort check that the configured queue exists in the YARN cluster.
     *
     * <p>Only logs; it never fails the deployment. If the cluster reports no queues a debug
     * message is written. If a queue was requested and is not among the reported ones, a warning
     * listing the available queue names is written. When no queue was requested there is nothing
     * to validate.
     *
     * @param yarnClient client used to list the cluster's queues
     */
    private void checkYarnQueues(YarnClient yarnClient) {
        try {
            List<QueueInfo> queues = yarnClient.getAllQueues();
            if (queues.isEmpty()) {
                LOG.debug("The YARN cluster does not have any queues configured");
                return;
            }
            if (this.yarnQueue == null) {
                // No queue requested, so there is nothing to compare against.
                return;
            }

            List<String> queueNames = new ArrayList<String>(queues.size());
            for (QueueInfo queue : queues) {
                String queueName = queue.getQueueName();
                if (queueName.equals(this.yarnQueue)) {
                    return;
                }
                queueNames.add(queueName);
            }
            LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. Available queues: " + String.join(", ", queueNames));
        } catch (Throwable e) {
            // Deliberately broad: a failing queue lookup must never abort the deployment.
            LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", e);
            }
        }
    }

    public ApplicationReport startAppMaster(Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception {
        ApplicationReport report;
        String krb5Config;
        File log4jFile;
        boolean hasLog4j;
        try {
            org.apache.flink.core.fs.FileSystem.initialize((Configuration)configuration);
        }
        catch (IOException e) {
            throw new IOException("Error while setting the default filesystem scheme from configuration.", e);
        }
        FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        org.apache.hadoop.fs.Path homeDir = this.stagingDir != null ? this.stagingDir : fs.getHomeDirectory();
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && fs.getScheme().startsWith("file")) {
            LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system");
        }
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
        HashSet<File> systemShipFiles = new HashSet<File>(this.shipFiles.size());
        for (File file : this.shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }
        File logbackFile = new File(this.configurationDirectory + File.separator + "logback.xml");
        boolean hasLogback = logbackFile.exists();
        if (hasLogback) {
            systemShipFiles.add(logbackFile);
        }
        if (hasLog4j = (log4jFile = new File(this.configurationDirectory + File.separator + "log4j.properties")).exists()) {
            systemShipFiles.add(log4jFile);
            if (hasLogback) {
                LOG.warn("The configuration directory ('" + this.configurationDirectory + "') contains both LOG4J and Logback configuration files. Please delete or rename one of them.");
            }
        }
        this.addLibFolderToShipFiles(systemShipFiles);
        ApplicationId appId = appContext.getApplicationId();
        String zkNamespace = this.getZookeeperNamespace();
        if (zkNamespace == null || zkNamespace.isEmpty()) {
            zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
            this.setZookeeperNamespace(zkNamespace);
        }
        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
        if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)configuration)) {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 2));
            this.activateHighAvailabilitySupport(appContext);
        } else {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }
        if (jobGraph != null) {
            for (Path path : jobGraph.getUserJars()) {
                this.userJarFiles.add(new File(path.toUri()));
            }
        }
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>(2 + systemShipFiles.size() + this.userJarFiles.size());
        ArrayList<org.apache.hadoop.fs.Path> paths = new ArrayList<org.apache.hadoop.fs.Path>(2 + systemShipFiles.size() + this.userJarFiles.size());
        StringBuilder envShipFileList = new StringBuilder();
        List<String> systemClassPaths = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(systemShipFiles, fs, homeDir, appId, paths, localResources, envShipFileList);
        List<String> userClassPaths = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(this.userJarFiles, fs, homeDir, appId, paths, localResources, envShipFileList);
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);
        StringBuilder classPathBuilder = new StringBuilder();
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        org.apache.hadoop.fs.Path remotePathJar = AbstractYarnClusterDescriptor.setupSingleLocalResource("flink.jar", fs, appId, this.flinkJarPath, localResources, homeDir, "");
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, clusterSpecification.getSlotsPerTaskManager());
        configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m");
        File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
        tmpConfigurationFile.deleteOnExit();
        BootstrapTools.writeConfiguration((Configuration)configuration, (File)tmpConfigurationFile);
        org.apache.hadoop.fs.Path remotePathConf = AbstractYarnClusterDescriptor.setupSingleLocalResource("flink-conf.yaml", fs, appId, new org.apache.hadoop.fs.Path(tmpConfigurationFile.getAbsolutePath()), localResources, homeDir, "");
        paths.add(remotePathJar);
        classPathBuilder.append("flink.jar").append(File.pathSeparator);
        paths.add(remotePathConf);
        classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        if (jobGraph != null) {
            try {
                File fp = File.createTempFile(appId.toString(), null);
                fp.deleteOnExit();
                try (FileOutputStream output = new FileOutputStream(fp);
                     ObjectOutputStream obOutput = new ObjectOutputStream(output);){
                    obOutput.writeObject(jobGraph);
                }
                String jobGraphFilename = "job.graph";
                this.flinkConfiguration.setString(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH, "job.graph");
                org.apache.hadoop.fs.Path pathFromYarnURL = AbstractYarnClusterDescriptor.setupSingleLocalResource("job.graph", fs, appId, new org.apache.hadoop.fs.Path(fp.toURI()), localResources, homeDir, "");
                paths.add(pathFromYarnURL);
                classPathBuilder.append("job.graph").append(File.pathSeparator);
            }
            catch (Exception e) {
                LOG.warn("Add job graph to local resource fail");
                throw e;
            }
        }
        org.apache.hadoop.fs.Path yarnFilesDir = this.getYarnFilesDir(appId);
        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
        fs.setPermission(yarnFilesDir, permission);
        org.apache.hadoop.fs.Path remoteKrb5Path = null;
        org.apache.hadoop.fs.Path remoteYarnSiteXmlPath = null;
        boolean hasKrb5 = false;
        if (System.getenv("IN_TESTS") != null && (krb5Config = System.getProperty("java.security.krb5.conf")) != null && krb5Config.length() != 0) {
            File krb5 = new File(krb5Config);
            LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", (Object)krb5.getAbsolutePath());
            org.apache.hadoop.fs.Path krb5ConfPath = new org.apache.hadoop.fs.Path(krb5.getAbsolutePath());
            remoteKrb5Path = AbstractYarnClusterDescriptor.setupSingleLocalResource("krb5.conf", fs, appId, krb5ConfPath, localResources, homeDir, "");
            File f = new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml");
            LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", (Object)f.getAbsolutePath());
            org.apache.hadoop.fs.Path yarnSitePath = new org.apache.hadoop.fs.Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath = AbstractYarnClusterDescriptor.setupSingleLocalResource("yarn-site.xml", fs, appId, yarnSitePath, localResources, homeDir, "");
            hasKrb5 = true;
        }
        org.apache.hadoop.fs.Path remotePathKeytab = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            LOG.info("Adding keytab {} to the AM container local resource bucket", (Object)keytab);
            remotePathKeytab = AbstractYarnClusterDescriptor.setupSingleLocalResource("krb5.keytab", fs, appId, new org.apache.hadoop.fs.Path(keytab), localResources, homeDir, "");
        }
        ContainerLaunchContext amContainer = this.setupApplicationMasterContainer(yarnClusterEntrypoint, hasLogback, hasLog4j, hasKrb5, clusterSpecification.getMasterMemoryMB());
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.info("Adding delegation token to the AM container..");
            Utils.setTokensFor(amContainer, paths, (org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        }
        amContainer.setLocalResources(localResources);
        fs.close();
        HashMap<String, String> appMasterEnv = new HashMap<String, String>();
        appMasterEnv.putAll(Utils.getEnvironmentVariables("containerized.master.env.", configuration));
        appMasterEnv.put("_FLINK_CLASSPATH", classPathBuilder.toString());
        appMasterEnv.put("_CLIENT_TM_COUNT", String.valueOf(clusterSpecification.getNumberTaskManagers()));
        appMasterEnv.put("_CLIENT_TM_MEMORY", String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
        appMasterEnv.put("_FLINK_JAR_PATH", remotePathJar.toString());
        appMasterEnv.put("_APP_ID", appId.toString());
        appMasterEnv.put("_CLIENT_HOME_DIR", homeDir.toString());
        appMasterEnv.put("_CLIENT_SHIP_FILES", envShipFileList.toString());
        appMasterEnv.put("_SLOTS", String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
        appMasterEnv.put("_DETACHED", String.valueOf(this.detached));
        appMasterEnv.put("_ZOOKEEPER_NAMESPACE", this.getZookeeperNamespace());
        appMasterEnv.put("_FLINK_YARN_FILES", yarnFilesDir.toUri().toString());
        appMasterEnv.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
        if (remotePathKeytab != null) {
            appMasterEnv.put("_KEYTAB_PATH", remotePathKeytab.toString());
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put("_KEYTAB_PRINCIPAL", principal);
        }
        if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
            appMasterEnv.put("_YARN_SITE_XML_PATH", remoteYarnSiteXmlPath.toString());
            appMasterEnv.put("_KRB5_PATH", remoteKrb5Path.toString());
        }
        if (this.dynamicPropertiesEncoded != null) {
            appMasterEnv.put("_DYNAMIC_PROPERTIES", this.dynamicPropertiesEncoded);
        }
        Utils.setupYarnClassPath((org.apache.hadoop.conf.Configuration)this.yarnConfiguration, appMasterEnv);
        amContainer.setEnvironment(appMasterEnv);
        Resource capability = (Resource)Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(1);
        String customApplicationName = this.customName != null ? this.customName : applicationName;
        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType("Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        if (this.yarnQueue != null) {
            appContext.setQueue(this.yarnQueue);
        }
        this.setApplicationNodeLabel(appContext);
        this.setApplicationTags(appContext);
        DeploymentFailureHook deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);
        LOG.info("Waiting for the cluster to be allocated");
        long startTime = System.currentTimeMillis();
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        block33: while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            }
            catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", (Object)appState);
            switch (appState) {
                case FAILED: 
                case FINISHED: 
                case KILLED: {
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \nDiagnostics from YARN: " + report.getDiagnostics() + "\nIf log aggregation is enabled on your cluster, use this command to further investigate the issue:\nyarn logs -applicationId " + appId);
                }
                case RUNNING: {
                    LOG.info("YARN application has been deployed successfully.");
                    break block33;
                }
                default: {
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000L) {
                        LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
                    lastAppState = appState;
                    Thread.sleep(250L);
                    continue block33;
                }
            }
            break;
        }
        if (this.isDetachedMode()) {
            LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + appId + "\nPlease also note that the temporary files of the YARN session in the home directory will not be removed.");
        }
        ShutdownHookUtil.removeShutdownHook((Thread)deploymentFailureHook, (String)this.getClass().getSimpleName(), (Logger)LOG);
        return report;
    }

    /**
     * Resolves the directory that holds the files belonging to the given application.
     *
     * <p>The result is {@code .flink/<appId>/} below the configured staging directory or, when no
     * staging directory is set, below the home directory reported by the file system.
     *
     * @param appId id of the YARN application
     * @return the application's files directory
     * @throws IOException if the file system cannot be obtained
     */
    private org.apache.hadoop.fs.Path getYarnFilesDir(ApplicationId appId) throws IOException {
        // The file system is looked up unconditionally, exactly as before, even if a staging dir is set.
        final FileSystem defaultFs = FileSystem.get((org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        final org.apache.hadoop.fs.Path baseDir;
        if (this.stagingDir == null) {
            baseDir = defaultFs.getHomeDirectory();
        } else {
            baseDir = this.stagingDir;
        }
        final String applicationSubDir = ".flink/" + appId + '/';
        return new org.apache.hadoop.fs.Path(baseDir, applicationSubDir);
    }

    /**
     * Sets up one local resource through {@code Utils.setupLocalResource} and registers it under
     * the given key.
     *
     * @param key name under which the resource is stored in {@code localResources}
     * @param fs file system handed to {@code Utils.setupLocalResource}
     * @param appId application id; its string form is handed to {@code Utils.setupLocalResource}
     * @param localSrcPath path of the source file
     * @param localResources map that receives the created {@link LocalResource}
     * @param targetHomeDir target home directory handed to {@code Utils.setupLocalResource}
     * @param relativeTargetPath relative target path handed to {@code Utils.setupLocalResource}
     * @return the path component ({@code f0}) of the tuple returned by {@code Utils.setupLocalResource}
     *     (presumably the remote location of the file; confirm against {@code Utils})
     * @throws IOException propagated from {@code Utils.setupLocalResource}
     * @throws URISyntaxException propagated from {@code Utils.setupLocalResource}
     */
    private static org.apache.hadoop.fs.Path setupSingleLocalResource(String key, FileSystem fs, ApplicationId appId, org.apache.hadoop.fs.Path localSrcPath, Map<String, LocalResource> localResources, org.apache.hadoop.fs.Path targetHomeDir, String relativeTargetPath) throws IOException, URISyntaxException {
        final String applicationId = appId.toString();
        final Tuple2<org.apache.hadoop.fs.Path, LocalResource> pathAndResource = Utils.setupLocalResource(fs, applicationId, localSrcPath, targetHomeDir, relativeTargetPath);
        final org.apache.hadoop.fs.Path resourcePath = pathAndResource.f0;
        final LocalResource localResource = pathAndResource.f1;
        localResources.put(key, localResource);
        return resourcePath;
    }

    /**
     * Registers all given ship files as local resources and collects their class path entries.
     *
     * <p>Plain files are registered under their file name. Directories are walked recursively and
     * every contained file is registered under its path relative to the directory's parent, so the
     * directory name itself is part of the key. Files whose name starts with {@code flink-dist} and
     * ends with {@code jar} are skipped in both cases.
     *
     * <p>For every registered file the key is added to the returned list, the path returned by
     * {@code setupSingleLocalResource} is added to {@code remotePaths}, and {@code key=path,} is
     * appended to {@code envShipFileList}.
     *
     * @param shipFiles files and directories to register
     * @param fs file system handed to {@code setupSingleLocalResource}
     * @param targetHomeDir target home directory handed to {@code setupSingleLocalResource}
     * @param appId id of the application the files belong to
     * @param remotePaths receives the path of every registered file
     * @param localResources receives the local resource of every registered file
     * @param envShipFileList receives one {@code key=path,} entry per registered file
     * @return the keys of all registered files, in registration order
     * @throws IOException if registering a file or walking a directory fails
     * @throws URISyntaxException if registering a plain file fails with this exception
     */
    static List<String> uploadAndRegisterFiles(Collection<File> shipFiles, final FileSystem fs, final org.apache.hadoop.fs.Path targetHomeDir, final ApplicationId appId, final List<org.apache.hadoop.fs.Path> remotePaths, final Map<String, LocalResource> localResources, final StringBuilder envShipFileList) throws IOException, URISyntaxException {
        final ArrayList<String> classPaths = new ArrayList<String>(2 + shipFiles.size());
        for (File shipFile : shipFiles) {
            if (shipFile.isDirectory()) {
                // Make the path absolute first: a relative single-element path such as "lib" has no
                // parent, which used to end in a NullPointerException when relativizing below.
                final java.nio.file.Path shipPath = shipFile.toPath().toAbsolutePath();
                final java.nio.file.Path parentPath = shipPath.getParent();
                // An absolute path only lacks a parent when it is a file system root; in that case
                // keys are computed relative to the directory itself.
                final java.nio.file.Path basePath = parentPath != null ? parentPath : shipPath;
                Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>(){

                    @Override
                    public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
                        String fileName = file.getFileName().toString();
                        if (!fileName.startsWith("flink-dist") || !fileName.endsWith("jar")) {
                            java.nio.file.Path relativePath = basePath.relativize(file);
                            java.nio.file.Path relativeDir = relativePath.getParent();
                            // A file directly below the base path has no relative directory.
                            String relativeTargetPath = relativeDir != null ? relativeDir.toString() : "";
                            String key = relativePath.toString();
                            try {
                                org.apache.hadoop.fs.Path remotePath = AbstractYarnClusterDescriptor.setupSingleLocalResource(key, fs, appId, new org.apache.hadoop.fs.Path(file.toUri()), localResources, targetHomeDir, relativeTargetPath);
                                remotePaths.add(remotePath);
                                envShipFileList.append(key).append("=").append(remotePath).append(",");
                                classPaths.add(key);
                            }
                            catch (URISyntaxException e) {
                                // visitFile may only throw IOException, so the cause is wrapped.
                                throw new IOException(e);
                            }
                        }
                        return FileVisitResult.CONTINUE;
                    }
                });
                continue;
            }
            if (shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar")) {
                continue;
            }
            org.apache.hadoop.fs.Path shipLocalPath = new org.apache.hadoop.fs.Path(shipFile.toURI());
            String key = shipFile.getName();
            org.apache.hadoop.fs.Path remotePath = AbstractYarnClusterDescriptor.setupSingleLocalResource(key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
            remotePaths.add(remotePath);
            envShipFileList.append(key).append("=").append(remotePath).append(",");
            classPaths.add(key);
        }
        return classPaths;
    }

    /**
     * Kills the given YARN application and stops the YARN client.
     *
     * <p>Killing is best effort: any exception raised while killing is only logged at debug level,
     * and the client is stopped afterwards.
     *
     * @param yarnClient client used to kill the application; stopped at the end
     * @param yarnApplication application whose id is taken from its new-application response
     */
    private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
        LOG.info("Killing YARN application");
        try {
            final ApplicationId applicationToKill = yarnApplication.getNewApplicationResponse().getApplicationId();
            yarnClient.killApplication(applicationToKill);
        }
        catch (Exception killFailure) {
            LOG.debug("Error while killing YARN application", (Throwable)killFailure);
        }
        yarnClient.stop();
    }

    /**
     * Summarizes the free memory of all node managers currently in state {@code RUNNING}.
     *
     * <p>The free memory of a node is its capability memory minus its used memory; a node without
     * a usage report counts as using nothing.
     *
     * @param yarnClient client used to fetch the node reports
     * @return total free memory, the largest free memory of a single node (never below zero), and
     *     the free memory per node in report order
     * @throws YarnException propagated from the node report request
     * @throws IOException propagated from the node report request
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        final List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        final int[] freePerNode = new int[runningNodes.size()];
        int freeTotal = 0;
        int largestFree = 0;
        int index = 0;
        for (NodeReport node : runningNodes) {
            final int capacity = node.getCapability().getMemory();
            final Resource used = node.getUsed();
            final int free = capacity - (used == null ? 0 : used.getMemory());
            freePerNode[index++] = free;
            freeTotal += free;
            largestFree = Math.max(largestFree, free);
        }
        return new ClusterResourceDescription(freeTotal, largestFree, freePerNode);
    }

    /**
     * Builds a human-readable description of the YARN cluster.
     *
     * <p>The text contains the number of node managers, a property table for every node manager in
     * state {@code RUNNING} (node id, memory, vCores, health report, container count), a summary
     * with the total memory and cores, and one line per queue.
     *
     * @return the cluster description
     * @throws RuntimeException if any of the YARN requests or the formatting fails; the original
     *     exception is attached as the cause
     */
    public String getClusterDescription() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            YarnClusterMetrics metrics = this.yarnClient.getYarnClusterMetrics();
            // NOTE(review): no line break follows the count, so the table header continues on the
            // same line. Kept unchanged to preserve the emitted text.
            ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
            // Typed lists: iterating a raw List with a typed loop variable does not compile.
            List<NodeReport> nodes = this.yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
            final String format = "|%-16s |%-16s %n";
            ps.printf("|Property         |Value          %n");
            ps.println("+---------------------------------------+");
            int totalMemory = 0;
            int totalCores = 0;
            for (NodeReport rep : nodes) {
                Resource res = rep.getCapability();
                totalMemory += res.getMemory();
                totalCores += res.getVirtualCores();
                ps.format(format, "NodeID", rep.getNodeId());
                ps.format(format, "Memory", res.getMemory() + " MB");
                ps.format(format, "vCores", res.getVirtualCores());
                ps.format(format, "HealthReport", rep.getHealthReport());
                ps.format(format, "Containers", rep.getNumContainers());
                ps.println("+---------------------------------------+");
            }
            ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
            List<QueueInfo> qInfo = this.yarnClient.getAllQueues();
            for (QueueInfo q : qInfo) {
                ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
            }
            // Make sure everything written so far has reached the byte buffer before reading it.
            ps.flush();
            return baos.toString();
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }

    /**
     * Sets the custom name stored in this descriptor.
     *
     * @param name the custom name, must not be {@code null}
     * @throws IllegalArgumentException if {@code name} is {@code null}
     */
    public void setName(String name) {
        if (name != null) {
            this.customName = name;
            return;
        }
        throw new IllegalArgumentException("The passed name is null");
    }

    /**
     * Configures the submission context for high availability through the reflector.
     *
     * <p>Requests that containers are kept across application attempts and sets the attempt
     * failures validity interval to the Akka timeout of the Flink configuration, in milliseconds.
     *
     * @param appContext submission context to configure
     * @throws InvocationTargetException if a reflectively invoked setter throws
     * @throws IllegalAccessException if a reflectively invoked setter is not accessible
     */
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
        // The timeout is read only after the first setter ran, matching the original evaluation order.
        final long validityIntervalMillis = AkkaUtils.getTimeout((Configuration)this.flinkConfiguration).toMillis();
        contextReflector.setAttemptFailuresValidityInterval(appContext, validityIntervalMillis);
    }

    /**
     * Applies the application tags from the Flink configuration to the submission context.
     *
     * <p>The configured value is split at commas; every entry is trimmed and empty entries are
     * dropped. The resulting set, which may be empty, is handed to the reflector.
     *
     * @param appContext submission context that receives the tags
     * @throws InvocationTargetException if the reflectively invoked setter throws
     * @throws IllegalAccessException if the reflectively invoked setter is not accessible
     */
    private void setApplicationTags(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        final String configuredTags = this.flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
        final Set<String> tags = new HashSet<String>();
        final String[] rawTags = configuredTags.split(",");
        for (int i = 0; i < rawTags.length; ++i) {
            final String candidate = rawTags[i].trim();
            if (!candidate.isEmpty()) {
                tags.add(candidate);
            }
        }
        contextReflector.setApplicationTags(appContext, tags);
    }

    /**
     * Applies the configured node label to the submission context; does nothing when no node label
     * is set.
     *
     * @param appContext submission context that receives the node label
     * @throws InvocationTargetException if the reflectively invoked setter throws
     * @throws IllegalAccessException if the reflectively invoked setter is not accessible
     */
    private void setApplicationNodeLabel(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        if (this.nodeLabel == null) {
            return;
        }
        ApplicationSubmissionContextReflector.getInstance().setApplicationNodeLabel(appContext, this.nodeLabel);
    }

    /**
     * Adds the directory named by the environment variable {@code FLINK_LIB_DIR} to the given
     * collection of ship files.
     *
     * <p>If the variable is not set, nothing is added; a warning is logged in that case when no
     * ship files have been configured on this descriptor either.
     *
     * @param effectiveShipFiles collection that receives the lib directory
     * @throws YarnDeploymentException if the variable is set but does not denote a directory
     */
    protected void addLibFolderToShipFiles(Collection<File> effectiveShipFiles) {
        final String libDir = System.getenv().get("FLINK_LIB_DIR");
        if (libDir == null) {
            if (this.shipFiles.isEmpty()) {
                LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. Not shipping any library files.", (Object)"FLINK_LIB_DIR");
            }
            return;
        }
        final File libDirFile = new File(libDir);
        if (!libDirFile.isDirectory()) {
            throw new YarnDeploymentException("The environment variable 'FLINK_LIB_DIR' is set to '" + libDir + "' but the directory doesn't exist.");
        }
        effectiveShipFiles.add(libDirFile);
    }

    /**
     * Creates the launch context of the application master and sets its start command.
     *
     * <p>The command is rendered from the template configured under
     * {@code yarn.container-start-command-template} with the placeholders {@code java},
     * {@code jvmmem}, {@code jvmopts}, {@code logging}, {@code class}, {@code args} and
     * {@code redirects}.
     *
     * @param yarnClusterEntrypoint value of the {@code class} placeholder
     * @param hasLogback whether the logback configuration file option is added to the logging options
     * @param hasLog4j whether the log4j configuration option is added to the logging options
     * @param hasKrb5 whether {@code -Djava.security.krb5.conf=krb5.conf} is added to the JVM options
     * @param jobManagerMemoryMb memory value handed to {@code Utils.calculateHeapSize}
     * @return the launch context carrying the rendered start command as its only command
     */
    protected ContainerLaunchContext setupApplicationMasterContainer(String yarnClusterEntrypoint, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, int jobManagerMemoryMb) {
        // JVM options: general options, then job manager specific ones, then the Kerberos setting.
        String jvmOptions = this.flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        final String jobManagerJvmOptions = this.flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        if (jobManagerJvmOptions.length() > 0) {
            jvmOptions += " " + jobManagerJvmOptions;
        }
        if (hasKrb5) {
            jvmOptions += " -Djava.security.krb5.conf=krb5.conf";
        }

        final ContainerLaunchContext launchContext = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);

        // Initial and maximum heap are set to the same value.
        final int heapSizeMb = Utils.calculateHeapSize(jobManagerMemoryMb, this.flinkConfiguration);
        final String heapOptions = "-Xms" + heapSizeMb + "m -Xmx" + heapSizeMb + "m";

        // Logging options are only emitted when at least one logging configuration is present.
        final StringBuilder loggingOptions = new StringBuilder();
        if (hasLogback || hasLog4j) {
            loggingOptions.append("-Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
            if (hasLogback) {
                loggingOptions.append(" -Dlogback.configurationFile=file:logback.xml");
            }
            if (hasLog4j) {
                loggingOptions.append(" -Dlog4j.configuration=file:log4j.properties");
            }
        }

        final Map<String, String> placeholderValues = new HashMap<String, String>();
        placeholderValues.put("java", "$JAVA_HOME/bin/java");
        placeholderValues.put("jvmmem", heapOptions);
        placeholderValues.put("jvmopts", jvmOptions);
        placeholderValues.put("logging", loggingOptions.toString());
        placeholderValues.put("class", yarnClusterEntrypoint);
        placeholderValues.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        placeholderValues.put("args", "");

        final String template = this.flinkConfiguration.getString("yarn.container-start-command-template", "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%");
        final String startCommand = BootstrapTools.getStartCommand((String)template, placeholderValues);
        launchContext.setCommands(Collections.singletonList(startCommand));
        LOG.debug("Application Master start command: " + startCommand);
        return launchContext;
    }

    /**
     * Reads the user jar inclusion mode from the configuration.
     *
     * @param config configuration to read {@code YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR} from
     * @return the configured inclusion mode
     * @throws IllegalArgumentException if the option is set to {@code DISABLED} (any letter case)
     */
    private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(Configuration config) {
        // Reject the no longer supported value before the option is parsed as an enum.
        AbstractYarnClusterDescriptor.throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config);
        final YarnConfigOptions.UserJarInclusion inclusionMode = (YarnConfigOptions.UserJarInclusion)config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
        return inclusionMode;
    }

    /**
     * Rejects configurations that set {@code YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR} to
     * {@code DISABLED}; the comparison ignores letter case.
     *
     * @param config configuration to check
     * @throws IllegalArgumentException if the option is set to {@code DISABLED}
     */
    private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(Configuration config) {
        final String configuredValue = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
        if (!"DISABLED".equalsIgnoreCase(configuredValue)) {
            return;
        }
        final String optionKey = YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key();
        throw new IllegalArgumentException(String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)", optionKey));
    }

    /**
     * Creates the {@link ClusterClient} for a YARN application; implemented by concrete descriptors.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation. Judging by the types alone
     * they are presumably the descriptor, two integer sizing values (possibly the number of task
     * managers and the slots per task manager), the application report, the Flink configuration
     * and a boolean flag — confirm against the callers and implementations before relying on this.
     *
     * @return a cluster client keyed by the YARN {@link ApplicationId}
     * @throws Exception declared for implementations; the concrete failure cases are not visible here
     */
    protected abstract ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor var1, int var2, int var3, ApplicationReport var4, Configuration var5, boolean var6) throws Exception;

    /**
     * Thread that cancels a deployment: it kills the YARN application, stops the YARN client and
     * recursively deletes the application's files directory.
     *
     * <p>It is an inner (non-static) class because it calls
     * {@code failSessionDuringDeployment} and reads {@code yarnConfiguration} of the enclosing
     * descriptor.
     */
    private class DeploymentFailureHook
    extends Thread {
        private final YarnClient yarnClient;
        private final YarnClientApplication yarnApplication;
        private final org.apache.hadoop.fs.Path yarnFilesDir;

        /**
         * @param yarnClient client used to kill the application, must not be {@code null}
         * @param yarnApplication application to kill, must not be {@code null}
         * @param yarnFilesDir directory to delete, must not be {@code null}
         */
        DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication, org.apache.hadoop.fs.Path yarnFilesDir) {
            this.yarnClient = Preconditions.checkNotNull(yarnClient);
            this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
            this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
        }

        /**
         * Cancels the deployment and removes the application's files. A failed deletion is logged
         * as an error and never propagated.
         */
        @Override
        public void run() {
            LOG.info("Cancelling deployment from Deployment Failure Hook");
            AbstractYarnClusterDescriptor.this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            LOG.info("Deleting files in {}.", (Object)this.yarnFilesDir);
            // try-with-resources closes the file system even when the deletion fails or throws;
            // previously close() was skipped on every failure path.
            try (FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)AbstractYarnClusterDescriptor.this.yarnConfiguration)) {
                if (!fs.delete(this.yarnFilesDir, true)) {
                    throw new IOException("Deleting files in " + this.yarnFilesDir + " was unsuccessful");
                }
            }
            catch (IOException e) {
                LOG.error("Failed to delete Flink Jar and configuration files in HDFS", (Throwable)e);
            }
        }
    }

    /**
     * Unchecked exception thrown by this descriptor when a deployment on YARN fails.
     */
    private static class YarnDeploymentException
    extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /**
         * @param message description of the failure
         * @param cause exception that caused the failure
         */
        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }

        /**
         * @param message description of the failure
         */
        public YarnDeploymentException(String message) {
            super(message);
        }
    }

    /**
     * Reflective access to setters of {@link ApplicationSubmissionContext} that may be missing.
     *
     * <p>Each setter is looked up once by name when the singleton is created. Calling one of the
     * public methods invokes the setter if the lookup succeeded and only logs a debug message
     * otherwise.
     */
    private static class ApplicationSubmissionContextReflector {
        // LOG must be declared before 'instance': the constructor logs during static initialization.
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
        private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
        private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
        private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
        private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
        private static final String NODE_LABEL_EXPRESSION_NAME = "setNodeLabelExpression";
        // Each of these is null when the context class does not declare the corresponding setter.
        private final Method applicationTagsMethod;
        private final Method attemptFailuresValidityIntervalMethod;
        private final Method keepContainersMethod;
        @Nullable
        private final Method nodeLabelExpressionMethod;

        /** Returns the shared reflector instance. */
        public static ApplicationSubmissionContextReflector getInstance() {
            return instance;
        }

        /**
         * Looks up all supported setters on the given class.
         *
         * @param clazz class to inspect
         */
        private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
            this.applicationTagsMethod = ApplicationSubmissionContextReflector.findMethod(clazz, APPLICATION_TAGS_METHOD_NAME, Set.class);
            this.attemptFailuresValidityIntervalMethod = ApplicationSubmissionContextReflector.findMethod(clazz, ATTEMPT_FAILURES_METHOD_NAME, long.class);
            this.keepContainersMethod = ApplicationSubmissionContextReflector.findMethod(clazz, KEEP_CONTAINERS_METHOD_NAME, boolean.class);
            this.nodeLabelExpressionMethod = ApplicationSubmissionContextReflector.findMethod(clazz, NODE_LABEL_EXPRESSION_NAME, String.class);
        }

        /**
         * Looks up a public single-argument method and logs at debug level whether it exists.
         *
         * @param clazz class to inspect
         * @param methodName name of the method
         * @param parameterType type of the method's only parameter
         * @return the method, or {@code null} if the class has no such method
         */
        private static Method findMethod(Class<?> clazz, String methodName, Class<?> parameterType) {
            try {
                final Method found = clazz.getMethod(methodName, parameterType);
                LOG.debug("{} supports method {}.", (Object)clazz.getCanonicalName(), (Object)methodName);
                return found;
            }
            catch (NoSuchMethodException e) {
                LOG.debug("{} does not support method {}.", (Object)clazz.getCanonicalName(), (Object)methodName);
                return null;
            }
        }

        /**
         * Invokes the given setter on the context, or logs that it is unsupported when the method
         * is {@code null}.
         *
         * @param method setter to invoke, may be {@code null}
         * @param methodName setter name used in the "unsupported" log message
         * @param appContext context to invoke the setter on
         * @param argument the single argument handed to the setter
         */
        private static void invokeIfSupported(Method method, String methodName, ApplicationSubmissionContext appContext, Object argument) throws InvocationTargetException, IllegalAccessException {
            if (method == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", (Object)appContext.getClass().getCanonicalName(), (Object)methodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", (Object)method.getName(), (Object)appContext.getClass().getCanonicalName());
            method.invoke((Object)appContext, argument);
        }

        /** Sets the application tags on the context if the setter is available. */
        public void setApplicationTags(ApplicationSubmissionContext appContext, Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
            ApplicationSubmissionContextReflector.invokeIfSupported(this.applicationTagsMethod, APPLICATION_TAGS_METHOD_NAME, appContext, applicationTags);
        }

        /** Sets the node label expression on the context if the setter is available. */
        public void setApplicationNodeLabel(ApplicationSubmissionContext appContext, String nodeLabel) throws InvocationTargetException, IllegalAccessException {
            ApplicationSubmissionContextReflector.invokeIfSupported(this.nodeLabelExpressionMethod, NODE_LABEL_EXPRESSION_NAME, appContext, nodeLabel);
        }

        /** Sets the attempt failures validity interval on the context if the setter is available. */
        public void setAttemptFailuresValidityInterval(ApplicationSubmissionContext appContext, long validityInterval) throws InvocationTargetException, IllegalAccessException {
            ApplicationSubmissionContextReflector.invokeIfSupported(this.attemptFailuresValidityIntervalMethod, ATTEMPT_FAILURES_METHOD_NAME, appContext, validityInterval);
        }

        /** Sets the keep-containers flag on the context if the setter is available. */
        public void setKeepContainersAcrossApplicationAttempts(ApplicationSubmissionContext appContext, boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
            ApplicationSubmissionContextReflector.invokeIfSupported(this.keepContainersMethod, KEEP_CONTAINERS_METHOD_NAME, appContext, keepContainers);
        }
    }

    /**
     * Immutable holder for the free memory figures of a cluster. The array is stored as passed in,
     * without copying.
     */
    private static class ClusterResourceDescription {
        /** Sum of the free memory of all nodes. */
        public final int totalFreeMemory;
        /** Largest free memory of a single node. */
        public final int containerLimit;
        /** Free memory per node manager. */
        public final int[] nodeManagersFree;

        /**
         * @param totalFreeMemory sum of the free memory of all nodes
         * @param containerLimit largest free memory of a single node
         * @param nodeManagersFree free memory per node manager
         */
        public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
            this.nodeManagersFree = nodeManagersFree;
            this.containerLimit = containerLimit;
            this.totalFreeMemory = totalFreeMemory;
        }
    }
}

