/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Test;

/**
 * Tests for {@link PerJobMiniClusterFactory}.
 *
 * <p>Every test obtains its factory from {@link #initializeMiniCluster()}, which records the
 * {@link MiniCluster} created by the factory in a field so the tests can inspect it and
 * {@link #teardown()} can close it.
 */
public class PerJobMiniClusterFactoryTest extends TestLogger {

    /** The most recently created mini cluster; stays {@code null} until a job is submitted. */
    private MiniCluster miniCluster;

    /** Closes the recorded mini cluster, if one was created during the test. */
    @After
    public void teardown() throws Exception {
        if (miniCluster == null) {
            return;
        }
        miniCluster.close();
    }

    /**
     * Runs a no-op job and checks that both the execution result and the accumulators are
     * non-null, and that the mini cluster is no longer running afterwards.
     */
    @Test
    public void testJobExecution() throws Exception {
        PerJobMiniClusterFactory factory = initializeMiniCluster();
        JobClient client = submit(factory, getNoopJobGraph());

        JobExecutionResult executionResult = client.getJobExecutionResult().get();
        MatcherAssert.assertThat(executionResult, CoreMatchers.is(CoreMatchers.notNullValue()));

        Map<String, Object> accumulators = client.getAccumulators().get();
        MatcherAssert.assertThat(accumulators, CoreMatchers.is(CoreMatchers.notNullValue()));

        assertThatMiniClusterIsShutdown();
    }

    /**
     * Submits a cancellable job, checks the job id and status reported by the client, cancels the
     * job, and then expects fetching the execution result to fail.
     */
    @Test
    public void testJobClient() throws Exception {
        PerJobMiniClusterFactory factory = initializeMiniCluster();
        JobGraph jobGraph = getCancellableJobGraph();
        JobClient client = submit(factory, jobGraph);

        MatcherAssert.assertThat(client.getJobID(), CoreMatchers.is(jobGraph.getJobID()));
        // NOTE(review): the status is checked right after submission; presumably the submit
        // future only completes once the job has reached RUNNING - confirm, otherwise this
        // assertion is timing-sensitive.
        MatcherAssert.assertThat(client.getJobStatus().get(), CoreMatchers.is(JobStatus.RUNNING));

        client.cancel().get();

        CommonTestUtils.assertThrows(
                "Job was cancelled.",
                ExecutionException.class,
                () -> client.getJobExecutionResult().get());

        assertThatMiniClusterIsShutdown();
    }

    /**
     * Expects both {@code triggerSavepoint} and {@code stopWithSavepoint} on the submitted job to
     * fail with an {@link ExecutionException}.
     */
    @Test
    public void testJobClientSavepoint() throws Exception {
        PerJobMiniClusterFactory factory = initializeMiniCluster();
        JobClient client = submit(factory, getCancellableJobGraph());

        CommonTestUtils.assertThrows(
                "is not a streaming job.",
                ExecutionException.class,
                () -> client.triggerSavepoint(null).get());

        CommonTestUtils.assertThrows(
                "is not a streaming job.",
                ExecutionException.class,
                () -> client.stopWithSavepoint(true, null).get());
    }

    /**
     * Submits two no-op jobs one after the other through the same factory and checks after each
     * one that the recorded mini cluster is not running.
     */
    @Test
    public void testMultipleExecutions() throws Exception {
        PerJobMiniClusterFactory factory = initializeMiniCluster();

        JobClient firstClient = submit(factory, getNoopJobGraph());
        firstClient.getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();

        JobClient secondClient = submit(factory, getNoopJobGraph());
        secondClient.getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();
    }

    /**
     * After the job has finished and the mini cluster is no longer running, calling
     * {@code cancel()} on the client is expected to throw an {@link IllegalStateException}.
     */
    @Test
    public void testJobClientInteractionAfterShutdown() throws Exception {
        PerJobMiniClusterFactory factory = initializeMiniCluster();
        JobClient client = submit(factory, getNoopJobGraph());

        client.getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();

        CommonTestUtils.assertThrows(
                "MiniCluster is not yet running or has already been shut down.",
                IllegalStateException.class,
                client::cancel);
    }

    /**
     * Submits {@code jobGraph} through {@code factory} using the system class loader and blocks
     * until the submission future yields the {@link JobClient}.
     */
    private static JobClient submit(PerJobMiniClusterFactory factory, JobGraph jobGraph)
            throws Exception {
        return factory.submitJob(jobGraph, ClassLoader.getSystemClassLoader()).get();
    }

    /**
     * Builds a factory over a fresh {@link Configuration} whose mini cluster creation callback
     * stores each created {@link MiniCluster} in {@link #miniCluster}.
     */
    private PerJobMiniClusterFactory initializeMiniCluster() {
        return PerJobMiniClusterFactory.createWithFactory(
                new Configuration(),
                clusterConfig -> {
                    MiniCluster created = new MiniCluster(clusterConfig);
                    miniCluster = created;
                    return created;
                });
    }

    /** Asserts that the recorded mini cluster reports that it is not running. */
    private void assertThatMiniClusterIsShutdown() {
        MatcherAssert.assertThat(miniCluster.isRunning(), CoreMatchers.is(false));
    }

    /** Returns the job graph produced by {@link JobGraphTestUtils#singleNoOpJobGraph()}. */
    private static JobGraph getNoopJobGraph() {
        return JobGraphTestUtils.singleNoOpJobGraph();
    }

    /**
     * Returns a job graph with a single vertex of parallelism 1 that runs
     * {@link WaitingCancelableInvokable}.
     */
    private static JobGraph getCancellableJobGraph() {
        JobVertex vertex = new JobVertex("jobVertex");
        vertex.setInvokableClass(WaitingCancelableInvokable.class);
        vertex.setParallelism(1);
        return JobGraphTestUtils.streamingJobGraph(new JobVertex[] {vertex});
    }
}

