/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnClusterDescriptor
implements ClusterDescriptor<ApplicationId> {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
    /** YARN/Hadoop configuration; read for scheduler limits and to obtain the Hadoop file system. */
    private final YarnConfiguration yarnConfiguration;
    /** Client used for every YARN interaction performed by this descriptor. */
    private final YarnClient yarnClient;
    /** Queried in {@code isReadyForDeployment} for the maximum number of vcores. */
    private final YarnClusterInformationRetriever yarnClusterInformationRetriever;
    /** When {@code true}, {@code close()} does not stop {@code yarnClient}. */
    private final boolean sharedYarnClient;
    /** Files and directories registered through {@code addShipFiles}. */
    private final List<File> shipFiles = new LinkedList<File>();
    /** Value of {@code YarnConfigOptions.APPLICATION_QUEUE}; only used here to warn when the queue is unknown. */
    private final String yarnQueue;
    /** Location of the Flink dist jar; deployment fails while this is {@code null}. */
    private org.apache.hadoop.fs.Path flinkJarPath;
    /** Flink configuration; mutated during deployment (execution mode, HA cluster id, job graph path). */
    private final Configuration flinkConfiguration;
    /** Value of {@code YarnConfigOptions.APPLICATION_NAME}. */
    private final String customName;
    /** Value of {@code YarnConfigOptions.NODE_LABEL}. */
    private final String nodeLabel;
    /** Value of {@code YarnConfigOptions.APPLICATION_TYPE}. */
    private final String applicationType;
    /** HA cluster id; initialised from the configuration and defaulted in {@code startAppMaster} when empty. */
    private String zookeeperNamespace;
    /** How user jars are placed on the class path; derived from the Flink configuration in the constructor. */
    private YarnConfigOptions.UserJarInclusion userJarInclusion;
    /** Extra resources (key to path) added via {@code addHopsLocalResources} and registered in {@code startAppMaster}. */
    private final Map<String, String> hopsLocalResources = new HashMap<String, String>();
    /** Optional upload directory; when {@code null}, {@code startAppMaster} uses the file system's home directory. */
    private org.apache.hadoop.fs.Path stagingDir;
    /** Optional pre-created YARN application; created lazily in {@code deployInternal} when {@code null}. */
    private YarnClientApplication yarnApplication;
    /** New-application response belonging to {@code yarnApplication}. */
    private GetNewApplicationResponse appResponse;
    /** Set to {@code true} by {@code setDocker}. */
    private boolean docker = false;
    /** Docker image supplied through {@code setDocker}. */
    private String dockerImage;
    /** Docker mounts supplied through {@code setDocker}. */
    private String dockerMounts;

    /**
     * Overrides the directory used as upload root during deployment.
     *
     * <p>{@code startAppMaster} uses this path instead of the file system's home directory
     * whenever it is non-null.
     *
     * @param stagingDir the staging directory, or {@code null} to fall back to the home directory
     */
    public void setStagingDir(org.apache.hadoop.fs.Path stagingDir) {
        this.stagingDir = stagingDir;
    }

    /**
     * Supplies an already created YARN application.
     *
     * <p>{@code deployInternal} only calls {@code yarnClient.createApplication()} when no
     * application has been set.
     *
     * @param yarnApplication the application to deploy into
     */
    public void setYarnApplication(YarnClientApplication yarnApplication) {
        this.yarnApplication = yarnApplication;
    }

    /**
     * Supplies the new-application response that {@code deployInternal} reads the maximum
     * resource capability from.
     *
     * @param appResponse the response belonging to the application set via {@code setYarnApplication}
     */
    public void setAppResponse(GetNewApplicationResponse appResponse) {
        this.appResponse = appResponse;
    }

    /**
     * Registers an additional local resource; an existing entry with the same key is replaced.
     *
     * <p>{@code startAppMaster} turns these entries into YARN local resources, adds them to the
     * shipped-file list and appends each key to the class path.
     *
     * @param key name of the resource
     * @param path location of the resource
     */
    public void addHopsLocalResources(String key, String path) {
        this.hopsLocalResources.put(key, path);
    }

    /**
     * Enables the docker flag and stores the image and mounts to use.
     *
     * @param dockerImage docker image name
     * @param dockerMounts docker mounts specification
     */
    public void setDocker(String dockerImage, String dockerMounts) {
        this.docker = true;
        this.dockerImage = dockerImage;
        this.dockerMounts = dockerMounts;
    }

    /**
     * Creates a descriptor bound to the given Flink and YARN configuration.
     *
     * <p>Besides storing its arguments, the constructor resolves the Flink dist jar, registers
     * the configured ship directories and caches the queue, name, type, node label and HA
     * cluster id from the Flink configuration.
     *
     * @param flinkConfiguration Flink configuration, must not be null
     * @param yarnConfiguration YARN configuration, must not be null
     * @param yarnClient YARN client, must not be null
     * @param yarnClusterInformationRetriever source of cluster limits, must not be null
     * @param sharedYarnClient whether the client is owned elsewhere and must survive {@code close()}
     */
    public YarnClusterDescriptor(Configuration flinkConfiguration, YarnConfiguration yarnConfiguration, YarnClient yarnClient, YarnClusterInformationRetriever yarnClusterInformationRetriever, boolean sharedYarnClient) {
        this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
        this.yarnClient = Preconditions.checkNotNull(yarnClient);
        this.yarnClusterInformationRetriever = Preconditions.checkNotNull(yarnClusterInformationRetriever);
        this.sharedYarnClient = sharedYarnClient;
        this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);

        // The inclusion mode has to be known before ship files are added, because
        // addShipFiles validates against it.
        this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
        getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
        decodeDirsToShipToCluster(flinkConfiguration).ifPresent(this::addShipFiles);

        this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
        this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
        this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
        this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
        this.zookeeperNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
    }

    /**
     * Reads {@code YarnConfigOptions.SHIP_DIRECTORIES} and converts every entry to a {@link File}.
     *
     * @param configuration configuration to read from, must not be null
     * @return the configured directories, or an empty optional when none are configured
     */
    private Optional<List<File>> decodeDirsToShipToCluster(Configuration configuration) {
        Preconditions.checkNotNull(configuration);
        // Typed list instead of a raw List so the result matches Optional<List<File>>.
        final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
        if (files.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(files);
    }

    /**
     * Determines where the Flink dist jar lives.
     *
     * <p>An explicitly configured {@code YarnConfigOptions.FLINK_DIST_JAR} wins. Otherwise the
     * code source location of this class is used, provided it ends with {@code .jar}.
     *
     * @param configuration configuration to read the jar option from
     * @return the jar path, or an empty optional when neither source yields a jar
     */
    private Optional<org.apache.hadoop.fs.Path> getLocalFlinkDistPath(Configuration configuration) {
        final String configuredJar = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
        if (configuredJar == null) {
            LOG.info("No path for the flink jar passed. Using the location of " + this.getClass() + " to locate the jar");
            final String codeSourcePath = getDecodedJarPath();
            if (!codeSourcePath.endsWith(".jar")) {
                return Optional.empty();
            }
            return Optional.of(new org.apache.hadoop.fs.Path(new File(codeSourcePath).toURI()));
        }
        return Optional.of(new org.apache.hadoop.fs.Path(configuredJar));
    }

    /**
     * Returns the URL-decoded path of the code source this class was loaded from.
     *
     * @return the decoded path, using the platform default charset
     * @throws RuntimeException if the default charset is not supported by {@link URLDecoder};
     *     the original exception is attached as cause
     */
    private String getDecodedJarPath() {
        final String encodedJarPath = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        try {
            return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
        } catch (UnsupportedEncodingException e) {
            // Keep the cause so the failing charset shows up in the stack trace.
            throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath + " You can supply a path manually via the command line.", e);
        }
    }

    /**
     * Returns the internal list of ship files (the live list, not a copy).
     *
     * @return files registered through {@code addShipFiles}
     */
    @VisibleForTesting
    List<File> getShipFiles() {
        return this.shipFiles;
    }

    /**
     * Returns the YARN client this descriptor was constructed with.
     *
     * @return the YARN client
     */
    public YarnClient getYarnClient() {
        return this.yarnClient;
    }

    /**
     * Returns the entry point class name passed to {@code deployInternal} for session clusters.
     *
     * @return fully qualified name of {@link YarnSessionClusterEntrypoint}
     */
    protected String getYarnSessionClusterEntrypoint() {
        return YarnSessionClusterEntrypoint.class.getName();
    }

    /**
     * Returns the entry point class name passed to {@code deployInternal} for per-job clusters.
     *
     * @return fully qualified name of {@link YarnJobClusterEntrypoint}
     */
    protected String getYarnJobClusterEntrypoint() {
        return YarnJobClusterEntrypoint.class.getName();
    }

    /**
     * Returns the Flink configuration held by this descriptor (the same instance that the
     * deployment methods modify).
     *
     * @return the Flink configuration
     */
    public Configuration getFlinkConfiguration() {
        return this.flinkConfiguration;
    }

    /**
     * Sets the location of the Flink dist jar.
     *
     * @param localJarPath path whose string form must end with {@code jar}
     * @throws IllegalArgumentException if the path does not end with {@code jar}
     */
    public void setLocalJarPath(org.apache.hadoop.fs.Path localJarPath) {
        final boolean endsWithJar = localJarPath.toString().endsWith("jar");
        if (endsWithJar) {
            this.flinkJarPath = localJarPath;
            return;
        }
        throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    }

    /**
     * Adds files or directories that should be shipped to the cluster.
     *
     * <p>When user-jar inclusion is {@code DISABLED}, the given files are additionally checked
     * with {@code isUsrLibDirIncludedInShipFiles}.
     * NOTE(review): that helper is not visible here; judging by the error message it presumably
     * returns {@code true} when no directory named {@code usrlib} is present — confirm.
     *
     * @param shipFiles files or directories to ship
     * @throws IllegalArgumentException if the check described above fails
     */
    public void addShipFiles(List<File> shipFiles) {
        final boolean shipFilesAccepted =
                userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED
                        || isUsrLibDirIncludedInShipFiles(shipFiles);
        Preconditions.checkArgument(
                shipFilesAccepted,
                "This is an illegal ship directory : %s. When setting the %s to %s the name of ship directory can not be %s.",
                "usrlib",
                YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
                YarnConfigOptions.UserJarInclusion.DISABLED,
                "usrlib");
        this.shipFiles.addAll(shipFiles);
    }

    /**
     * Verifies the preconditions for a deployment.
     *
     * <p>Checks that the Flink jar path and configuration are present and that neither the
     * application master vcores nor the per-node vcores exceed the cluster maximum. A missing
     * {@code HADOOP_CONF_DIR}/{@code YARN_CONF_DIR} environment only produces a warning.
     *
     * @param clusterSpecification supplies the slot count used as default for the vcores option
     * @throws YarnDeploymentException if the jar path or the configuration is missing
     * @throws IllegalConfigurationException if more vcores are requested than the cluster offers
     */
    private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
        if (flinkJarPath == null) {
            throw new YarnDeploymentException("The Flink jar path is null");
        }
        if (flinkConfiguration == null) {
            throw new YarnDeploymentException("Flink configuration object has not been set");
        }

        final int maxVcores = yarnClusterInformationRetriever.getMaxVcores();

        final int appMasterVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
        if (appMasterVcores > maxVcores) {
            final String message = String.format("The number of requested virtual cores for application master %d exceeds the maximum number of virtual cores %d available in the Yarn Cluster.", appMasterVcores, maxVcores);
            throw new IllegalConfigurationException(message);
        }

        // The per-node vcores default to the number of slots per TaskManager.
        final int slotsPerTaskManager = clusterSpecification.getSlotsPerTaskManager();
        final int nodeVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, slotsPerTaskManager);
        if (nodeVcores > maxVcores) {
            final String message = String.format("The number of requested virtual cores per node %d exceeds the maximum number of virtual cores %d available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with '%s.'", nodeVcores, maxVcores, YarnConfigOptions.VCORES.key());
            throw new IllegalConfigurationException(message);
        }

        final boolean hadoopConfDirUnset = System.getenv("HADOOP_CONF_DIR") == null;
        if (hadoopConfDirUnset && System.getenv("YARN_CONF_DIR") == null) {
            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
        }
    }

    /**
     * Returns the HA cluster id currently held by this descriptor.
     *
     * @return the namespace; may be {@code null} until {@code startAppMaster} has assigned one
     */
    public String getZookeeperNamespace() {
        return this.zookeeperNamespace;
    }

    /**
     * Stores the HA cluster id; called from {@code startAppMaster} once a value has been chosen.
     *
     * @param zookeeperNamespace the namespace to remember
     */
    private void setZookeeperNamespace(String zookeeperNamespace) {
        this.zookeeperNamespace = zookeeperNamespace;
    }

    /**
     * Returns the value of {@code YarnConfigOptions.NODE_LABEL} read at construction time.
     *
     * @return the configured node label
     */
    public String getNodeLabel() {
        return this.nodeLabel;
    }

    /**
     * Stops the YARN client unless it is shared with (and therefore owned by) someone else.
     */
    public void close() {
        if (sharedYarnClient) {
            return;
        }
        yarnClient.stop();
    }

    /**
     * Connects to an already running YARN application.
     *
     * <p>The application must not have a final status yet. On success the cluster entry point
     * information from the application report is written into the Flink configuration.
     *
     * @param applicationId id of the running application
     * @return a provider that creates a {@link RestClusterClient} for the application
     * @throws ClusterRetrieveException if the report cannot be fetched or the application has
     *     already finished
     */
    public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
        try {
            final boolean hadoopConfDirUnset = System.getenv("HADOOP_CONF_DIR") == null;
            if (hadoopConfDirUnset && System.getenv("YARN_CONF_DIR") == null) {
                LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
            }

            final ApplicationReport report = yarnClient.getApplicationReport(applicationId);
            if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", applicationId, report.getFinalApplicationStatus());
                // Wrapped into a ClusterRetrieveException by the catch block below.
                throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
            }

            setClusterEntrypointInfoToConfig(report);

            return () -> {
                try {
                    return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
                } catch (Exception e) {
                    throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
                }
            };
        } catch (Exception e) {
            throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
        }
    }

    /**
     * Deploys a Flink session cluster on YARN.
     *
     * @param clusterSpecification requested cluster resources
     * @return a provider for clients talking to the new cluster
     * @throws ClusterDeploymentException if any step of the deployment fails
     */
    public ClusterClientProvider<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        try {
            final String entrypoint = getYarnSessionClusterEntrypoint();
            // Session clusters carry no job graph and are never deployed detached here.
            return deployInternal(clusterSpecification, "Flink session cluster", entrypoint, null, false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
        }
    }

    /**
     * Deploys a per-job Flink cluster on YARN that runs the given job graph.
     *
     * @param clusterSpecification requested cluster resources
     * @param jobGraph job to ship with the cluster
     * @param detached whether the cluster runs in detached execution mode
     * @return a provider for clients talking to the new cluster
     * @throws ClusterDeploymentException if any step of the deployment fails
     */
    public ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException {
        try {
            final String entrypoint = getYarnJobClusterEntrypoint();
            return deployInternal(clusterSpecification, "Flink per-job cluster", entrypoint, jobGraph, detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }

    /**
     * Kills the YARN application and afterwards deletes the files uploaded for it.
     *
     * @param applicationId id of the application to kill
     * @throws FlinkException if killing the application or deleting its files fails
     */
    public void killCluster(ApplicationId applicationId) throws FlinkException {
        try {
            yarnClient.killApplication(applicationId);

            // Clean up only after the kill request went through.
            final String yarnFilesUri = getYarnFilesDir(applicationId).toUri().toString();
            final Map<String, String> cleanupEnv = Collections.singletonMap("_FLINK_YARN_FILES", yarnFilesUri);
            Utils.deleteApplicationFiles(cleanupEnv);
        } catch (YarnException | IOException e) {
            throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
        }
    }

    /**
     * Shared deployment routine behind the session and per-job deployments.
     *
     * <p>Steps: validate Kerberos credentials, run the readiness and queue checks, obtain (or
     * reuse) the YARN application, validate the requested resources against the cluster,
     * record the execution mode in the Flink configuration and start the application master.
     *
     * @param clusterSpecification requested cluster resources
     * @param applicationName name to give the YARN application
     * @param yarnClusterEntrypoint class name of the cluster entry point to launch
     * @param jobGraph job graph to ship, or {@code null} for session clusters
     * @param detached whether the cluster runs in detached execution mode
     * @return a provider that creates a {@link RestClusterClient} for the deployed application
     * @throws Exception if a precondition fails or YARN rejects the deployment
     */
    private ClusterClientProvider<ApplicationId> deployInternal(ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached) throws Exception {
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            final boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials or delegation tokens!");
            }
        }

        isReadyForDeployment(clusterSpecification);
        checkYarnQueues(yarnClient);

        if (yarnApplication == null) {
            yarnApplication = yarnClient.createApplication();
            appResponse = yarnApplication.getNewApplicationResponse();
        } else if (appResponse == null) {
            // An application was injected via setYarnApplication without a matching
            // setAppResponse call; derive the response instead of failing with an NPE below.
            appResponse = yarnApplication.getNewApplicationResponse();
        }
        final Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (IOException | YarnException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", 0);

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }
        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode = detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL;
        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        final ApplicationReport report = startAppMaster(flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification);

        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }

    /**
     * Checks the requested JobManager/TaskManager memory against the YARN limits.
     *
     * <p>JobManager memory below the minimum allocation is raised to the minimum. Requests above
     * the maximum resource capability are rejected. Requests above the currently largest free
     * container only produce a warning.
     *
     * @param clusterSpecification the requested specification
     * @param yarnMinAllocationMB value of {@code yarn.scheduler.minimum-allocation-mb}
     * @param maximumResourceCapability largest resource YARN grants to a single container
     * @param freeClusterResources currently free cluster resources
     * @return a specification carrying the (possibly raised) JobManager memory
     * @throws YarnDeploymentException if a memory request exceeds the maximum capability
     */
    private ClusterSpecification validateClusterResources(ClusterSpecification clusterSpecification, int yarnMinAllocationMB, Resource maximumResourceCapability, ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
        final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
        int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();

        final boolean belowMinimum = jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB;
        if (belowMinimum) {
            LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.");
        }

        // Only the JobManager request is raised to the minimum allocation.
        jobManagerMemoryMb = Math.max(jobManagerMemoryMb, yarnMinAllocationMB);

        final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
        if (maximumResourceCapability.getMemory() < jobManagerMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
        }
        if (maximumResourceCapability.getMemory() < taskManagerMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
        }

        final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.";
        if (freeClusterResources.containerLimit < taskManagerMemoryMb) {
            LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }
        if (freeClusterResources.containerLimit < jobManagerMemoryMb) {
            LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMb)
                .setTaskManagerMemoryMB(taskManagerMemoryMb)
                .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
                .createClusterSpecification();
    }

    /**
     * Warns when the configured YARN queue is not among the queues reported by the cluster.
     *
     * <p>This check is best effort: any failure while talking to YARN is logged and swallowed so
     * that it never aborts a deployment.
     *
     * @param yarnClient client used to list the queues
     */
    private void checkYarnQueues(YarnClient yarnClient) {
        try {
            final List<QueueInfo> queues = yarnClient.getAllQueues();
            if (!queues.isEmpty() && yarnQueue != null) {
                boolean queueFound = false;
                for (QueueInfo queue : queues) {
                    if (queue.getQueueName().equals(yarnQueue)) {
                        queueFound = true;
                        break;
                    }
                }
                if (!queueFound) {
                    // Builder instead of repeated string concatenation; output format is unchanged.
                    final StringBuilder queueNames = new StringBuilder();
                    for (QueueInfo queue : queues) {
                        queueNames.append(queue.getQueueName()).append(", ");
                    }
                    LOG.warn("The specified queue '" + yarnQueue + "' does not exist. Available queues: " + queueNames);
                }
            } else {
                // Also reached when queues exist but no queue was configured.
                LOG.debug("The YARN cluster does not have any queues configured");
            }
        } catch (Throwable e) {
            // Deliberately broad: queue validation must not break the deployment.
            LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ApplicationReport startAppMaster(Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception {
        ApplicationReport report;
        Set<File> userJarFiles;
        org.apache.flink.core.fs.FileSystem.initialize((Configuration)configuration, (PluginManager)PluginUtils.createPluginManagerFromRootFolder((Configuration)configuration));
        FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        org.apache.hadoop.fs.Path homeDir = this.stagingDir != null ? this.stagingDir : fs.getHomeDirectory();
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && fs.getScheme().startsWith("file")) {
            LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system");
        }
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
        HashSet<File> systemShipFiles = new HashSet<File>(this.shipFiles.size());
        HashSet<File> shipOnlyFiles = new HashSet<File>();
        for (File file : this.shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }
        String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }
        this.addLibFoldersToShipFiles(systemShipFiles);
        this.addPluginsFoldersToShipFiles(shipOnlyFiles);
        ApplicationId appId = appContext.getApplicationId();
        String zkNamespace = this.getZookeeperNamespace();
        if (zkNamespace == null || zkNamespace.isEmpty()) {
            zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
            this.setZookeeperNamespace(zkNamespace);
        }
        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
        if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)configuration)) {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 2));
            this.activateHighAvailabilitySupport(appContext);
        } else {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }
        Set<File> set = userJarFiles = jobGraph == null ? Collections.emptySet() : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
        if (jobGraph != null) {
            for (Map.Entry entry : jobGraph.getUserArtifacts().entrySet()) {
                Path path = new Path(((DistributedCache.DistributedCacheEntry)entry.getValue()).filePath);
                if (path.getFileSystem().isDistributedFS()) continue;
                org.apache.hadoop.fs.Path localPath = new org.apache.hadoop.fs.Path(path.getPath());
                Tuple2<org.apache.hadoop.fs.Path, Long> remoteFileInfo = Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, (String)entry.getKey());
                jobGraph.setUserArtifactRemotePath((String)entry.getKey(), ((org.apache.hadoop.fs.Path)remoteFileInfo.f0).toString());
            }
            jobGraph.writeUserArtifactEntriesToConfiguration();
        }
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>(2 + systemShipFiles.size() + userJarFiles.size());
        localResources.putAll(Utils.calculateHopsLocalResources(this.hopsLocalResources, this.yarnConfiguration));
        ArrayList<org.apache.hadoop.fs.Path> paths = new ArrayList<org.apache.hadoop.fs.Path>(2 + systemShipFiles.size() + userJarFiles.size());
        StringBuilder envShipFileList = new StringBuilder();
        for (String key : this.hopsLocalResources.keySet()) {
            envShipFileList.append(key).append("=").append(this.hopsLocalResources.get(key)).append(",");
        }
        List<String> systemClassPaths = YarnClusterDescriptor.uploadAndRegisterFiles(systemShipFiles, fs, homeDir, appId, paths, localResources, ".", envShipFileList);
        YarnClusterDescriptor.uploadAndRegisterFiles(shipOnlyFiles, fs, homeDir, appId, paths, localResources, ".", envShipFileList);
        List<String> userClassPaths = YarnClusterDescriptor.uploadAndRegisterFiles(userJarFiles, fs, homeDir, appId, paths, localResources, this.userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ? "usrlib" : ".", envShipFileList);
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);
        StringBuilder classPathBuilder = new StringBuilder();
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }
        org.apache.hadoop.fs.Path remotePathJar = YarnClusterDescriptor.setupSingleLocalResource(this.flinkJarPath.getName(), fs, appId, this.flinkJarPath, localResources, homeDir, "");
        paths.add(remotePathJar);
        classPathBuilder.append(this.flinkJarPath.getName()).append(File.pathSeparator);
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
            BootstrapTools.writeConfiguration((Configuration)configuration, (File)tmpConfigurationFile);
            String flinkConfigKey = "flink-conf.yaml";
            org.apache.hadoop.fs.Path remotePathConf = YarnClusterDescriptor.setupSingleLocalResource(flinkConfigKey, fs, appId, new org.apache.hadoop.fs.Path(tmpConfigurationFile.getAbsolutePath()), localResources, homeDir, "");
            envShipFileList.append(flinkConfigKey).append("=").append(remotePathConf).append(",");
            paths.add(remotePathConf);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
            for (String key : this.hopsLocalResources.keySet()) {
                classPathBuilder.append(key).append(File.pathSeparator);
            }
        }
        finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", (Object)tmpConfigurationFile.toPath());
            }
        }
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                Object object = null;
                try (ObjectOutputStream obOutput2 = new ObjectOutputStream(output);){
                    obOutput2.writeObject(jobGraph);
                }
                catch (Throwable obOutput2) {
                    object = obOutput2;
                    throw obOutput2;
                }
                finally {
                    if (output != null) {
                        if (object != null) {
                            try {
                                output.close();
                            }
                            catch (Throwable obOutput2) {
                                ((Throwable)object).addSuppressed(obOutput2);
                            }
                        } else {
                            output.close();
                        }
                    }
                }
                String jobGraphFilename = "job.graph";
                this.flinkConfiguration.setString(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH, "job.graph");
                org.apache.hadoop.fs.Path pathFromYarnURL = YarnClusterDescriptor.setupSingleLocalResource("job.graph", fs, appId, new org.apache.hadoop.fs.Path(tmpJobGraphFile.toURI()), localResources, homeDir, "");
                paths.add(pathFromYarnURL);
                classPathBuilder.append("job.graph").append(File.pathSeparator);
            }
            catch (Exception e) {
                LOG.warn("Add job graph to local resource fail");
                throw e;
            }
            finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", (Object)tmpConfigurationFile.toPath());
                }
            }
        }
        org.apache.hadoop.fs.Path yarnFilesDir = this.getYarnFilesDir(appId);
        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
        fs.setPermission(yarnFilesDir, permission);
        org.apache.hadoop.fs.Path remoteKrb5Path = null;
        org.apache.hadoop.fs.Path remoteYarnSiteXmlPath = null;
        boolean hasKrb5 = false;
        if (System.getenv("IN_TESTS") != null) {
            File f2 = new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml");
            LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", (Object)f2.getAbsolutePath());
            org.apache.hadoop.fs.Path yarnSitePath = new org.apache.hadoop.fs.Path(f2.getAbsolutePath());
            remoteYarnSiteXmlPath = YarnClusterDescriptor.setupSingleLocalResource("yarn-site.xml", fs, appId, yarnSitePath, localResources, homeDir, "");
            String krb5Config = System.getProperty("java.security.krb5.conf");
            if (krb5Config != null && krb5Config.length() != 0) {
                File krb5 = new File(krb5Config);
                LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", (Object)krb5.getAbsolutePath());
                org.apache.hadoop.fs.Path krb5ConfPath = new org.apache.hadoop.fs.Path(krb5.getAbsolutePath());
                remoteKrb5Path = YarnClusterDescriptor.setupSingleLocalResource("krb5.conf", fs, appId, krb5ConfPath, localResources, homeDir, "");
                hasKrb5 = true;
            }
        }
        org.apache.hadoop.fs.Path remotePathKeytab = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            LOG.info("Adding keytab {} to the AM container local resource bucket", (Object)keytab);
            remotePathKeytab = YarnClusterDescriptor.setupSingleLocalResource("krb5.keytab", fs, appId, new org.apache.hadoop.fs.Path(keytab), localResources, homeDir, "");
        }
        boolean hasLogback = logConfigFilePath != null && logConfigFilePath.endsWith("logback.xml");
        boolean hasLog4j = logConfigFilePath != null && logConfigFilePath.endsWith("log4j.properties");
        ContainerLaunchContext amContainer = this.setupApplicationMasterContainer(yarnClusterEntrypoint, hasLogback, hasLog4j, hasKrb5, clusterSpecification.getMasterMemoryMB());
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.info("Adding delegation token to the AM container.");
            Utils.setTokensFor(amContainer, paths, (org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        }
        amContainer.setLocalResources(localResources);
        fs.close();
        HashMap<String, String> appMasterEnv = new HashMap<String, String>();
        appMasterEnv.putAll(BootstrapTools.getEnvironmentVariables((String)"containerized.master.env.", (Configuration)configuration));
        appMasterEnv.put("_FLINK_CLASSPATH", classPathBuilder.toString());
        appMasterEnv.put("_FLINK_JAR_PATH", remotePathJar.toString());
        appMasterEnv.put("_APP_ID", appId.toString());
        appMasterEnv.put("_CLIENT_HOME_DIR", homeDir.toString());
        appMasterEnv.put("_CLIENT_SHIP_FILES", envShipFileList.toString());
        appMasterEnv.put("_ZOOKEEPER_NAMESPACE", this.getZookeeperNamespace());
        appMasterEnv.put("_FLINK_YARN_FILES", yarnFilesDir.toUri().toString());
        appMasterEnv.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
        if (remotePathKeytab != null) {
            appMasterEnv.put("_KEYTAB_PATH", remotePathKeytab.toString());
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put("_KEYTAB_PRINCIPAL", principal);
        }
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put("_YARN_SITE_XML_PATH", remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put("_KRB5_PATH", remoteKrb5Path.toString());
        }
        if (this.docker) {
            appMasterEnv.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
            appMasterEnv.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", this.dockerImage);
            appMasterEnv.put("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", this.dockerMounts);
        }
        Utils.setupYarnClassPath((org.apache.hadoop.conf.Configuration)this.yarnConfiguration, appMasterEnv);
        amContainer.setEnvironment(appMasterEnv);
        Resource capability = (Resource)Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(this.flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
        String customApplicationName = this.customName != null ? this.customName : applicationName;
        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(this.applicationType != null ? this.applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        int priorityNum = this.flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance((int)priorityNum);
            appContext.setPriority(priority);
        }
        if (this.yarnQueue != null) {
            appContext.setQueue(this.yarnQueue);
        }
        this.setApplicationNodeLabel(appContext);
        this.setApplicationTags(appContext);
        DeploymentFailureHook deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);
        LOG.info("Waiting for the cluster to be allocated");
        long startTime = System.currentTimeMillis();
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        block40: while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            }
            catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", (Object)appState);
            switch (appState) {
                case FAILED: 
                case KILLED: {
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \nDiagnostics from YARN: " + report.getDiagnostics() + "\nIf log aggregation is enabled on your cluster, use this command to further investigate the issue:\nyarn logs -applicationId " + appId);
                }
                case RUNNING: {
                    LOG.info("YARN application has been deployed successfully.");
                    break block40;
                }
                case FINISHED: {
                    LOG.info("YARN application has been finished successfully.");
                    break block40;
                }
                default: {
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000L) {
                        LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
                    lastAppState = appState;
                    Thread.sleep(250L);
                    continue block40;
                }
            }
            break;
        }
        ShutdownHookUtil.removeShutdownHook((Thread)deploymentFailureHook, (String)this.getClass().getSimpleName(), (Logger)LOG);
        return report;
    }

    /**
     * Builds the per-application directory {@code <base>/.flink/<appId>/}.
     *
     * <p>The base is {@code stagingDir} when that field is set, otherwise the home directory
     * reported by the file system obtained from the YARN configuration.
     *
     * @param appId application whose id forms the last path segment
     * @return the application's files directory
     * @throws IOException if the file system cannot be obtained
     */
    private org.apache.hadoop.fs.Path getYarnFilesDir(ApplicationId appId) throws IOException {
        // The file system is looked up unconditionally, exactly as before, even if stagingDir is set.
        final FileSystem defaultFs = FileSystem.get(this.yarnConfiguration);
        final org.apache.hadoop.fs.Path baseDir;
        if (this.stagingDir == null) {
            baseDir = defaultFs.getHomeDirectory();
        } else {
            baseDir = this.stagingDir;
        }
        final String applicationSubDir = ".flink/" + appId + '/';
        return new org.apache.hadoop.fs.Path(baseDir, applicationSubDir);
    }

    /**
     * Delegates to {@code Utils.setupLocalResource} and records the resulting
     * {@link LocalResource} in {@code localResources} under {@code key}.
     *
     * @param key map key under which the local resource is registered
     * @param fs file system handed to {@code Utils.setupLocalResource}
     * @param appId application id; its string form is handed to {@code Utils.setupLocalResource}
     * @param localSrcPath source path of the resource
     * @param localResources map that receives the new entry
     * @param targetHomeDir target home directory handed to {@code Utils.setupLocalResource}
     * @param relativeTargetPath relative target path handed to {@code Utils.setupLocalResource}
     * @return the path component of the tuple returned by {@code Utils.setupLocalResource}
     *     (presumably the remote location of the uploaded file)
     * @throws IOException propagated from {@code Utils.setupLocalResource}
     */
    private static org.apache.hadoop.fs.Path setupSingleLocalResource(String key, FileSystem fs, ApplicationId appId, org.apache.hadoop.fs.Path localSrcPath, Map<String, LocalResource> localResources, org.apache.hadoop.fs.Path targetHomeDir, String relativeTargetPath) throws IOException {
        final String applicationId = appId.toString();
        final Tuple2<org.apache.hadoop.fs.Path, LocalResource> pathAndResource =
                Utils.setupLocalResource(fs, applicationId, localSrcPath, targetHomeDir, relativeTargetPath);
        final LocalResource registeredResource = pathAndResource.f1;
        localResources.put(key, registeredResource);
        return pathAndResource.f0;
    }

    /**
     * Tells whether a file name looks like the Flink distribution jar.
     *
     * @param fileName plain file name (no directory part expected)
     * @return {@code true} iff the name starts with {@code "flink-dist"} and ends with
     *     {@code "jar"}
     */
    private static boolean isDistJar(String fileName) {
        if (!fileName.startsWith("flink-dist")) {
            return false;
        }
        return fileName.endsWith("jar");
    }

    /**
     * Registers every ship file (recursing into directories) as a local resource and returns
     * the derived class path entries.
     *
     * <p>Directories are walked recursively; each contained file keeps its path relative to the
     * directory's parent, prefixed with {@code localResourcesDirectory}. Plain files are placed
     * directly under {@code localResourcesDirectory}. Files whose name matches
     * {@link #isDistJar(String)} are skipped.
     *
     * @param shipFiles files and directories to ship
     * @param fs file system handed to {@code setupSingleLocalResource}
     * @param targetHomeDir target home directory handed to {@code setupSingleLocalResource}
     * @param appId application the resources belong to
     * @param remotePaths receives the path returned for every registered file
     * @param localResources receives one entry per registered file, keyed by its relative path
     * @param localResourcesDirectory directory prefix for the relative paths
     * @param envShipFileList receives a {@code key=remotePath,} fragment per registered file
     * @return sorted parent directories of all non-jar files followed by the sorted relative
     *     paths of all files whose key ends with {@code "jar"}
     * @throws IOException if walking a directory or registering a resource fails
     */
    static List<String> uploadAndRegisterFiles(Collection<File> shipFiles, FileSystem fs, org.apache.hadoop.fs.Path targetHomeDir, ApplicationId appId, List<org.apache.hadoop.fs.Path> remotePaths, Map<String, LocalResource> localResources, final String localResourcesDirectory, StringBuilder envShipFileList) throws IOException {
        final List<org.apache.hadoop.fs.Path> localPaths = new ArrayList<>();
        final List<org.apache.hadoop.fs.Path> relativePaths = new ArrayList<>();
        for (File shipFile : shipFiles) {
            if (shipFile.isDirectory()) {
                // Resolve to an absolute path first: a single-segment relative directory such as
                // new File("lib") has no parent, which previously caused a NullPointerException
                // inside the visitor. Relativizing against the absolute parent yields the same
                // relative names as before for every path that did have a parent.
                final java.nio.file.Path shipPath = shipFile.getAbsoluteFile().toPath();
                final java.nio.file.Path parentPath = shipPath.getParent();
                // Only a file system root has no parent once absolute; relativize against the
                // root itself in that case instead of failing.
                final java.nio.file.Path basePath = parentPath != null ? parentPath : shipPath;
                Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {

                    @Override
                    public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
                        localPaths.add(new org.apache.hadoop.fs.Path(file.toUri()));
                        relativePaths.add(new org.apache.hadoop.fs.Path(localResourcesDirectory, basePath.relativize(file).toString()));
                        return FileVisitResult.CONTINUE;
                    }
                });
            } else {
                localPaths.add(new org.apache.hadoop.fs.Path(shipFile.toURI()));
                relativePaths.add(new org.apache.hadoop.fs.Path(localResourcesDirectory, shipFile.getName()));
            }
        }

        // Sets de-duplicate entries; ordering is applied when the result list is assembled.
        final Set<String> archives = new HashSet<>();
        final Set<String> resources = new HashSet<>();
        for (int i = 0; i < localPaths.size(); ++i) {
            final org.apache.hadoop.fs.Path localPath = localPaths.get(i);
            final org.apache.hadoop.fs.Path relativePath = relativePaths.get(i);
            if (YarnClusterDescriptor.isDistJar(relativePath.getName())) {
                continue;
            }
            final String key = relativePath.toString();
            final org.apache.hadoop.fs.Path remotePath = YarnClusterDescriptor.setupSingleLocalResource(key, fs, appId, localPath, localResources, targetHomeDir, relativePath.getParent().toString());
            remotePaths.add(remotePath);
            envShipFileList.append(key).append("=").append(remotePath).append(",");
            if (key.endsWith("jar")) {
                // Jars are listed individually.
                archives.add(relativePath.toString());
            } else {
                // Other files are represented by their containing directory.
                resources.add(relativePath.getParent().toString());
            }
        }

        // Directories first, then jars, each group in natural string order.
        final List<String> classPaths = new ArrayList<>();
        resources.stream().sorted().forEach(classPaths::add);
        archives.stream().sorted().forEach(classPaths::add);
        return classPaths;
    }

    /**
     * Asks YARN to kill the application that belongs to {@code yarnApplication}.
     *
     * <p>Any exception raised while doing so is logged at debug level and not rethrown.
     *
     * @param yarnClient client used to issue the kill request
     * @param yarnApplication application handle whose id is killed
     */
    private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
        LOG.info("Killing YARN application");
        try {
            final ApplicationId applicationId = yarnApplication.getNewApplicationResponse().getApplicationId();
            yarnClient.killApplication(applicationId);
        } catch (Exception killFailure) {
            // Deliberately not propagated: failures here are only recorded at debug level.
            LOG.debug("Error while killing YARN application", killFailure);
        }
    }

    /**
     * Summarizes the free memory of all node managers that are currently in state RUNNING.
     *
     * <p>For each node the free memory is its capability minus its used memory (a missing
     * "used" resource counts as zero).
     *
     * @param yarnClient client used to fetch the node reports
     * @return total free memory, the largest free memory of a single node, and the per-node
     *     free memory in report order
     * @throws YarnException propagated from the YARN client
     * @throws IOException propagated from the YARN client
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        final List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        final int[] freePerNode = new int[runningNodes.size()];
        int freeTotal = 0;
        int largestFree = 0;
        int index = 0;
        for (NodeReport node : runningNodes) {
            final int capacity = node.getCapability().getMemory();
            int used = 0;
            if (node.getUsed() != null) {
                used = node.getUsed().getMemory();
            }
            final int freeMemory = capacity - used;
            freePerNode[index++] = freeMemory;
            freeTotal += freeMemory;
            largestFree = Math.max(largestFree, freeMemory);
        }
        return new ClusterResourceDescription(freeTotal, largestFree, freePerNode);
    }

    /**
     * Renders a human-readable description of the YARN cluster.
     *
     * <p>The text contains the number of node managers, a two-column table with id, memory,
     * vcores, health report and container count for every RUNNING node, a summary line with
     * the accumulated memory and cores, and one line per queue returned by the YARN client.
     *
     * @return the rendered description
     * @throws RuntimeException wrapping any exception raised while querying the YARN client
     */
    public String getClusterDescription() {
        try {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final PrintStream ps = new PrintStream(baos);

            final YarnClusterMetrics metrics = this.yarnClient.getYarnClusterMetrics();
            // println (not append): without the line terminator this line was glued onto the
            // table header that follows.
            ps.println("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());

            final List<NodeReport> nodes = this.yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
            final String format = "|%-16s |%-16s %n";
            ps.printf("|Property         |Value          %n");
            ps.println("+---------------------------------------+");
            int totalMemory = 0;
            int totalCores = 0;
            for (NodeReport rep : nodes) {
                final Resource res = rep.getCapability();
                totalMemory += res.getMemory();
                totalCores += res.getVirtualCores();
                ps.format(format, "NodeID", rep.getNodeId());
                ps.format(format, "Memory", res.getMemory() + " MB");
                ps.format(format, "vCores", res.getVirtualCores());
                ps.format(format, "HealthReport", rep.getHealthReport());
                ps.format(format, "Containers", rep.getNumContainers());
                ps.println("+---------------------------------------+");
            }
            ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);

            final List<QueueInfo> qInfo = this.yarnClient.getAllQueues();
            for (QueueInfo q : qInfo) {
                ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " + q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
            }

            // Make sure everything has reached the byte buffer before it is converted.
            ps.flush();
            return baos.toString();
        } catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }

    /**
     * Applies two settings to the submission context through the reflector: keeping containers
     * across application attempts, and the attempt-failures validity interval read from
     * {@code YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL}.
     *
     * @param appContext submission context to configure
     * @throws InvocationTargetException if a reflectively invoked setter throws
     * @throws IllegalAccessException if a reflectively invoked setter is inaccessible
     */
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);

        final long failuresValidityInterval =
                this.flinkConfiguration.getLong(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL);
        contextReflector.setAttemptFailuresValidityInterval(appContext, failuresValidityInterval);
    }

    /**
     * Parses the comma-separated value of {@code YarnConfigOptions.APPLICATION_TAGS} and hands
     * the resulting tag set to the submission context through the reflector.
     *
     * <p>Each tag is trimmed; tags that are empty after trimming are dropped.
     *
     * @param appContext submission context that receives the tags
     * @throws InvocationTargetException if the reflectively invoked setter throws
     * @throws IllegalAccessException if the reflectively invoked setter is inaccessible
     */
    private void setApplicationTags(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        final String configuredTags = this.flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
        final Set<String> tags = java.util.Arrays.stream(configuredTags.split(","))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toCollection(HashSet::new));
        contextReflector.setApplicationTags(appContext, tags);
    }

    /**
     * Passes the configured node label to the submission context through the reflector; does
     * nothing when no node label is set on this descriptor.
     *
     * @param appContext submission context that receives the node label
     * @throws InvocationTargetException if the reflectively invoked setter throws
     * @throws IllegalAccessException if the reflectively invoked setter is inaccessible
     */
    private void setApplicationNodeLabel(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        if (this.nodeLabel == null) {
            return;
        }
        ApplicationSubmissionContextReflector.getInstance().setApplicationNodeLabel(appContext, this.nodeLabel);
    }

    /**
     * Adds the directory named by the environment variable {@code FLINK_LIB_DIR} to the given
     * ship files.
     *
     * <p>If the variable is unset, nothing is added; a warning is logged in that case only when
     * this descriptor's own ship file list is empty as well.
     *
     * @param effectiveShipFiles collection that receives the library directory
     * @throws YarnDeploymentException if the variable is set but does not denote a directory
     */
    @VisibleForTesting
    void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
        final String libDir = System.getenv().get("FLINK_LIB_DIR");
        if (libDir == null) {
            if (this.shipFiles.isEmpty()) {
                LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. Not shipping any library files.", "FLINK_LIB_DIR");
            }
            return;
        }

        final File libDirectory = new File(libDir);
        if (!libDirectory.isDirectory()) {
            throw new YarnDeploymentException("The environment variable 'FLINK_LIB_DIR' is set to '" + libDir + "' but the directory doesn't exist.");
        }
        effectiveShipFiles.add(libDirectory);
    }

    /**
     * Adds the plugins directory reported by {@code PluginConfig.getPluginsDir()} to the given
     * ship files, if one is present.
     *
     * @param effectiveShipFiles collection that receives the plugins directory
     */
    @VisibleForTesting
    void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
        PluginConfig.getPluginsDir().ifPresent(effectiveShipFiles::add);
    }

    /**
     * Creates the launch context for the application master and sets its start command.
     *
     * <p>The command is produced by {@code BootstrapTools.getStartCommand} from the template
     * configured under {@code yarn.container-start-command-template} and the placeholder values
     * {@code java}, {@code jvmmem}, {@code jvmopts}, {@code logging}, {@code class},
     * {@code redirects} and {@code args} assembled here.
     *
     * @param yarnClusterEntrypoint value of the {@code class} placeholder
     * @param hasLogback whether to add the logback configuration system property
     * @param hasLog4j whether to add the log4j configuration system property
     * @param hasKrb5 whether to add the {@code java.security.krb5.conf} system property
     * @param jobManagerMemoryMb memory from which the JVM heap size is calculated
     * @return launch context carrying the single start command
     */
    ContainerLaunchContext setupApplicationMasterContainer(String yarnClusterEntrypoint, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, int jobManagerMemoryMb) {
        // JVM options: general options, then job manager specific ones, then Kerberos.
        String jvmOptions = this.flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        final String jobManagerJvmOptions = this.flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        if (jobManagerJvmOptions.length() > 0) {
            jvmOptions += " " + jobManagerJvmOptions;
        }
        if (hasKrb5) {
            jvmOptions += " -Djava.security.krb5.conf=krb5.conf";
        }

        // Initial and maximum heap are set to the same calculated size.
        final int heapSizeMb = BootstrapTools.calculateHeapSize(jobManagerMemoryMb, this.flinkConfiguration);
        final String heapSettings = String.format("-Xms%sm -Xmx%sm", heapSizeMb, heapSizeMb);

        // Logging properties are only emitted when a logging configuration is shipped.
        final StringBuilder loggingOptions = new StringBuilder();
        if (hasLogback || hasLog4j) {
            loggingOptions.append("-Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
            if (hasLogback) {
                loggingOptions.append(" -Dlogback.configurationFile=file:logback.xml");
            }
            if (hasLog4j) {
                loggingOptions.append(" -Dlog4j.configuration=file:log4j.properties");
            }
        }

        final Map<String, String> placeholders = new HashMap<>();
        placeholders.put("java", "$JAVA_HOME/bin/java");
        placeholders.put("jvmmem", heapSettings);
        placeholders.put("jvmopts", jvmOptions);
        placeholders.put("logging", loggingOptions.toString());
        placeholders.put("class", yarnClusterEntrypoint);
        placeholders.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        placeholders.put("args", "");

        final String template = this.flinkConfiguration.getString("yarn.container-start-command-template", "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%");
        final String startCommand = BootstrapTools.getStartCommand(template, placeholders);

        final ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
        launchContext.setCommands(Collections.singletonList(startCommand));
        LOG.debug("Application Master start command: " + startCommand);
        return launchContext;
    }

    /**
     * Reads the {@code YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR} option as a
     * {@code UserJarInclusion} enum value.
     *
     * @param config configuration to read from
     * @return the configured inclusion mode
     */
    private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(Configuration config) {
        final YarnConfigOptions.UserJarInclusion inclusionMode =
                config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
        return inclusionMode;
    }

    /**
     * Checks the ship files for a directory named {@code usrlib}.
     *
     * <p>NOTE: despite the method name, the result is {@code true} when <em>no</em> such
     * directory is present and {@code false} when one is found. Entries that are not
     * directories are ignored.
     *
     * @param shipFiles ship files to inspect
     * @return {@code true} iff none of the directories among {@code shipFiles} is named
     *     {@code usrlib}
     */
    private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
        for (File shipFile : shipFiles) {
            if (shipFile.isDirectory() && shipFile.getName().equals("usrlib")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Copies host, RPC port and application id from the application report into the Flink
     * configuration.
     *
     * <p>The same host/port pair is written to both the job manager and the REST address
     * options.
     *
     * @param report application report to read from; must not be {@code null}
     */
    private void setClusterEntrypointInfoToConfig(ApplicationReport report) {
        Preconditions.checkNotNull(report);

        final ApplicationId applicationId = report.getApplicationId();
        final String entrypointHost = report.getHost();
        final int entrypointPort = report.getRpcPort();
        LOG.info("Found Web Interface {}:{} of application '{}'.", entrypointHost, entrypointPort, applicationId);

        final Configuration target = this.flinkConfiguration;
        target.setString(JobManagerOptions.ADDRESS, entrypointHost);
        target.setInteger(JobManagerOptions.PORT, entrypointPort);
        target.setString(RestOptions.ADDRESS, entrypointHost);
        target.setInteger(RestOptions.PORT, entrypointPort);
        target.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
    }

    /**
     * Logs, at info level, instructions for stopping or killing a detached session cluster.
     *
     * @param yarnApplicationId id inserted into both suggested commands
     * @param logger logger that receives the message
     */
    public static void logDetachedClusterInformation(ApplicationId yarnApplicationId, Logger logger) {
        final String instructions =
                "The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:\n"
                        + "$ echo \"stop\" | ./bin/yarn-session.sh -id {}\n"
                        + "If this should not be possible, then you can also kill Flink via YARN's web interface or via:\n"
                        + "$ yarn application -kill {}\n"
                        + "Note that killing Flink might not clean up all job artifacts and temporary files.";
        logger.info(instructions, yarnApplicationId, yarnApplicationId);
    }

    private class DeploymentFailureHook
    extends Thread {
        private final YarnClient yarnClient;
        private final YarnClientApplication yarnApplication;
        private final org.apache.hadoop.fs.Path yarnFilesDir;

        DeploymentFailureHook(YarnClientApplication yarnApplication, org.apache.hadoop.fs.Path yarnFilesDir) {
            this.yarnApplication = (YarnClientApplication)Preconditions.checkNotNull((Object)yarnApplication);
            this.yarnFilesDir = (org.apache.hadoop.fs.Path)Preconditions.checkNotNull((Object)yarnFilesDir);
            this.yarnClient = YarnClient.createYarnClient();
            this.yarnClient.init((org.apache.hadoop.conf.Configuration)YarnClusterDescriptor.this.yarnConfiguration);
        }

        @Override
        public void run() {
            LOG.info("Cancelling deployment from Deployment Failure Hook");
            this.yarnClient.start();
            YarnClusterDescriptor.this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            this.yarnClient.stop();
            LOG.info("Deleting files in {}.", (Object)this.yarnFilesDir);
            try {
                FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)YarnClusterDescriptor.this.yarnConfiguration);
                if (!fs.delete(this.yarnFilesDir, true)) {
                    throw new IOException("Deleting files in " + this.yarnFilesDir + " was unsuccessful");
                }
                fs.close();
            }
            catch (IOException e) {
                LOG.error("Failed to delete Flink Jar and configuration files in HDFS", (Throwable)e);
            }
        }
    }

    /**
     * Unchecked exception thrown by this descriptor when deploying to YARN fails.
     */
    private static class YarnDeploymentException extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /**
         * @param message description of the failure
         */
        public YarnDeploymentException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause underlying exception
         */
        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Reflective access to setters of {@link ApplicationSubmissionContext} that may be missing
     * from the class found at runtime.
     *
     * <p>Each setter is looked up once when the singleton is created. A setter that does not
     * exist is remembered as {@code null}, and the corresponding call on this class then only
     * logs a debug message.
     */
    private static class ApplicationSubmissionContextReflector {
        // LOG must be declared before 'instance': the constructor logs during the lookups.
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
        private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
        private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
        private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
        private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
        private static final String NODE_LABEL_EXPRESSION_NAME = "setNodeLabelExpression";
        private final Method applicationTagsMethod;
        private final Method attemptFailuresValidityIntervalMethod;
        private final Method keepContainersMethod;
        @Nullable
        private final Method nodeLabelExpressionMethod;

        /** Returns the shared reflector instance. */
        public static ApplicationSubmissionContextReflector getInstance() {
            return instance;
        }

        /**
         * Resolves all four setters on the given class.
         *
         * @param clazz class to inspect
         */
        private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
            this.applicationTagsMethod = findSetter(clazz, APPLICATION_TAGS_METHOD_NAME, Set.class);
            this.attemptFailuresValidityIntervalMethod = findSetter(clazz, ATTEMPT_FAILURES_METHOD_NAME, Long.TYPE);
            this.keepContainersMethod = findSetter(clazz, KEEP_CONTAINERS_METHOD_NAME, Boolean.TYPE);
            this.nodeLabelExpressionMethod = findSetter(clazz, NODE_LABEL_EXPRESSION_NAME, String.class);
        }

        /** Looks up a public single-argument method, logging the outcome; {@code null} if absent. */
        private static Method findSetter(Class<ApplicationSubmissionContext> clazz, String methodName, Class<?> parameterType) {
            try {
                final Method setter = clazz.getMethod(methodName, parameterType);
                LOG.debug("{} supports method {}.", clazz.getCanonicalName(), methodName);
                return setter;
            } catch (NoSuchMethodException e) {
                LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), methodName);
                return null;
            }
        }

        /** Invokes {@code setter} with one argument, or only logs when the setter is unavailable. */
        private static void invokeIfSupported(Method setter, String methodName, ApplicationSubmissionContext appContext, Object argument) throws InvocationTargetException, IllegalAccessException {
            if (setter == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", appContext.getClass().getCanonicalName(), methodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", setter.getName(), appContext.getClass().getCanonicalName());
            setter.invoke(appContext, argument);
        }

        /**
         * Calls {@code setApplicationTags} on the context if that method exists.
         *
         * @param appContext target context
         * @param applicationTags tags to set
         */
        public void setApplicationTags(ApplicationSubmissionContext appContext, Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.applicationTagsMethod, APPLICATION_TAGS_METHOD_NAME, appContext, applicationTags);
        }

        /**
         * Calls {@code setNodeLabelExpression} on the context if that method exists.
         *
         * @param appContext target context
         * @param nodeLabel node label expression to set
         */
        public void setApplicationNodeLabel(ApplicationSubmissionContext appContext, String nodeLabel) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.nodeLabelExpressionMethod, NODE_LABEL_EXPRESSION_NAME, appContext, nodeLabel);
        }

        /**
         * Calls {@code setAttemptFailuresValidityInterval} on the context if that method exists.
         *
         * @param appContext target context
         * @param validityInterval interval to set
         */
        public void setAttemptFailuresValidityInterval(ApplicationSubmissionContext appContext, long validityInterval) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.attemptFailuresValidityIntervalMethod, ATTEMPT_FAILURES_METHOD_NAME, appContext, validityInterval);
        }

        /**
         * Calls {@code setKeepContainersAcrossApplicationAttempts} on the context if that method
         * exists.
         *
         * @param appContext target context
         * @param keepContainers flag to set
         */
        public void setKeepContainersAcrossApplicationAttempts(ApplicationSubmissionContext appContext, boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.keepContainersMethod, KEEP_CONTAINERS_METHOD_NAME, appContext, keepContainers);
        }
    }

    /**
     * Plain holder for three free-memory figures of a cluster. The array passed to the
     * constructor is stored as-is, without copying.
     */
    private static class ClusterResourceDescription {
        /** Total free memory, as passed to the constructor. */
        public final int totalFreeMemory;
        /** Container limit, as passed to the constructor. */
        public final int containerLimit;
        /** Free memory per node manager, as passed to the constructor. */
        public final int[] nodeManagersFree;

        /**
         * @param totalFreeMemory value for {@link #totalFreeMemory}
         * @param containerLimit value for {@link #containerLimit}
         * @param nodeManagersFree value for {@link #nodeManagersFree}; not copied
         */
        public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
            this.nodeManagersFree = nodeManagersFree;
            this.containerLimit = containerLimit;
            this.totalFreeMemory = totalFreeMemory;
        }
    }
}

