/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn.cli;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.cli.YarnApplicationStatusMonitor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkYarnSessionCli
extends AbstractCustomCommandLine<ApplicationId> {
    /** Logger used by all methods of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
    // Names of the logback / log4j configuration files. They are not referenced in the visible part
    // of this class; they are public, so other classes may use them.
    public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
    public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
    // NOTE(review): presumably the polling interval of the interactive status loop — confirm at the usage site.
    private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
    /** Identifier of this command line; returned by getId() and compared against the address option in isActive(). */
    private static final String ID = "yarn-cluster";
    // NOTE(review): presumably the file-name prefix of the Yarn properties file — confirm in getYarnPropertiesLocation.
    private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
    // Keys used when reading from / writing to the Yarn properties file.
    private static final String YARN_APPLICATION_ID_KEY = "applicationID";
    private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
    private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
    /** Separator between individual {@code key=value} pairs in the encoded dynamic properties string. */
    private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
    // Help text listing the interactive commands; its usage is not in the visible part of this class.
    private static final String YARN_SESSION_HELP = "Available commands:\nhelp - show these commands\nstop - stop the YARN session";
    // Command line options; all of them are created and registered in the constructor.
    private final Option query;
    private final Option applicationId;
    private final Option queue;
    private final Option shipPath;
    private final Option flinkJar;
    private final Option jmMemory;
    private final Option tmMemory;
    private final Option container;
    private final Option slots;
    private final Option zookeeperNamespace;
    private final Option nodeLabel;
    private final Option help;
    @Deprecated
    private final Option streaming;
    private final Option name;
    /** All options of this CLI; added to the run options and consulted by isYarnPropertiesFileMode(). */
    private final Options allOptions;
    /** Option for dynamic properties of the form {@code property=value}. */
    private final Option dynamicproperties;
    /** Passed to runInteractiveCli as its {@code readConsoleInput} argument. */
    private final boolean acceptInteractiveInput;
    /** Configuration directory handed to the cluster descriptor; never null. */
    private final String configurationDirectory;
    /** Contents of the Yarn properties file; empty if the file does not exist. */
    private final Properties yarnPropertiesFile;
    /** Application id read from the Yarn properties file. */
    private final ApplicationId yarnApplicationIdFromYarnProperties;
    /** Value of {@code YarnConfigOptions.PROPERTIES_FILE_LOCATION} used to locate the Yarn properties file. */
    private final String yarnPropertiesFileLocation;
    /** Yarn configuration created once in the constructor and passed to every cluster descriptor. */
    private final YarnConfiguration yarnConfiguration;

    /**
     * Creates a Yarn session CLI with {@code acceptInteractiveInput} set to {@code true}.
     *
     * <p>Delegates to the five-argument constructor.
     *
     * @param configuration Flink configuration passed to the super class
     * @param configurationDirectory configuration directory; must not be null
     * @param shortPrefix prefix prepended to every short option name
     * @param longPrefix prefix prepended to every long option name
     * @throws FlinkException if an existing Yarn properties file cannot be read or is invalid
     */
    public FlinkYarnSessionCli(Configuration configuration, String configurationDirectory, String shortPrefix, String longPrefix) throws FlinkException {
        this(configuration, configurationDirectory, shortPrefix, longPrefix, true);
    }

    /**
     * Creates the Yarn session CLI: builds and registers all command line options and loads the
     * Yarn properties file if it exists.
     *
     * <p>If a Yarn properties file is found, it must contain a valid application id under the key
     * {@code applicationID}; that id is remembered in {@code yarnApplicationIdFromYarnProperties}.
     * If no file exists, the remembered id is {@code null} and the loaded properties stay empty.
     *
     * @param configuration Flink configuration passed to the super class; also supplies the
     *     location of the Yarn properties file
     * @param configurationDirectory configuration directory; must not be null
     * @param shortPrefix prefix prepended to every short option name
     * @param longPrefix prefix prepended to every long option name
     * @param acceptInteractiveInput forwarded to the interactive CLI loop started by {@code run}
     * @throws FlinkException if the Yarn properties file exists but cannot be read, lacks an
     *     application id, or contains an application id that cannot be parsed
     */
    public FlinkYarnSessionCli(Configuration configuration, String configurationDirectory, String shortPrefix, String longPrefix, boolean acceptInteractiveInput) throws FlinkException {
        super(configuration);
        this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
        this.acceptInteractiveInput = acceptInteractiveInput;

        this.query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
        this.applicationId = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
        this.queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
        this.shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
        this.flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
        this.jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container with optional unit (default: MB)");
        this.tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container with optional unit (default: MB)");
        this.container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
        this.slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
        this.dynamicproperties = Option.builder(shortPrefix + "D").argName("property=value").numberOfArgs(2).valueSeparator().desc("use value for given property").build();
        this.streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
        this.name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
        this.zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
        this.nodeLabel = new Option(shortPrefix + "nl", longPrefix + "nodeLabel", true, "Specify YARN node label for the YARN application");
        this.help = new Option(shortPrefix + "h", longPrefix + "help", false, "Help for the Yarn session CLI.");

        // Registration order is kept as-is because it may influence the order of the help output.
        this.allOptions = new Options();
        this.allOptions.addOption(this.flinkJar);
        this.allOptions.addOption(this.jmMemory);
        this.allOptions.addOption(this.tmMemory);
        this.allOptions.addOption(this.container);
        this.allOptions.addOption(this.queue);
        this.allOptions.addOption(this.query);
        this.allOptions.addOption(this.shipPath);
        this.allOptions.addOption(this.slots);
        this.allOptions.addOption(this.dynamicproperties);
        this.allOptions.addOption(CliFrontendParser.DETACHED_OPTION);
        this.allOptions.addOption(CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION);
        this.allOptions.addOption(CliFrontendParser.YARN_DETACHED_OPTION);
        this.allOptions.addOption(this.streaming);
        this.allOptions.addOption(this.name);
        this.allOptions.addOption(this.applicationId);
        this.allOptions.addOption(this.zookeeperNamespace);
        this.allOptions.addOption(this.nodeLabel);
        this.allOptions.addOption(this.help);

        this.yarnPropertiesFileLocation = configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
        File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);
        this.yarnPropertiesFile = new Properties();

        // Collected in a local so that the final field is assigned exactly once and the id parsed
        // from the properties file is not overwritten with null afterwards.
        ApplicationId applicationIdFromProperties = null;
        if (yarnPropertiesLocation.exists()) {
            LOG.info("Found Yarn properties file under {}.", yarnPropertiesLocation.getAbsolutePath());
            try (FileInputStream is = new FileInputStream(yarnPropertiesLocation)) {
                this.yarnPropertiesFile.load(is);
            }
            catch (IOException ioe) {
                throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation + ". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath() + '.', ioe);
            }
            String yarnApplicationIdString = this.yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
            if (yarnApplicationIdString == null) {
                throw new FlinkException("Yarn properties file found but doesn't contain a Yarn application id. Please delete the file at " + yarnPropertiesLocation.getAbsolutePath());
            }
            try {
                applicationIdFromProperties = ConverterUtils.toApplicationId(yarnApplicationIdString);
            }
            catch (Exception e) {
                throw new FlinkException("YARN properties contain an invalid entry for application id: " + yarnApplicationIdString + ". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath(), e);
            }
        }
        this.yarnApplicationIdFromYarnProperties = applicationIdFromProperties;
        this.yarnConfiguration = new YarnConfiguration();
    }

    /**
     * Creates a Yarn cluster descriptor and configures it from the given command line.
     *
     * <p>The following settings are applied: the local Flink jar (from the jar option, or derived
     * from the code source location of the descriptor class if that location ends with
     * {@code .jar}), ship directories, queue, encoded dynamic properties, detached mode,
     * application name, ZooKeeper namespace and node label.
     *
     * @param configuration Flink configuration handed to the descriptor
     * @param yarnConfiguration Yarn configuration handed to the descriptor
     * @param configurationDirectory configuration directory handed to the descriptor
     * @param cmd parsed command line
     * @return the configured cluster descriptor
     * @throws RuntimeException if the automatically detected jar path cannot be URL-decoded
     */
    private AbstractYarnClusterDescriptor createDescriptor(Configuration configuration, YarnConfiguration yarnConfiguration, String configurationDirectory, CommandLine cmd) {
        AbstractYarnClusterDescriptor yarnClusterDescriptor = this.getClusterDescriptor(configuration, yarnConfiguration, configurationDirectory);

        final Path localJarPath;
        if (cmd.hasOption(this.flinkJar.getOpt())) {
            String userPath = cmd.getOptionValue(this.flinkJar.getOpt());
            if (!userPath.startsWith("file://")) {
                userPath = "file://" + userPath;
            }
            localJarPath = new Path(userPath);
        } else {
            LOG.info("No path for the flink jar passed. Using the location of {} to locate the jar", yarnClusterDescriptor.getClass());
            String encodedJarPath = yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
            final String decodedPath;
            try {
                decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
            }
            catch (UnsupportedEncodingException e) {
                // Keep the original exception as cause so the failing charset stays visible.
                throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath + " Please supply a path manually via the -" + this.flinkJar.getOpt() + " option.", e);
            }
            // Only a location that points at a jar file is usable; otherwise no jar path is set.
            localJarPath = decodedPath.endsWith(".jar") ? new Path(new File(decodedPath).toURI()) : null;
        }
        if (localJarPath != null) {
            yarnClusterDescriptor.setLocalJarPath(localJarPath);
        }

        List<File> shipFiles = new ArrayList<>();
        if (cmd.hasOption(this.shipPath.getOpt())) {
            for (String shipPath : cmd.getOptionValues(this.shipPath.getOpt())) {
                File shipDir = new File(shipPath);
                if (shipDir.isDirectory()) {
                    shipFiles.add(shipDir);
                } else {
                    LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
                }
            }
        }
        yarnClusterDescriptor.addShipFiles(shipFiles);

        if (cmd.hasOption(this.queue.getOpt())) {
            yarnClusterDescriptor.setQueue(cmd.getOptionValue(this.queue.getOpt()));
        }

        // Re-encode the dynamic properties as "key<sep>value" pairs joined by the "@@" separator.
        Properties properties = cmd.getOptionProperties(this.dynamicproperties.getOpt());
        String[] dynamicProperties = properties.stringPropertyNames().stream().flatMap(key -> {
            String value = properties.getProperty(key);
            if (value != null) {
                return Stream.of(key + this.dynamicproperties.getValueSeparator() + value);
            }
            return Stream.empty();
        }).toArray(String[]::new);
        String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
        yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);

        if (cmd.hasOption(CliFrontendParser.YARN_DETACHED_OPTION.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
            yarnClusterDescriptor.setDetachedMode(true);
        }
        if (cmd.hasOption(this.name.getOpt())) {
            yarnClusterDescriptor.setName(cmd.getOptionValue(this.name.getOpt()));
        }
        if (cmd.hasOption(this.zookeeperNamespace.getOpt())) {
            yarnClusterDescriptor.setZookeeperNamespace(cmd.getOptionValue(this.zookeeperNamespace.getOpt()));
        }
        if (cmd.hasOption(this.nodeLabel.getOpt())) {
            yarnClusterDescriptor.setNodeLabel(cmd.getOptionValue(this.nodeLabel.getOpt()));
        }
        return yarnClusterDescriptor;
    }

    /**
     * Builds the cluster specification from the effective configuration and the command line.
     *
     * <p>JobManager memory, TaskManager memory and slots per TaskManager are read from the
     * configuration. The number of TaskManagers is taken from the (deprecated) container option
     * and defaults to 1 when that option is absent.
     *
     * @param configuration effective Flink configuration
     * @param cmd parsed command line
     * @return the cluster specification
     * @throws NumberFormatException if the container option value is not an integer
     */
    private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
        final String containerOpt = this.container.getOpt();
        final int numberTaskManagers;
        if (cmd.hasOption(containerOpt)) {
            LOG.info("The argument {} is deprecated and will be ignored.", containerOpt);
            // NOTE(review): despite the message, the value still ends up in the specification below.
            numberTaskManagers = Integer.parseInt(cmd.getOptionValue(containerOpt));
        } else {
            numberTaskManagers = 1;
        }
        int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
        int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setNumberTaskManagers(numberTaskManagers)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
    }

    /**
     * Prints the usage of the Yarn session CLI to standard out: first a "Required" section
     * containing the container option, then an "Optional" section with the general and run options.
     */
    private void printUsage() {
        System.out.println("Usage:");

        HelpFormatter helpFormatter = new HelpFormatter();
        helpFormatter.setWidth(200);
        helpFormatter.setLeftPadding(5);

        // Required section.
        Options requiredOptions = new Options();
        requiredOptions.addOption(this.container);
        helpFormatter.setSyntaxPrefix("   Required");
        helpFormatter.printHelp(" ", requiredOptions);

        // Optional section.
        Options optionalOptions = new Options();
        this.addGeneralOptions(optionalOptions);
        this.addRunOptions(optionalOptions);
        helpFormatter.setSyntaxPrefix("   Optional");
        helpFormatter.printHelp(" ", optionalOptions);
    }

    /**
     * Decides whether this command line handles the given invocation.
     *
     * @param commandLine parsed command line
     * @return {@code true} if the address option equals {@code "yarn-cluster"}, if the application
     *     id option is present, or if the Yarn properties file mode applies and an application id
     *     was loaded from the Yarn properties file
     */
    public boolean isActive(CommandLine commandLine) {
        final String addressValue = commandLine.getOptionValue(this.addressOption.getOpt(), null);
        final boolean addressedToYarn = ID.equals(addressValue);
        final boolean hasApplicationId = commandLine.hasOption(this.applicationId.getOpt());
        if (addressedToYarn || hasApplicationId) {
            return true;
        }
        return this.isYarnPropertiesFileMode(commandLine) && this.yarnApplicationIdFromYarnProperties != null;
    }

    /**
     * Returns the identifier of this command line.
     *
     * @return the constant {@code "yarn-cluster"}
     */
    public String getId() {
        return ID;
    }

    /**
     * Adds the run options of the super class and every option of this CLI to the given options.
     *
     * @param baseOptions options collection to extend
     */
    public void addRunOptions(Options baseOptions) {
        super.addRunOptions(baseOptions);
        this.allOptions.getOptions().forEach(option -> baseOptions.addOption((Option)option));
    }

    /**
     * Adds the general options of the super class plus the application id option to the given options.
     *
     * @param baseOptions options collection to extend
     */
    public void addGeneralOptions(Options baseOptions) {
        super.addGeneralOptions(baseOptions);
        baseOptions.addOption(this.applicationId);
    }

    /**
     * Creates a cluster descriptor for the given command line, based on the configuration that
     * results from applying the command line options.
     *
     * @param commandLine parsed command line
     * @return the configured Yarn cluster descriptor
     * @throws FlinkException if the command line options cannot be applied to the configuration
     */
    public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException {
        return this.createDescriptor(
            this.applyCommandLineOptionsToConfiguration(commandLine),
            this.yarnConfiguration,
            this.configurationDirectory,
            commandLine);
    }

    /**
     * Determines the Yarn application id addressed by the command line.
     *
     * @param commandLine parsed command line
     * @return the id given via the application id option; otherwise the id loaded from the Yarn
     *     properties file if the Yarn properties file mode applies; otherwise {@code null}
     */
    @Nullable
    public ApplicationId getClusterId(CommandLine commandLine) {
        final String applicationIdOpt = this.applicationId.getOpt();
        if (!commandLine.hasOption(applicationIdOpt)) {
            return this.isYarnPropertiesFileMode(commandLine) ? this.yarnApplicationIdFromYarnProperties : null;
        }
        return ConverterUtils.toApplicationId(commandLine.getOptionValue(applicationIdOpt));
    }

    /**
     * Creates the cluster specification for the given command line, based on the configuration
     * that results from applying the command line options.
     *
     * @param commandLine parsed command line
     * @return the cluster specification
     * @throws FlinkException if the command line options cannot be applied to the configuration
     */
    public ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException {
        return this.createClusterSpecification(this.applyCommandLineOptionsToConfiguration(commandLine), commandLine);
    }

    /**
     * Produces a copy of the base configuration with the command line options applied.
     *
     * <p>Handled settings: the HA cluster id (from the ZooKeeper namespace options or, as a
     * fallback when a cluster id is known, from that id), JobManager and TaskManager heap memory
     * (a value without unit gets the suffix {@code "m"}), and the number of task slots. If the
     * Yarn properties file mode applies, the Yarn properties are applied on top.
     *
     * @param commandLine parsed command line
     * @return the effective configuration; the base configuration itself is not modified
     * @throws FlinkException if the Yarn properties cannot be applied
     */
    protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
        final Configuration result = new Configuration(this.configuration);

        // ZooKeeper namespace option that is not declared in this class.
        final String baseZkOpt = this.zookeeperNamespaceOption.getOpt();
        if (commandLine.hasOption(baseZkOpt)) {
            result.setString(HighAvailabilityOptions.HA_CLUSTER_ID, commandLine.getOptionValue(baseZkOpt));
        }

        final ApplicationId clusterId = this.getClusterId(commandLine);
        if (clusterId != null) {
            final String yarnZkOpt = this.zookeeperNamespace.getOpt();
            final String haClusterId;
            if (commandLine.hasOption(yarnZkOpt)) {
                haClusterId = commandLine.getOptionValue(yarnZkOpt);
            } else {
                // Keep an already configured HA cluster id; fall back to the application id.
                haClusterId = result.getString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId.toString());
            }
            result.setString(HighAvailabilityOptions.HA_CLUSTER_ID, haClusterId);
        }

        final String jmOpt = this.jmMemory.getOpt();
        if (commandLine.hasOption(jmOpt)) {
            final String rawJmMemory = commandLine.getOptionValue(jmOpt);
            final String jmMemoryWithUnit = MemorySize.MemoryUnit.hasUnit(rawJmMemory) ? rawJmMemory : rawJmMemory + "m";
            result.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryWithUnit);
        }

        final String tmOpt = this.tmMemory.getOpt();
        if (commandLine.hasOption(tmOpt)) {
            final String rawTmMemory = commandLine.getOptionValue(tmOpt);
            final String tmMemoryWithUnit = MemorySize.MemoryUnit.hasUnit(rawTmMemory) ? rawTmMemory : rawTmMemory + "m";
            result.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryWithUnit);
        }

        final String slotsOpt = this.slots.getOpt();
        if (commandLine.hasOption(slotsOpt)) {
            final int slotCount = Integer.parseInt(commandLine.getOptionValue(slotsOpt));
            result.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotCount);
        }

        return this.isYarnPropertiesFileMode(commandLine) ? this.applyYarnProperties(result) : result;
    }

    /**
     * Checks whether the settings from the Yarn properties file may be applied.
     *
     * @param commandLine parsed command line
     * @return {@code false} if the address option is present or if any option of this CLI other
     *     than the two detached options was given; {@code true} otherwise
     */
    private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
        if (commandLine.hasOption(this.addressOption.getOpt())) {
            return false;
        }
        return Stream.of(commandLine.getOptions())
            .noneMatch(option -> this.allOptions.hasOption(option.getOpt()) && !this.isDetachedOption(option));
    }

    /**
     * Tells whether the given option is one of the two detached options.
     *
     * @param option option to inspect
     * @return {@code true} if its short name equals that of {@code YARN_DETACHED_OPTION} or
     *     {@code DETACHED_OPTION}
     */
    private boolean isDetachedOption(Option option) {
        final String shortName = option.getOpt();
        return shortName.equals(CliFrontendParser.YARN_DETACHED_OPTION.getOpt())
            || shortName.equals(CliFrontendParser.DETACHED_OPTION.getOpt());
    }

    /**
     * Returns a copy of the given configuration with the settings from the Yarn properties file
     * applied: the default parallelism (if present) and all decoded dynamic properties.
     *
     * @param configuration configuration to copy and extend
     * @return the extended copy
     * @throws FlinkException if the parallelism entry of the Yarn properties is not an integer
     */
    private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
        final Configuration merged = new Configuration(configuration);

        final String parallelismValue = this.yarnPropertiesFile.getProperty(YARN_PROPERTIES_PARALLELISM);
        if (parallelismValue != null) {
            try {
                final int defaultParallelism = Integer.parseInt(parallelismValue);
                merged.setInteger(CoreOptions.DEFAULT_PARALLELISM, defaultParallelism);
                this.logAndSysout("YARN properties set default parallelism to " + defaultParallelism);
            }
            catch (NumberFormatException e) {
                throw new FlinkException("Error while parsing the YARN properties: Property parallelism is not an integer.", (Throwable)e);
            }
        }

        final String encodedDynamicProperties = this.yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
        FlinkYarnSessionCli.getDynamicProperties(encodedDynamicProperties)
            .forEach((key, value) -> merged.setString(key, value));
        return merged;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the Yarn session CLI with the given arguments.
     *
     * <p>Depending on the options this prints the usage, prints the cluster description, attaches
     * to an existing Yarn application, or deploys a new session cluster. Unless the descriptor is
     * in detached mode, an interactive CLI is run afterwards and the cluster is shut down when it
     * ends. The cluster descriptor is closed in all cases once it has been created.
     *
     * @param args command line arguments
     * @return {@code 0} on every path that does not throw
     * @throws CliArgsException declared; presumably raised while parsing the arguments — TODO confirm
     * @throws FlinkException if the cluster descriptor cannot be created, or if the connection
     *     information of a newly deployed cluster cannot be printed or written (in that case the
     *     client is shut down and the cluster is killed first)
     */
    public int run(String[] args) throws CliArgsException, FlinkException {
        // Labeled block produced by the decompiler; "break block25" jumps to the final "return 0".
        block25: {
            CommandLine cmd = this.parseCommandLineOptions(args, true);
            if (cmd.hasOption(this.help.getOpt())) {
                this.printUsage();
                return 0;
            }
            AbstractYarnClusterDescriptor yarnClusterDescriptor = this.createClusterDescriptor(cmd);
            try {
                ClusterClient<ApplicationId> clusterClient;
                ApplicationId yarnApplicationId;
                // Query mode: only print the cluster description.
                if (cmd.hasOption(this.query.getOpt())) {
                    String description = yarnClusterDescriptor.getClusterDescription();
                    System.out.println(description);
                    int n = 0;
                    return n;
                }
                if (cmd.hasOption(this.applicationId.getOpt())) {
                    // Attach to the application given on the command line.
                    yarnApplicationId = ConverterUtils.toApplicationId((String)cmd.getOptionValue(this.applicationId.getOpt()));
                    clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
                } else {
                    // Deploy a new session cluster and record its id in the Yarn properties file.
                    ClusterSpecification clusterSpecification = this.getClusterSpecification(cmd);
                    clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
                    yarnApplicationId = (ApplicationId)clusterClient.getClusterId();
                    try {
                        LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
                        System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() + ':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.');
                        System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
                        this.writeYarnPropertiesFile(yarnApplicationId, clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(), yarnClusterDescriptor.getDynamicPropertiesEncoded());
                    }
                    catch (Exception e) {
                        // Best-effort cleanup of the freshly deployed cluster before failing.
                        try {
                            clusterClient.shutdown();
                        }
                        catch (Exception ex) {
                            LOG.info("Could not properly shutdown cluster client.", (Throwable)ex);
                        }
                        try {
                            yarnClusterDescriptor.killCluster(yarnApplicationId);
                        }
                        catch (FlinkException fe) {
                            LOG.info("Could not properly terminate the Flink cluster.", (Throwable)fe);
                        }
                        throw new FlinkException("Could not write the Yarn connection information.", (Throwable)e);
                    }
                }
                if (yarnClusterDescriptor.isDetachedMode()) {
                    // Detached: leave the cluster running and skip the interactive part.
                    LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + yarnApplicationId);
                    break block25;
                }
                ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
                YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(yarnClusterDescriptor.getYarnClient(), yarnApplicationId, (ScheduledExecutor)new ScheduledExecutorServiceAdapter(scheduledExecutorService));
                // Shutdown hook so the cluster is also shut down when the JVM terminates.
                Thread shutdownHook = ShutdownHookUtil.addShutdownHook(() -> this.shutdownCluster(clusterClient, scheduledExecutorService, yarnApplicationStatusMonitor), (String)((Object)((Object)this)).getClass().getSimpleName(), (Logger)LOG);
                try {
                    FlinkYarnSessionCli.runInteractiveCli(clusterClient, yarnApplicationStatusMonitor, this.acceptInteractiveInput);
                }
                finally {
                    // Regular shutdown path; the hook is removed afterwards since it is no longer needed.
                    this.shutdownCluster(clusterClient, scheduledExecutorService, yarnApplicationStatusMonitor);
                    if (shutdownHook != null) {
                        ShutdownHookUtil.removeShutdownHook((Thread)shutdownHook, (String)((Object)((Object)this)).getClass().getSimpleName(), (Logger)LOG);
                    }
                    this.tryRetrieveAndLogApplicationReport(yarnClusterDescriptor.getYarnClient(), yarnApplicationId);
                }
            }
            finally {
                // Always close the descriptor; failures are only logged.
                try {
                    yarnClusterDescriptor.close();
                }
                catch (Exception e) {
                    LOG.info("Could not properly close the yarn cluster descriptor.", (Throwable)e);
                }
            }
        }
        return 0;
    }

    /**
     * Shuts down the cluster and releases all client-side resources that belong to it.
     *
     * <p>Steps: close the status monitor, ask the cluster to shut down, shut down the cluster
     * client, shut down the executor service and delete the Yarn properties file. Failures of the
     * monitor and the client are only logged. The cleanup after the cluster shutdown request runs
     * in a {@code finally} block, so it also happens when that request throws; the exception is
     * still propagated to the caller.
     *
     * @param clusterClient client connected to the cluster
     * @param scheduledExecutorService executor service to shut down
     * @param yarnApplicationStatusMonitor status monitor to close
     */
    private void shutdownCluster(ClusterClient<?> clusterClient, ScheduledExecutorService scheduledExecutorService, YarnApplicationStatusMonitor yarnApplicationStatusMonitor) {
        try {
            yarnApplicationStatusMonitor.close();
        }
        catch (Exception e) {
            LOG.info("Could not properly close the Yarn application status monitor.", e);
        }
        try {
            clusterClient.shutDownCluster();
        }
        finally {
            try {
                clusterClient.shutdown();
            }
            catch (Exception e) {
                LOG.info("Could not properly shutdown cluster client.", e);
            }
            ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, scheduledExecutorService);
            this.deleteYarnPropertiesFile();
        }
    }

    /**
     * Fetches the application report for the given application and logs it. A failure to fetch
     * the report is logged and otherwise ignored; a {@code null} report is skipped silently.
     *
     * @param yarnClient client used to fetch the report
     * @param yarnApplicationId id of the application
     */
    private void tryRetrieveAndLogApplicationReport(YarnClient yarnClient, ApplicationId yarnApplicationId) {
        final ApplicationReport finalReport;
        try {
            finalReport = yarnClient.getApplicationReport(yarnApplicationId);
        }
        catch (IOException | YarnException e) {
            LOG.info("Could not log the final application report.", e);
            return;
        }
        if (finalReport == null) {
            return;
        }
        this.logApplicationReport(finalReport);
    }

    /**
     * Logs state, final status and finish time of the application. For a failed application the
     * diagnostics and a hint on how to retrieve the aggregated logs are logged as warnings.
     *
     * @param appReport report to log
     */
    private void logApplicationReport(ApplicationReport appReport) {
        final String summary = "Application " + appReport.getApplicationId() + " finished with state " + appReport.getYarnApplicationState() + " and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime();
        LOG.info(summary);

        if (appReport.getYarnApplicationState() != YarnApplicationState.FAILED) {
            return;
        }

        final String diagnostics = "Application failed. Diagnostics " + appReport.getDiagnostics();
        LOG.warn(diagnostics);
        final String logHint = "If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:" + System.lineSeparator() + "\tyarn logs -applicationId " + appReport.getApplicationId() + System.lineSeparator() + "(It sometimes takes a few seconds until the logs are aggregated)";
        LOG.warn(logHint);
    }

    /**
     * Deletes the Yarn properties file if it exists as a regular file. The outcome is logged;
     * any exception is caught and logged as a warning.
     */
    private void deleteYarnPropertiesFile() {
        try {
            final File target = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);
            if (!target.isFile()) {
                return;
            }
            final boolean deleted = target.delete();
            if (deleted) {
                LOG.info("Deleted Yarn properties file at {}", (Object)target.getAbsoluteFile());
            } else {
                LOG.warn("Couldn't delete Yarn properties file at {}", (Object)target.getAbsoluteFile());
            }
        }
        catch (Exception e) {
            LOG.warn("Exception while deleting the JobManager address file", (Throwable)e);
        }
    }

    /**
     * Writes the Yarn properties file for the given application.
     *
     * @param yarnApplicationId application id, always stored
     * @param parallelism stored only if greater than zero
     * @param dynamicProperties encoded dynamic properties, stored only if not null
     */
    private void writeYarnPropertiesFile(ApplicationId yarnApplicationId, int parallelism, @Nullable String dynamicProperties) {
        final Properties contents = new Properties();
        contents.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
        if (parallelism > 0) {
            contents.setProperty(YARN_PROPERTIES_PARALLELISM, Integer.toString(parallelism));
        }
        if (dynamicProperties != null) {
            contents.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
        }

        final File target = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);
        FlinkYarnSessionCli.writeYarnProperties(contents, target);
    }

    /**
     * Writes the message to the log at INFO level and prints it to standard out.
     *
     * @param message message to emit
     */
    private void logAndSysout(String message) {
        LOG.info(message);
        System.out.println(message);
    }

    /**
     * Decodes dynamic properties from their encoded string form.
     *
     * <p>The input is split at the {@code "@@"} separator; each part is split at its first
     * {@code '='} into a trimmed key and a trimmed value. Parts without {@code '='} or with an
     * empty key are skipped.
     *
     * @param dynamicPropertiesEncoded encoded properties, may be null or empty
     * @return the decoded properties; an empty map for null or empty input
     */
    public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
        if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.isEmpty()) {
            return Collections.emptyMap();
        }

        final Map<String, String> decoded = new HashMap<>();
        for (String entry : dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR)) {
            if (entry == null) {
                continue;
            }
            final int separatorIndex = entry.indexOf('=');
            if (separatorIndex < 0) {
                continue;
            }
            final String key = entry.substring(0, separatorIndex).trim();
            if (key.isEmpty()) {
                continue;
            }
            decoded.put(key, entry.substring(separatorIndex + 1).trim());
        }
        return decoded;
    }

    /**
     * Command line entry point: loads the Flink configuration, creates the CLI, installs the
     * security configuration and runs the CLI inside the installed security context. The process
     * always terminates via {@code System.exit} with the resulting return code.
     *
     * @param args command line arguments passed on to {@code run}
     */
    public static void main(String[] args) {
        int retCode;
        String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
        try {
            // Empty short and long prefixes: the options are registered under their plain names.
            FlinkYarnSessionCli cli = new FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "", "");
            SecurityUtils.install((SecurityConfiguration)new SecurityConfiguration(flinkConfiguration));
            retCode = (Integer)SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
        }
        catch (CliArgsException e) {
            retCode = FlinkYarnSessionCli.handleCliArgsException(e);
        }
        catch (Throwable t) {
            // Strip UndeclaredThrowableException wrappers before the error is handled.
            Throwable strippedThrowable = ExceptionUtils.stripException((Throwable)t, UndeclaredThrowableException.class);
            retCode = FlinkYarnSessionCli.handleError(strippedThrowable);
        }
        System.exit(retCode);
    }

    /**
     * Runs the interactive loop of the YARN session client until the application terminates or
     * the user asks to stop.
     *
     * <p>On every iteration the current application status is polled:
     * <ul>
     *   <li>{@code FAILED} / {@code CANCELED}: a failure notice is printed and the loop ends.</li>
     *   <li>{@code UNKNOWN}: the loop ends with a notice once the status has been unknown for
     *       more than 15 seconds (15000000000 ns); until then a regular
     *       {@code repStep} is performed.</li>
     *   <li>{@code SUCCEEDED}: the cluster status is queried (failures are only logged), a
     *       change in the number of registered TaskManagers is reported, new cluster messages
     *       are printed, and a {@code repStep} is performed.</li>
     * </ul>
     * Any exception escaping the loop is logged as a warning and not rethrown.
     *
     * @param clusterClient client used to query cluster status and messages
     * @param yarnApplicationStatusMonitor source of the current application status
     * @param readConsoleInput whether commands should be read from standard input
     */
    private static void runInteractiveCli(ClusterClient<?> clusterClient, YarnApplicationStatusMonitor yarnApplicationStatusMonitor, boolean readConsoleInput) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            int knownTaskManagers = 0;
            // The status counts as unknown from the start, so the timeout clock runs from here.
            boolean previousStatusUnknown = true;
            long unknownSinceNanos = System.nanoTime();
            boolean keepRunning = true;

            while (keepRunning) {
                final ApplicationStatus currentStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();
                switch (currentStatus) {
                    case FAILED:
                    case CANCELED:
                        System.err.println("The Flink Yarn cluster has failed.");
                        keepRunning = false;
                        break;

                    case UNKNOWN:
                        if (!previousStatusUnknown) {
                            // Transition into the unknown state: restart the timeout clock.
                            unknownSinceNanos = System.nanoTime();
                            previousStatusUnknown = true;
                        }
                        if (System.nanoTime() - unknownSinceNanos > 15000000000L) {
                            System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
                            keepRunning = false;
                        } else {
                            keepRunning = FlinkYarnSessionCli.repStep(in, readConsoleInput);
                        }
                        break;

                    case SUCCEEDED:
                        previousStatusUnknown = false;
                        try {
                            final GetClusterStatusResponse clusterStatus = clusterClient.getClusterStatus();
                            if (clusterStatus != null) {
                                final int registered = clusterStatus.numRegisteredTaskManagers();
                                if (knownTaskManagers != registered) {
                                    System.err.println("Number of connected TaskManagers changed to " + registered + ". Slots available: " + clusterStatus.totalNumberOfSlots());
                                    knownTaskManagers = registered;
                                }
                            }
                        } catch (Exception statusFailure) {
                            LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", (Throwable)statusFailure);
                        }
                        FlinkYarnSessionCli.printClusterMessages(clusterClient);
                        keepRunning = FlinkYarnSessionCli.repStep(in, readConsoleInput);
                        break;
                }
            }
        } catch (Exception loopFailure) {
            LOG.warn("Exception while running the interactive command line interface.", (Throwable)loopFailure);
        }
    }

    /**
     * Prints any new messages reported by the cluster to standard error.
     *
     * <p>Nothing is printed when the client reports no new messages.
     *
     * @param clusterClient client to fetch new messages from
     */
    private static void printClusterMessages(ClusterClient<?> clusterClient) {
        // Typed as List<String>: iterating a raw List as String does not compile.
        final List<String> messages = clusterClient.getNewMessages();
        if (messages.isEmpty()) {
            return;
        }
        System.err.println("New messages from the YARN cluster: ");
        for (String msg : messages) {
            System.err.println(msg);
        }
    }

    /**
     * Performs one read-evaluate-print step of the interactive session.
     *
     * <p>Waits up to 3000 ms, checking every 200 ms, and stops waiting early when console
     * input is enabled and available. If input is then available, one line is read:
     * {@code "quit"} and {@code "stop"} end the session, {@code "help"} prints the help text,
     * and anything else prints an "unknown command" notice followed by the help text.
     *
     * <p>Note: the command is switched on directly, so a {@code null} line from
     * {@link BufferedReader#readLine()} results in a {@link NullPointerException}.
     *
     * @param in reader for console input
     * @param readConsoleInput whether console input should be consulted at all
     * @return {@code false} if the user asked to quit or stop, {@code true} otherwise
     * @throws IOException if checking or reading the console input fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static boolean repStep(BufferedReader in, boolean readConsoleInput) throws IOException, InterruptedException {
        final long waitStart = System.currentTimeMillis();
        while (System.currentTimeMillis() - waitStart < 3000L && !(readConsoleInput && in.ready())) {
            Thread.sleep(200L);
        }

        if (!readConsoleInput || !in.ready()) {
            // Nothing to evaluate in this step; keep the session running.
            return true;
        }

        final String userCommand = in.readLine();
        switch (userCommand) {
            case "quit":
            case "stop":
                return false;
            case "help":
                System.err.println(YARN_SESSION_HELP);
                return true;
            default:
                System.err.println("Unknown command '" + userCommand + "'. Showing help:");
                System.err.println(YARN_SESSION_HELP);
                return true;
        }
    }

    /**
     * Stores the given properties in the given file and then marks the file as readable for
     * all users.
     *
     * @param properties properties to store
     * @param propertiesFile destination file; it is opened with {@link FileOutputStream}
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
     */
    private static void writeYarnProperties(Properties properties, File propertiesFile) {
        try (FileOutputStream propertiesStream = new FileOutputStream(propertiesFile)) {
            properties.store(propertiesStream, "Generated YARN properties file");
        } catch (IOException writeFailure) {
            throw new RuntimeException("Error writing the properties file", writeFailure);
        }
        // ownerOnly = false: grant read permission to everybody, not only the owner.
        propertiesFile.setReadable(true, false);
    }

    /**
     * Reports a command line parsing failure.
     *
     * <p>The exception is logged at ERROR level; its message, a blank line and a hint about the
     * help option are printed to standard output.
     *
     * @param e the argument parsing failure
     * @return always {@code 1}
     */
    private static int handleCliArgsException(CliArgsException e) {
        LOG.error("Could not parse the command line arguments.", (Throwable)e);
        System.out.println(e.getMessage());
        System.out.println();
        System.out.println("Use the help option (-h or --help) to get help on the command.");
        return 1;
    }

    /**
     * Reports a failure of the YARN session.
     *
     * <p>The throwable is logged at ERROR level, then a banner followed by the stack trace is
     * written to standard error.
     *
     * @param t the failure to report
     * @return always {@code 1}
     */
    private static int handleError(Throwable t) {
        LOG.error("Error while running the Flink Yarn session.", t);
        System.err.println();
        System.err.println("------------------------------------------------------------");
        System.err.println(" The program finished with the following exception:");
        System.err.println();
        // Throwable.printStackTrace() writes to the standard error stream.
        t.printStackTrace();
        return 1;
    }

    /**
     * Computes the location of the current user's YARN properties file.
     *
     * <p>The file name is {@code YARN_PROPERTIES_FILE} followed by the value of the
     * {@code user.name} system property. The parent directory is the given location, or the
     * value of the {@code java.io.tmpdir} system property when no location is given.
     *
     * @param yarnPropertiesFileLocation directory to place the file in, or {@code null} to use
     *     the temporary directory
     * @return the properties file; this method only builds the path and does not touch the
     *     file system
     */
    public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) {
        final String parentDirectory;
        if (yarnPropertiesFileLocation == null) {
            parentDirectory = System.getProperty("java.io.tmpdir");
        } else {
            parentDirectory = yarnPropertiesFileLocation;
        }
        final String fileName = YARN_PROPERTIES_FILE + System.getProperty("user.name");
        return new File(parentDirectory, fileName);
    }

    /**
     * Creates a cluster descriptor backed by a newly created and started {@code YarnClient}.
     *
     * <p>The client is initialised with the given YARN configuration and started before it is
     * handed to the descriptor.
     *
     * @param configuration Flink configuration passed to the descriptor
     * @param yarnConfiguration YARN configuration used for the client and the descriptor
     * @param configurationDirectory configuration directory passed to the descriptor
     * @return a new {@code YarnClusterDescriptor}
     */
    private AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, YarnConfiguration yarnConfiguration, String configurationDirectory) {
        // Fully qualified: the simple name Configuration refers to Flink's configuration class here.
        final org.apache.hadoop.conf.Configuration hadoopConfiguration = yarnConfiguration;

        final YarnClient startedClient = YarnClient.createYarnClient();
        startedClient.init(hadoopConfiguration);
        startedClient.start();

        return new YarnClusterDescriptor(configuration, yarnConfiguration, configurationDirectory, startedClient, false);
    }
}

