/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class ClientTest
extends TestLogger {
    /** Executor target name written to the configuration and matched by the test executor factory. */
    private static final String TEST_EXECUTOR_NAME = "test_executor";

    /** Accumulator name requested by {@link TestGetAccumulator}. */
    private static final String ACCUMULATOR_NAME = "test_accumulator";

    /** Failure message used when a detached-mode program unexpectedly runs to completion. */
    private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";

    /** JUnit class rule providing the mini cluster the tests submit their jobs to. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
            new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().build());

    /** Program plan built in {@link #setUp()}; this is what the test executor submits. */
    private Plan plan;

    /** Configuration built in {@link #setUp()}; handed to the optimizer in {@link #testGetExecutionPlan()}. */
    private Configuration config;

    /**
     * Prepares the fixtures shared by the tests: a plan that generates the sequence
     * 1..1000 and discards it, and a configuration pointing at {@code localhost} on a
     * currently available port with the default ask timeout.
     *
     * @throws Exception if creating the plan or looking up a port fails
     */
    @Before
    public void setUp() throws Exception {
        final LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
        localEnv.generateSequence(1L, 1000L).output(new DiscardingOutputFormat<Long>());
        plan = localEnv.createProgramPlan();

        final int port = NetUtils.getAvailablePort();
        final Configuration cfg = new Configuration();
        cfg.setString(JobManagerOptions.ADDRESS, "localhost");
        cfg.setInteger(JobManagerOptions.PORT, port);
        cfg.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
        config = cfg;
    }

    /**
     * Builds the configuration with which a packaged program is executed through the
     * test executor.
     *
     * @param program the program whose classpaths and jars are encoded into the configuration
     * @param parallelism the default parallelism to configure
     * @param detached {@code true} to run detached, i.e. with the attached flag set to {@code false}
     * @return a new configuration targeting the test executor
     */
    private Configuration fromPackagedProgram(PackagedProgram program, int parallelism, boolean detached) {
        Configuration configuration = new Configuration();
        configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        // Store a real boolean: the attached flag is a boolean option, and an Integer 1/0
        // hidden behind an Object cast is not a valid value for it.
        configuration.set(DeploymentOptions.ATTACHED, !detached);
        ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
        ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
        return configuration;
    }

    /**
     * Runs programs that try to access job results in detached mode and verifies that each
     * one fails with a {@link ProgramInvocationException} whose cause carries the expected
     * message.
     *
     * @throws Exception if the cluster client or a program fails unexpectedly
     */
    @Test
    public void testDetachedMode() throws Exception {
        final String detachedMessage = "Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. ";
        final String eagerDetachedMessage = detachedMessage + "Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. ";

        // try-with-resources so the client is closed even when an assertion fails.
        try (MiniClusterClient clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            this.assertFailsInDetachedMode(clusterClient, TestEager.class, eagerDetachedMessage);
            this.assertFailsInDetachedMode(clusterClient, TestGetRuntime.class, detachedMessage);
            this.assertFailsInDetachedMode(clusterClient, TestGetAccumulator.class, eagerDetachedMessage);
            this.assertFailsInDetachedMode(clusterClient, TestGetAllAccumulator.class, detachedMessage);
        }
    }

    /**
     * Executes the given entry point class in detached mode and asserts that it fails with a
     * {@link ProgramInvocationException} whose cause has exactly the expected message.
     *
     * @param clusterClient client used by the test executor to submit the job
     * @param entryPoint class whose {@code main} method is run as the program
     * @param expectedCauseMessage message expected on the cause of the thrown exception
     * @throws Exception if execution fails with anything other than the expected exception
     */
    private void assertFailsInDetachedMode(ClusterClient<?> clusterClient, Class<?> entryPoint, String expectedCauseMessage) throws Exception {
        try {
            PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(entryPoint.getName()).build();
            Configuration configuration = this.fromPackagedProgram(prg, 1, true);
            ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, this.plan), configuration, prg, false, false);
            Assert.fail(FAIL_MESSAGE);
        }
        catch (ProgramInvocationException e) {
            Assert.assertEquals(expectedCauseMessage, e.getCause().getMessage());
        }
    }

    /**
     * Launches the multi-execute program with single-job enforcement switched on. A
     * {@link ProgramInvocationException} is unwrapped and its cause rethrown, which the
     * test expects to be a {@link FlinkRuntimeException}; any other outcome ends in an
     * explicit failure.
     *
     * @throws Throwable the cause of the {@link ProgramInvocationException}
     */
    @Test(expected=FlinkRuntimeException.class)
    public void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {
        try {
            launchMultiExecuteJob(true);
        }
        catch (ProgramInvocationException invocationFailure) {
            throw invocationFailure.getCause();
        }
        catch (Exception ignored) {
            // Any other exception is not the failure under test; fall through to fail().
        }
        Assert.fail("Test should have failed due to multiple execute() calls.");
    }

    /**
     * Launches the multi-execute program with single-job enforcement switched off; the
     * test passes when no exception is thrown.
     *
     * @throws ProgramInvocationException if the program invocation fails
     */
    @Test
    public void testMultiExecuteWithoutEnforcingSingleJobExecution() throws ProgramInvocationException {
        final boolean enforceSingleJobExecution = false;
        launchMultiExecuteJob(enforceSingleJobExecution);
    }

    /**
     * Runs {@link TestMultiExecute} through the test executor with parallelism 1 in
     * attached mode.
     *
     * @param enforceSingleJobExecution forwarded to {@code ClientUtils.executeProgram}
     * @throws ProgramInvocationException if the program invocation fails
     */
    private void launchMultiExecuteJob(boolean enforceSingleJobExecution) throws ProgramInvocationException {
        try (MiniClusterClient miniClusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            final PackagedProgram multiExecuteProgram = PackagedProgram.newBuilder()
                    .setEntryPointClassName(TestMultiExecute.class.getName())
                    .build();
            final Configuration executionConfig = fromPackagedProgram(multiExecuteProgram, 1, false);
            final PipelineExecutorServiceLoader executorLoader = new TestExecutorServiceLoader(miniClusterClient, plan);
            ClientUtils.executeProgram(executorLoader, executionConfig, multiExecuteProgram, enforceSingleJobExecution, false);
        }
    }

    /**
     * Translates the plan into a job graph, submits it directly through the cluster client
     * and checks that the submission yields a non-null result.
     *
     * @throws Exception if translation or submission fails
     */
    @Test
    public void shouldSubmitToJobClient() throws Exception {
        // try-with-resources so the client is closed even when the assertion fails.
        try (MiniClusterClient clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(this.plan, new Configuration(), 1);
            jobGraph.addJars(Collections.emptyList());
            jobGraph.setClasspaths(Collections.emptyList());
            Assert.assertNotNull(clusterClient.submitJob(jobGraph).get());
        }
    }

    /**
     * Verifies that a program executed through the client cannot create a local execution
     * environment: the mocked program calls
     * {@link ExecutionEnvironment#createLocalEnvironment()} from its interactive entry point,
     * and the test expects an {@link InvalidProgramException}.
     *
     * @throws ProgramInvocationException if the program invocation fails
     * @throws ProgramMissingJobException declared for callers; not raised by the visible code
     */
    @Test
    public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
        PackagedProgram packagedProgramMock = Mockito.mock(PackagedProgram.class);
        Mockito.when(packagedProgramMock.getUserCodeClassLoader()).thenReturn(packagedProgramMock.getClass().getClassLoader());
        Mockito.doAnswer(new Answer<Void>(){

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                ExecutionEnvironment.createLocalEnvironment();
                return null;
            }
        }).when(packagedProgramMock).invokeInteractiveModeForExecution();

        // try-with-resources so the client is closed on both the expected and the failing path.
        try (MiniClusterClient client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {
            Configuration configuration = this.fromPackagedProgram(packagedProgramMock, 1, true);
            ClientUtils.executeProgram(new TestExecutorServiceLoader(client, this.plan), configuration, packagedProgramMock, false, false);
            Assert.fail("Creating the local execution environment should not be possible");
        }
        catch (InvalidProgramException expected) {
            // Expected: this rejection is the behaviour under test.
        }
    }

    /**
     * Extracts the plan from {@link TestOptimizerPlan}, compiles it with the optimizer and
     * checks that a JSON dump can be produced. The HTML-encoded variant of the dump must
     * not contain any backslash character.
     *
     * @throws ProgramInvocationException if the program cannot be built or its pipeline extracted
     */
    @Test
    public void testGetExecutionPlan() throws ProgramInvocationException {
        final String[] programArgs = {"/dev/random", "/tmp"};
        final PackagedProgram program = PackagedProgram.newBuilder()
                .setEntryPointClassName(TestOptimizerPlan.class.getName())
                .setArguments(programArgs)
                .build();

        final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
        final Plan programPlan = (Plan) PackagedProgramUtils.getPipelineFromProgram(program, new Configuration(), 1, true);
        final OptimizedPlan optimizedPlan = optimizer.compile(programPlan);
        Assert.assertNotNull(optimizedPlan);

        final PlanJSONDumpGenerator plainDumper = new PlanJSONDumpGenerator();
        Assert.assertNotNull(plainDumper.getOptimizerPlanAsJSON(optimizedPlan));

        final PlanJSONDumpGenerator htmlDumper = new PlanJSONDumpGenerator();
        htmlDumper.setEncodeForHTML(true);
        final String htmlEscaped = htmlDumper.getOptimizerPlanAsJSON(optimizedPlan);
        final int backslashIndex = htmlEscaped.indexOf('\\');
        Assert.assertEquals(-1, backslashIndex);
    }

    private static final class TestExecutorServiceLoader
    implements PipelineExecutorServiceLoader {
        private final ClusterClient<?> clusterClient;
        private final Plan plan;

        TestExecutorServiceLoader(ClusterClient<?> clusterClient, Plan plan) {
            this.clusterClient = (ClusterClient)Preconditions.checkNotNull(clusterClient);
            this.plan = (Plan)Preconditions.checkNotNull((Object)plan);
        }

        public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
            return new PipelineExecutorFactory(){

                public String getName() {
                    return "my-name";
                }

                public boolean isCompatibleWith(@Nonnull Configuration configuration) {
                    return ClientTest.TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
                }

                public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
                    return (pipeline, config, classLoader) -> {
                        int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
                        JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph((Pipeline)plan, (Configuration)config, (int)parallelism);
                        ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration((Configuration)config);
                        jobGraph.addJars(accessor.getJars());
                        jobGraph.setClasspaths(accessor.getClasspaths());
                        JobID jobID = (JobID)clusterClient.submitJob(jobGraph).get();
                        return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter(() -> clusterClient, jobID, classLoader));
                    };
                }
            };
        }

        public Stream<String> getExecutorNames() {
            throw new UnsupportedOperationException("not implemented");
        }
    }

    /** Test program that executes a trivial job and then asks for all accumulator results. */
    public static final class TestGetAllAccumulator {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            environment.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
            environment.execute().getAllAccumulatorResults();
        }
    }

    /** Test program that executes a trivial job and then asks for a single named accumulator. */
    public static final class TestGetAccumulator {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            environment.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
            environment.execute().getAccumulatorResult(ACCUMULATOR_NAME);
        }
    }

    /** Test program that executes a trivial job and then asks for its job id. */
    public static final class TestGetJobID {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            environment.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
            environment.execute().getJobID();
        }
    }

    /** Test program that executes a trivial job and then asks for its net runtime. */
    public static final class TestGetRuntime {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            environment.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
            environment.execute().getNetRuntime();
        }
    }

    /**
     * Test program that submits two jobs from the same environment, one after the other,
     * requesting the execution result of each through its job client.
     */
    public static final class TestMultiExecute {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            for (int run = 0; run < 2; run++) {
                environment.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
                final JobClient jobClient = environment.executeAsync();
                jobClient.getJobExecutionResult();
            }
        }
    }

    /** Test program that triggers eager execution by calling {@code collect()}. */
    public static final class TestEager {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            environment.fromElements(1, 2).collect();
        }
    }

    public static class TestOptimizerPlan
    implements ProgramDescription {
        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
                return;
            }
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input = env.readCsvFile(args[0]).fieldDelimiter("\t").types(Long.class, Long.class);
            MapOperator result = input.map((MapFunction)new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>(){

                public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                    return new Tuple2(value.f0, (Object)((Long)value.f1 + 1L));
                }
            });
            result.writeAsCsv(args[1], "\n", "\t");
            env.execute();
        }

        public String getDescription() {
            return "TestOptimizerPlan <input-file-path> <output-file-path>";
        }
    }
}

