/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptorBuilder;
import org.apache.flink.yarn.YarnTestUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class YarnClusterDescriptorTest
extends TestLogger {
    private static final int YARN_MAX_VCORES = 16;
    private static YarnConfiguration yarnConfiguration;
    private static YarnClient yarnClient;
    private final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().setSlotsPerTaskManager(Integer.MAX_VALUE).createClusterSpecification();
    private final ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[0], null);
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private File flinkJar;

    @BeforeClass
    public static void setupClass() {
        yarnConfiguration = new YarnConfiguration();
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init((org.apache.hadoop.conf.Configuration)yarnConfiguration);
        yarnClient.start();
    }

    @Before
    public void beforeTest() throws IOException {
        this.temporaryFolder.create();
        this.flinkJar = this.temporaryFolder.newFile("flink.jar");
    }

    @AfterClass
    public static void tearDownClass() {
        yarnClient.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
        Configuration flinkConfiguration = new Configuration();
        clusterDescriptor.setLocalJarPath(new Path(this.flinkJar.getPath()));
        try (YarnClusterDescriptor clusterDescriptor = this.createYarnClusterDescriptor(flinkConfiguration);){
            clusterDescriptor.deploySessionCluster(this.clusterSpecification);
            Assert.fail((String)"The deploy call should have failed.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConfigOverwrite() throws ClusterDeploymentException {
        Configuration configuration = new Configuration();
        configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
        YarnClusterDescriptor clusterDescriptor = this.createYarnClusterDescriptor(configuration);
        clusterDescriptor.setLocalJarPath(new Path(this.flinkJar.getPath()));
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
        try {
            clusterDescriptor.deploySessionCluster(clusterSpecification);
            Assert.fail((String)"The deploy call should have failed.");
        }
        catch (ClusterDeploymentException e) {
            if (!(e.getCause() instanceof IllegalConfigurationException)) {
                throw e;
            }
        }
        finally {
            clusterDescriptor.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSetupApplicationMasterContainer() {
        Configuration cfg = new Configuration();
        YarnClusterDescriptor clusterDescriptor = this.createYarnClusterDescriptor(cfg);
        JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.createDefaultJobManagerProcessSpec((int)1024);
        String java = "$JAVA_HOME/bin/java";
        String jvmmem = JobManagerProcessUtils.generateJvmParametersStr((JobManagerProcessSpec)jobManagerProcessSpec, (Configuration)cfg);
        String dynamicParameters = JobManagerProcessUtils.generateDynamicConfigsStr((JobManagerProcessSpec)jobManagerProcessSpec);
        String jvmOpts = "-Djvm";
        String jmJvmOpts = "-DjmJvm";
        String krb5 = "-Djava.security.krb5.conf=krb5.conf";
        String logfile = "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"";
        String logback = "-Dlogback.configurationFile=file:logback.xml";
        String log4j = "-Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties";
        String mainClass = clusterDescriptor.getYarnSessionClusterEntrypoint();
        String redirects = "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err";
        try {
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"log4j.properties");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"log4j.properties");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, "-Djvm");
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djvm" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djvm" + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, "-DjmJvm");
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"log4j.properties");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djvm" + " " + "-DjmJvm" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, false, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"log4j.properties");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java " + jvmmem + " " + "-Djvm" + " " + "-DjmJvm" + " " + "-Djava.security.krb5.conf=krb5.conf" + " " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties" + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            cfg.setString("yarn.container-start-command-template", "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java 1 " + jvmmem + " 2 " + "-Djvm" + " " + "-DjmJvm" + " " + "-Djava.security.krb5.conf=krb5.conf" + " 3 " + "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"" + " " + "-Dlogback.configurationFile=file:logback.xml" + " 4 " + mainClass + " 5 " + dynamicParameters + " 6 " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
            cfg.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, (Object)"logback.xml");
            cfg.setString("yarn.container-start-command-template", "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
            Assert.assertEquals((Object)("$JAVA_HOME/bin/java -Dlog.file=\"<LOG_DIR>/jobmanager.log\" -Dlogback.configurationFile=file:logback.xml -Djvm -DjmJvm -Djava.security.krb5.conf=krb5.conf " + jvmmem + " " + mainClass + " " + dynamicParameters + " " + "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"), clusterDescriptor.setupApplicationMasterContainer(mainClass, true, jobManagerProcessSpec).getCommands().get(0));
        }
        finally {
            clusterDescriptor.close();
        }
    }

    @Test
    public void testExplicitFileShipping() throws Exception {
        try (YarnClusterDescriptor descriptor = this.createYarnClusterDescriptor();){
            descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
            File libFile = this.temporaryFolder.newFile("libFile.jar");
            File libFolder = this.temporaryFolder.newFolder().getAbsoluteFile();
            Assert.assertFalse((boolean)descriptor.getShipFiles().contains(libFile));
            Assert.assertFalse((boolean)descriptor.getShipFiles().contains(libFolder));
            ArrayList<File> shipFiles = new ArrayList<File>();
            shipFiles.add(libFile);
            shipFiles.add(libFolder);
            descriptor.addShipFiles(shipFiles);
            Assert.assertTrue((boolean)descriptor.getShipFiles().contains(libFile));
            Assert.assertTrue((boolean)descriptor.getShipFiles().contains(libFolder));
            HashSet effectiveShipFiles = new HashSet();
            descriptor.addLibFoldersToShipFiles(effectiveShipFiles);
            Assert.assertEquals((long)0L, (long)effectiveShipFiles.size());
            Assert.assertEquals((long)2L, (long)descriptor.getShipFiles().size());
            Assert.assertTrue((boolean)descriptor.getShipFiles().contains(libFile));
            Assert.assertTrue((boolean)descriptor.getShipFiles().contains(libFolder));
        }
    }

    @Test
    public void testEnvironmentLibShipping() throws Exception {
        this.testEnvironmentDirectoryShipping("FLINK_LIB_DIR", false);
    }

    @Test
    public void testEnvironmentPluginsShipping() throws Exception {
        this.testEnvironmentDirectoryShipping("FLINK_PLUGINS_DIR", true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testEnvironmentDirectoryShipping(String environmentVariable, boolean onlyShip) throws Exception {
        try (YarnClusterDescriptor descriptor = this.createYarnClusterDescriptor();){
            File libFolder = this.temporaryFolder.newFolder().getAbsoluteFile();
            File libFile = new File(libFolder, "libFile.jar");
            TestCase.assertTrue((boolean)libFile.createNewFile());
            HashSet effectiveShipFiles = new HashSet();
            Map<String, String> oldEnv = System.getenv();
            try {
                HashMap<String, String> env = new HashMap<String, String>(1);
                env.put(environmentVariable, libFolder.getAbsolutePath());
                CommonTestUtils.setEnv(env);
                if (onlyShip) {
                    descriptor.addPluginsFoldersToShipFiles(effectiveShipFiles);
                } else {
                    descriptor.addLibFoldersToShipFiles(effectiveShipFiles);
                }
            }
            finally {
                CommonTestUtils.setEnv(oldEnv);
            }
            Assert.assertFalse((boolean)effectiveShipFiles.contains(libFile));
            Assert.assertTrue((boolean)effectiveShipFiles.contains(libFolder));
            Assert.assertFalse((boolean)descriptor.getShipFiles().contains(libFile));
            Assert.assertFalse((boolean)descriptor.getShipFiles().contains(libFolder));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEnvironmentEmptyPluginsShipping() {
        try (YarnClusterDescriptor descriptor = this.createYarnClusterDescriptor();){
            File pluginsFolder = Paths.get(this.temporaryFolder.getRoot().getAbsolutePath(), "s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile();
            HashSet effectiveShipFiles = new HashSet();
            Map<String, String> oldEnv = System.getenv();
            try {
                HashMap<String, String> env = new HashMap<String, String>(1);
                env.put("FLINK_PLUGINS_DIR", pluginsFolder.getAbsolutePath());
                CommonTestUtils.setEnv(env);
                descriptor.addPluginsFoldersToShipFiles(effectiveShipFiles);
            }
            finally {
                CommonTestUtils.setEnv(oldEnv);
            }
            TestCase.assertTrue((boolean)effectiveShipFiles.isEmpty());
        }
    }

    @Test
    public void testDisableSystemClassPathIncludeUserJarAndWithIllegalShipDirectoryName() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, (Object)YarnConfigOptions.UserJarInclusion.DISABLED);
        YarnClusterDescriptor yarnClusterDescriptor = this.createYarnClusterDescriptor(configuration);
        try {
            yarnClusterDescriptor.addShipFiles(Collections.singletonList(this.temporaryFolder.newFolder("usrlib")));
            Assert.fail();
        }
        catch (IllegalArgumentException exception) {
            Assert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.containsString((String)"This is an illegal ship directory :"));
        }
    }

    @Test
    public void testYarnClientShutDown() {
        YarnClusterDescriptor yarnClusterDescriptor = this.createYarnClusterDescriptor();
        yarnClusterDescriptor.close();
        TestCase.assertTrue((boolean)yarnClient.isInState(Service.STATE.STARTED));
        YarnClient closableYarnClient = YarnClient.createYarnClient();
        closableYarnClient.init((org.apache.hadoop.conf.Configuration)yarnConfiguration);
        closableYarnClient.start();
        yarnClusterDescriptor = YarnTestUtils.createClusterDescriptorWithLogging(this.temporaryFolder.getRoot().getAbsolutePath(), new Configuration(), yarnConfiguration, closableYarnClient, false);
        yarnClusterDescriptor.close();
        TestCase.assertTrue((boolean)closableYarnClient.isInState(Service.STATE.STOPPED));
    }

    @Test
    public void testDeployApplicationClusterWithDeploymentTargetNotCorrectlySet() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList("file:///path/of/user.jar"));
        flinkConfig.set(DeploymentOptions.TARGET, (Object)YarnDeploymentTarget.SESSION.getName());
        try (YarnClusterDescriptor yarnClusterDescriptor = this.createYarnClusterDescriptor(flinkConfig);){
            CommonTestUtils.assertThrows((String)"Expected deployment.target=yarn-application", ClusterDeploymentException.class, () -> yarnClusterDescriptor.deployApplicationCluster(this.clusterSpecification, this.appConfig));
        }
    }

    @Test
    public void testDeployApplicationClusterWithMultipleJarsSet() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.set(PipelineOptions.JARS, Arrays.asList("local:///path/of/user.jar", "local:///user2.jar"));
        flinkConfig.set(DeploymentOptions.TARGET, (Object)YarnDeploymentTarget.APPLICATION.getName());
        try (YarnClusterDescriptor yarnClusterDescriptor = this.createYarnClusterDescriptor(flinkConfig);){
            CommonTestUtils.assertThrows((String)"Should only have one jar", IllegalArgumentException.class, () -> yarnClusterDescriptor.deployApplicationCluster(this.clusterSpecification, this.appConfig));
        }
    }

    private YarnClusterDescriptor createYarnClusterDescriptor() {
        return this.createYarnClusterDescriptor(new Configuration());
    }

    private YarnClusterDescriptor createYarnClusterDescriptor(Configuration configuration) {
        YarnTestUtils.configureLogFile(configuration, this.temporaryFolder.getRoot().getAbsolutePath());
        return YarnClusterDescriptorBuilder.newBuilder(yarnClient, true).setFlinkConfiguration(configuration).setYarnConfiguration(yarnConfiguration).setYarnClusterInformationRetriever(() -> 16).build();
    }
}

