/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FlinkYarnSessionCliTest
extends TestLogger {
    private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance((long)System.currentTimeMillis(), (int)42);
    private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance((long)System.currentTimeMillis(), (int)43);
    private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
    private static final String invalidPropertiesFile = "jasfobManager=22.33.44.55:asf6655";
    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();

    @Test
    public void testDynamicProperties() throws Exception {
        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "", "", false);
        Options options = new Options();
        cli.addGeneralOptions(options);
        cli.addRunOptions(options);
        DefaultParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-D", AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min", "-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar", "-D", SecurityOptions.SSL_INTERNAL_KEY_PASSWORD.key() + "=changeit"});
        Configuration executorConfig = cli.toConfiguration(cmd);
        Assert.assertEquals((Object)Duration.ofMinutes(5L), (Object)executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION));
        Assert.assertEquals((Object)"-DappName=foobar", (Object)executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS));
        Assert.assertEquals((Object)"changeit", (Object)executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD));
    }

    @Test
    public void testCorrectSettingOfMaxSlots() throws Exception {
        String[] params = new String[]{"-ys", "3"};
        Configuration configuration = this.createConfigurationWithJmAndTmTotalMemory(2048);
        FlinkYarnSessionCli yarnCLI = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
        configuration.addAll(yarnCLI.toConfiguration(commandLine));
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(configuration);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
        Assert.assertEquals((long)3L, (long)clusterSpecification.getSlotsPerTaskManager());
    }

    @Test
    public void testCorrectSettingOfDetachedMode() throws Exception {
        String[] params = new String[]{"-yd"};
        FlinkYarnSessionCli yarnCLI = this.createFlinkYarnSessionCli();
        CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
        Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
        Assert.assertThat((Object)executorConfig.get(DeploymentOptions.ATTACHED), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testZookeeperNamespaceProperty() throws Exception {
        String zkNamespaceCliInput = "flink_test_namespace";
        String[] params = new String[]{"-yz", zkNamespaceCliInput};
        FlinkYarnSessionCli yarnCLI = this.createFlinkYarnSessionCli();
        CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
        Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
        Assert.assertThat((Object)executorConfig.get(HighAvailabilityOptions.HA_CLUSTER_ID), (Matcher)Matchers.is((Object)zkNamespaceCliInput));
    }

    @Test
    public void testNodeLabelProperty() throws Exception {
        String nodeLabelCliInput = "flink_test_nodelabel";
        String[] params = new String[]{"-ynl", nodeLabelCliInput};
        FlinkYarnSessionCli yarnCLI = this.createFlinkYarnSessionCli();
        CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
        Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        YarnClusterDescriptor descriptor = (YarnClusterDescriptor)clientFactory.createClusterDescriptor(executorConfig);
        Assert.assertEquals((Object)nodeLabelCliInput, (Object)descriptor.getNodeLabel());
    }

    @Test
    public void testExecutorCLIisPrioritised() throws Exception {
        File directoryPath = this.writeYarnPropertiesFile(validPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
        this.validateYarnCLIisActive(configuration);
        String[] argsUnderTest = new String[]{"-e", YarnJobClusterExecutor.NAME};
        this.validateExecutorCLIisPrioritised(configuration, argsUnderTest);
    }

    private void validateExecutorCLIisPrioritised(Configuration configuration, String[] argsUnderTest) throws IOException, CliArgsException {
        List customCommandLines = CliFrontend.loadCustomCommandLines((Configuration)configuration, (String)this.tmp.newFile().getAbsolutePath());
        CliFrontend cli = new CliFrontend(configuration, customCommandLines);
        CommandLine commandLine = cli.getCommandLine(CliFrontendParser.getRunCommandOptions(), argsUnderTest, true);
        CustomCommandLine customCommandLine = cli.validateAndGetActiveCommandLine(commandLine);
        Assert.assertTrue((boolean)(customCommandLine instanceof GenericCLI));
    }

    private void validateYarnCLIisActive(Configuration configuration) throws FlinkException, CliArgsException {
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine testCLIArgs = flinkYarnSessionCli.parseCommandLineOptions(new String[0], true);
        Assert.assertTrue((boolean)flinkYarnSessionCli.isActive(testCLIArgs));
    }

    @Test
    public void testResumeFromYarnPropertiesFile() throws Exception {
        File directoryPath = this.writeYarnPropertiesFile(validPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], true);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ApplicationId clusterId = (ApplicationId)clientFactory.getClusterId(executorConfig);
        Assert.assertEquals((Object)TEST_YARN_APPLICATION_ID, (Object)clusterId);
    }

    @Test(expected=FlinkException.class)
    public void testInvalidYarnPropertiesFile() throws Exception {
        File directoryPath = this.writeYarnPropertiesFile(invalidPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
        this.createFlinkYarnSessionCli(configuration);
    }

    @Test
    public void testResumeFromYarnID() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ApplicationId clusterId = (ApplicationId)clientFactory.getClusterId(executorConfig);
        Assert.assertEquals((Object)TEST_YARN_APPLICATION_ID, (Object)clusterId);
    }

    @Test
    public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor)clientFactory.createClusterDescriptor(executorConfig);
        Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
        String zkNs = clusterDescriptorConfiguration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
        Assert.assertTrue((boolean)zkNs.matches("application_\\d+_0042"));
    }

    @Test
    public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        String overrideZkNamespace = "my_cluster";
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", "my_cluster"}, true);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor)clientFactory.createClusterDescriptor(executorConfig);
        Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
        String clusterId = clusterDescriptorConfiguration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
        Assert.assertEquals((Object)"my_cluster", (Object)clusterId);
    }

    @Test
    public void testYarnIDOverridesPropertiesFile() throws Exception {
        File directoryPath = this.writeYarnPropertiesFile(validPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID_2.toString()}, true);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ApplicationId clusterId = (ApplicationId)clientFactory.getClusterId(executorConfig);
        Assert.assertEquals((Object)TEST_YARN_APPLICATION_ID_2, (Object)clusterId);
    }

    @Test
    public void testCommandLineClusterSpecification() throws Exception {
        Configuration configuration = new Configuration();
        int jobManagerMemory = 1337;
        int taskManagerMemory = 7331;
        int slotsPerTaskManager = 30;
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)1337L));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)7331L));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 30);
        String[] args = new String[]{"-yjm", String.valueOf(1337) + "m", "-ytm", String.valueOf(7331) + "m", "-ys", String.valueOf(30)};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)1337));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)7331));
        Assert.assertThat((Object)clusterSpecification.getSlotsPerTaskManager(), (Matcher)Matchers.is((Object)30));
    }

    @Test
    public void testConfigurationClusterSpecification() throws Exception {
        Configuration configuration = new Configuration();
        int jobManagerMemory = 1337;
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)1337L));
        int taskManagerMemory = 7331;
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)7331L));
        int slotsPerTaskManager = 42;
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 42);
        String[] args = new String[]{};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        configuration.addAll(flinkYarnSessionCli.toConfiguration(commandLine));
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(configuration);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)1337));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)7331));
        Assert.assertThat((Object)clusterSpecification.getSlotsPerTaskManager(), (Matcher)Matchers.is((Object)42));
    }

    @Test
    public void testMemoryPropertyWithoutUnit() throws Exception {
        String[] args = new String[]{"-yjm", "1024", "-ytm", "2048"};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)1024));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)2048));
    }

    @Test
    public void testMemoryPropertyWithUnitMB() throws Exception {
        String[] args = new String[]{"-yjm", "1024m", "-ytm", "2048m"};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)1024));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)2048));
    }

    @Test
    public void testMemoryPropertyWithArbitraryUnit() throws Exception {
        String[] args = new String[]{"-yjm", "1g", "-ytm", "2g"};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)1024));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)2048));
    }

    @Test
    public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
        configuration.addAll(flinkYarnSessionCli.toConfiguration(commandLine));
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(configuration);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)2048));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)4096));
    }

    @Test
    public void testJobManagerMemoryPropertyWithConfigDefaultValue() throws Exception {
        int procMemory = 2048;
        Configuration configuration = this.createConfigurationWithJmAndTmTotalMemory(procMemory);
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli(configuration);
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
        configuration.addAll(flinkYarnSessionCli.toConfiguration(commandLine));
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(configuration);
        ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
        Assert.assertThat((Object)clusterSpecification.getMasterMemoryMB(), (Matcher)Matchers.is((Object)procMemory));
        Assert.assertThat((Object)clusterSpecification.getTaskManagerMemoryMB(), (Matcher)Matchers.is((Object)procMemory));
    }

    @Test
    public void testMultipleYarnShipOptions() throws Exception {
        String[] args = new String[]{"run", "--yarnship", this.tmp.newFolder().getAbsolutePath(), "--yarnship", this.tmp.newFolder().getAbsolutePath()};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor)clientFactory.createClusterDescriptor(executorConfig);
        Assert.assertEquals((long)2L, (long)flinkYarnDescriptor.getShipFiles().size());
    }

    @Test
    public void testShipFiles() throws Exception {
        File tmpFile = this.tmp.newFile();
        String[] args = new String[]{"run", "--yarnship", tmpFile.toString()};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        Configuration executorConfig = flinkYarnSessionCli.toConfiguration(commandLine);
        ClusterClientFactory<ApplicationId> clientFactory = this.getClusterClientFactory(executorConfig);
        YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor)clientFactory.createClusterDescriptor(executorConfig);
        Assert.assertEquals((Object)Lists.newArrayList((Object[])new File[]{tmpFile}), (Object)flinkYarnDescriptor.getShipFiles());
    }

    @Test
    public void testMissingShipFiles() throws Exception {
        File tmpFile = this.tmp.newFile();
        String[] args = new String[]{"run", "--yarnship", tmpFile.toString(), "--yarnship", "missing.file"};
        FlinkYarnSessionCli flinkYarnSessionCli = this.createFlinkYarnSessionCli();
        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
        try {
            flinkYarnSessionCli.toConfiguration(commandLine);
            Assert.fail((String)"Expected error for missing file");
        }
        catch (ConfigurationException ce) {
            Assert.assertEquals((Object)"Ship file missing.file does not exist", (Object)ce.getMessage());
        }
    }

    private ClusterClientFactory<ApplicationId> getClusterClientFactory(Configuration executorConfig) {
        DefaultClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
        return clusterClientServiceLoader.getClusterClientFactory(executorConfig);
    }

    private File writeYarnPropertiesFile(String contents) throws IOException {
        File tmpFolder = this.tmp.newFolder();
        String currentUser = System.getProperty("user.name");
        File testPropertiesFile = new File(tmpFolder, ".yarn-properties-" + currentUser);
        Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE);
        return tmpFolder.getAbsoluteFile();
    }

    private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {
        return this.createFlinkYarnSessionCli(new Configuration());
    }

    private FlinkYarnSessionCli createFlinkYarnSessionCliWithJmAndTmTotalMemory(int totalMemomory) throws FlinkException {
        return this.createFlinkYarnSessionCli(this.createConfigurationWithJmAndTmTotalMemory(totalMemomory));
    }

    private Configuration createConfigurationWithJmAndTmTotalMemory(int totalMemomory) throws FlinkException {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)totalMemomory));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.ofMebiBytes((long)totalMemomory));
        return configuration;
    }

    private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException {
        return new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
    }
}

