/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.application;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap;
import org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.flink.client.deployment.application.UnsuccessfulExecutionException;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.testjar.MultiExecuteJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class ApplicationDispatcherBootstrapTest
extends TestLogger {
    /** Wait budget, in seconds, for blocking on futures in this class (the tests wait 10 seconds). */
    private static final int TIMEOUT_SECONDS = 10;
    /** Four-thread scheduled pool shared by the tests; shut down after each test in {@code cleanup()}. */
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
    /** Adapter around {@code executor}; this is the executor handed to the bootstrap under test. */
    private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(this.executor);

    /**
     * Shuts down the shared thread pool after every test, allowing up to five seconds for a
     * graceful shutdown.
     */
    @After
    public void cleanup() {
        final ExecutorService[] pools = {executor};
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, pools);
    }

    /**
     * Runs a program built with {@code noOfJobs = 0} against a dispatcher that acknowledges every
     * submission and expects the completion future to fail with an
     * {@link ApplicationExecutionException}.
     */
    @Test
    public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
        final TestingDispatcherGateway.Builder acknowledgingDispatcher =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()));

        final int noOfJobs = 0;
        final CompletableFuture<Void> completion = runApplication(acknowledgingDispatcher, noOfJobs);

        assertException(completion, ApplicationExecutionException.class);
    }

    /**
     * With the HA mode set to ZooKeeper, running a program built with {@code noOfJobs = 2} is
     * expected to complete the application future exceptionally with a
     * {@link FlinkRuntimeException}.
     */
    @Test
    public void testOnlyOneJobIsAllowedWithHa() throws Throwable {
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final CompletableFuture<Void> applicationFuture = runApplication(configurationUnderTest, 2);

        assertException(applicationFuture, FlinkRuntimeException.class);
    }

    /**
     * With a fixed job id configured, running a program built with {@code noOfJobs = 2} is
     * expected to complete the application future exceptionally with a
     * {@link FlinkRuntimeException}.
     */
    @Test
    public void testOnlyOneJobAllowedWithStaticJobId() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());

        final CompletableFuture<Void> applicationFuture = runApplication(configurationUnderTest, 2);

        assertException(applicationFuture, FlinkRuntimeException.class);
    }

    /**
     * With both a fixed job id and ZooKeeper HA configured, running a program built with
     * {@code noOfJobs = 2} is expected to complete the application future exceptionally with a
     * {@link FlinkRuntimeException}.
     */
    @Test
    public void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final CompletableFuture<Void> applicationFuture = runApplication(configurationUnderTest, 2);

        assertException(applicationFuture, FlinkRuntimeException.class);
    }

    /**
     * With ZooKeeper HA configured and no fixed job id, the job graph handed to the dispatcher is
     * expected to carry {@code new JobID(0, 0)}.
     */
    @Test
    public void testJobIdDefaultsToZeroWithHa() throws Throwable {
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        // Completed by the submit function with the id of the job graph it receives.
        final CompletableFuture<JobID> submittedJobId = new CompletableFuture<>();
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobId.complete(jobGraph.getJobID());
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(new JobID(0L, 0L)));
    }

    /**
     * With a fixed job id configured, the job graph handed to the dispatcher is expected to carry
     * exactly that id.
     */
    @Test
    public void testStaticJobId() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());

        // Completed by the submit function with the id of the job graph it receives.
        final CompletableFuture<JobID> submittedJobId = new CompletableFuture<>();
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobId.complete(jobGraph.getJobID());
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(new JobID(0L, 2L)));
    }

    /**
     * With both a fixed job id and ZooKeeper HA configured, the job graph handed to the dispatcher
     * is expected to carry the fixed id.
     */
    @Test
    public void testStaticJobIdWithHa() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        // Completed by the submit function with the id of the job graph it receives.
        final CompletableFuture<JobID> submittedJobId = new CompletableFuture<>();
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobId.complete(jobGraph.getJobID());
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(new JobID(0L, 2L)));
    }

    /**
     * Runs a two-job program where only the first submitted job reports FAILED (with a failed job
     * result) while any other job stays RUNNING with a result future that never completes. The
     * application future is expected to fail with an {@link UnsuccessfulExecutionException} whose
     * status is {@link ApplicationStatus#FAILED}.
     */
    @Test
    public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
        // Submission order is recorded so the status/result functions can single out the first job.
        final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobIds.add(jobGraph.getJobID());
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setRequestJobStatusFunction(
                                jobId -> {
                                    // peek() yields the head of the deque, i.e. the first submitted id
                                    if (jobId.equals(submittedJobIds.peek())) {
                                        return CompletableFuture.completedFuture(JobStatus.FAILED);
                                    }
                                    return CompletableFuture.completedFuture(JobStatus.RUNNING);
                                })
                        .setRequestJobResultFunction(
                                jobId -> {
                                    if (jobId.equals(submittedJobIds.peek())) {
                                        return CompletableFuture.completedFuture(
                                                createFailedJobResult(jobId));
                                    }
                                    // every other job never produces a result
                                    return new CompletableFuture<>();
                                });

        final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
        final UnsuccessfulExecutionException exception =
                assertException(applicationFuture, UnsuccessfulExecutionException.class);

        // expected value first, so a failure message reads correctly
        Assert.assertEquals(ApplicationStatus.FAILED, exception.getStatus());
    }

    /**
     * Runs a three-job program against a dispatcher that reports every job as FINISHED with a
     * successful result, and waits for the application future to complete normally.
     */
    @Test
    public void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
        final TestingDispatcherGateway.Builder allJobsSucceed =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final CompletableFuture<Void> completion = runApplication(allJobsSucceed, 3);

        // get() throws if the application failed or did not finish in time
        completion.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * With every job reported as CANCELED (and a cancelled job result), the bootstrap's cluster
     * shutdown future is expected to complete and the dispatcher's shutdown function to be called
     * with {@link ApplicationStatus#CANCELED}.
     */
    @Test
    public void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
        // Completed by the cluster shutdown function with the status it is invoked with.
        final CompletableFuture<ApplicationStatus> clusterShutdownStatus = new CompletableFuture<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
                        .setClusterShutdownFunction(
                                status -> {
                                    clusterShutdownStatus.complete(status);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createCancelledJobResult(jobId)));

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                clusterShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(ApplicationStatus.CANCELED));
    }

    /**
     * With every job reported as FINISHED with a successful result, both the cluster shutdown
     * future and the application execution future of the bootstrap are expected to complete
     * within the timeout.
     */
    @Test
    public void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception {
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        final ScheduledFuture<?> applicationExecutionFuture =
                bootstrap.getApplicationExecutionFuture();

        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Stops the bootstrap while all jobs are still reported as RUNNING and verifies that the
     * error handler is not invoked, the cluster shutdown future completes, the dispatcher's
     * shutdown function is not called, and the application execution future ends up cancelled
     * and done.
     */
    @Test
    public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING))
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        // Completed exceptionally only if the bootstrap reports a fatal error.
        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherBuilder.build(),
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        final ScheduledFuture<?> applicationExecutionFuture =
                bootstrap.getApplicationExecutionFuture();

        bootstrap.stop();

        // the error handler was not called
        Assert.assertFalse(errorHandlerFuture.isDone());

        // bounded wait: a regression must fail the test instead of hanging the build
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // the cluster itself was not shut down
        Assert.assertFalse(shutdownCalled.get());

        // the application task was cancelled
        MatcherAssert.assertThat(applicationExecutionFuture.isCancelled(), CoreMatchers.is(true));
        MatcherAssert.assertThat(applicationExecutionFuture.isDone(), CoreMatchers.is(true));
    }

    /**
     * When the dispatcher's submit function throws a {@link FlinkRuntimeException}, both the
     * cluster shutdown future and the error handler are expected to observe that exception type,
     * and the dispatcher's shutdown function must not be called.
     */
    @Test
    public void testErrorHandlerIsCalledWhenSubmissionThrowsAnException() throws Exception {
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    throw new FlinkRuntimeException("Nope!");
                                })
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        // Completed exceptionally with whatever the bootstrap passes to the error handler.
        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherGateway,
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();

        assertException(shutdownFuture, FlinkRuntimeException.class);
        assertException(errorHandlerFuture, FlinkRuntimeException.class);
        Assert.assertFalse(shutdownCalled.get());
    }

    /**
     * Exercises {@code testErrorHandlerIsCalled} with a cluster shutdown function that returns an
     * exceptionally completed future.
     */
    @Test
    public void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws Exception {
        final Supplier<CompletableFuture<Acknowledge>> failedShutdown =
                () -> FutureUtils.completedExceptionally(new FlinkRuntimeException("Test exception."));
        testErrorHandlerIsCalled(failedShutdown);
    }

    /**
     * Exercises {@code testErrorHandlerIsCalled} with a cluster shutdown function that throws
     * synchronously instead of returning a future.
     */
    @Test
    public void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws Exception {
        final Supplier<CompletableFuture<Acknowledge>> throwingShutdown =
                () -> {
                    throw new FlinkRuntimeException("Test exception.");
                };
        testErrorHandlerIsCalled(throwingShutdown);
    }

    /**
     * Shared body for the shutdown-failure tests: all jobs finish successfully, the dispatcher's
     * cluster shutdown is delegated to {@code shutdownFunction}, and both the error handler and
     * the cluster shutdown future are expected to observe a {@link FlinkRuntimeException}.
     *
     * @param shutdownFunction supplies (or throws instead of supplying) the result of the
     *     dispatcher's cluster shutdown call
     */
    private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> shutdownFunction) throws Exception {
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)))
                        .setClusterShutdownFunction(status -> shutdownFunction.get());

        // Completed exceptionally with whatever the bootstrap passes to the error handler.
        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherGateway,
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();

        assertException(errorHandlerFuture, FlinkRuntimeException.class);
        assertException(shutdownFuture, FlinkRuntimeException.class);
    }

    /**
     * With {@code DeploymentOptions.ATTACHED} enabled and every job reported as CANCELED, the
     * application completion future is expected to fail with an
     * {@link UnsuccessfulExecutionException} and the dispatcher's shutdown function to be called
     * with {@link ApplicationStatus#CANCELED}.
     */
    @Test
    public void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws Exception {
        // Completed by the cluster shutdown function with the status it is invoked with.
        final CompletableFuture<ApplicationStatus> clusterShutdown = new CompletableFuture<>();

        final TestingDispatcherGateway dispatcherGateway =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createCancelledJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    clusterShutdown.complete(status);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .build();

        final PackagedProgram program = getProgram(2);

        final Configuration configuration = getConfiguration();
        configuration.set(DeploymentOptions.ATTACHED, true);

        final ApplicationDispatcherBootstrap bootstrap =
                new ApplicationDispatcherBootstrap(
                        program,
                        Collections.emptyList(),
                        configuration,
                        dispatcherGateway,
                        scheduledExecutor,
                        e -> {});

        final CompletableFuture<Void> applicationFuture =
                bootstrap.getApplicationCompletionFuture();
        assertException(applicationFuture, UnsuccessfulExecutionException.class);

        // bounded wait so a missing shutdown call fails the test instead of blocking forever;
        // expected value first so a failure message reads correctly
        Assert.assertEquals(
                ApplicationStatus.CANCELED,
                clusterShutdown.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * With every job reported as FINISHED with a successful result, the dispatcher's shutdown
     * function is expected to be called with {@link ApplicationStatus#SUCCEEDED}.
     */
    @Test
    public void testClusterShutdownWhenApplicationSucceeds() throws Exception {
        // Completed by the cluster shutdown function with the status it is invoked with.
        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
                new CompletableFuture<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    externalShutdownFuture.complete(status);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(ApplicationStatus.SUCCEEDED));
    }

    /**
     * With every job reported as FAILED with a failed job result, the dispatcher's shutdown
     * function is expected to be called with {@link ApplicationStatus#FAILED}.
     */
    @Test
    public void testClusterShutdownWhenApplicationFails() throws Exception {
        // Completed by the cluster shutdown function with the status it is invoked with.
        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
                new CompletableFuture<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createFailedJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    externalShutdownFuture.complete(status);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(ApplicationStatus.FAILED));
    }

    /**
     * With every job reported as CANCELED with a cancelled job result, the dispatcher's shutdown
     * function is expected to be called with {@link ApplicationStatus#CANCELED}.
     */
    @Test
    public void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
        // Completed by the cluster shutdown function with the status it is invoked with.
        final CompletableFuture<ApplicationStatus> externalShutdownFuture =
                new CompletableFuture<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createCancelledJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    externalShutdownFuture.complete(status);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(ApplicationStatus.CANCELED));
    }

    /**
     * When jobs report FAILED but their result carries {@link ApplicationStatus#UNKNOWN}, the
     * cluster shutdown future is expected to fail with an {@link UnsuccessfulExecutionException},
     * the same exception type is expected to reach the error handler, and the dispatcher's
     * shutdown function must not be called.
     */
    @Test
    public void testErrorHandlerIsCalledWhenApplicationStatusIsUnknown() throws Exception {
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createUnknownJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                });

        final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();

        // Completed exceptionally with whatever the bootstrap passes to the error handler.
        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherGateway,
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        // bootstrap shutdown completes exceptionally ...
        assertException(
                bootstrap.getClusterShutdownFuture(), UnsuccessfulExecutionException.class);
        // ... and the exception reaches the error handler (previously the shutdown future was
        // asserted a second time here, leaving the error handler unverified)
        assertException(errorHandlerFuture, UnsuccessfulExecutionException.class);
        // ... while the cluster itself is not shut down
        Assert.assertFalse(shutdownCalled.get());
    }

    /**
     * With a fixed job id and ZooKeeper HA, a submission rejected with
     * {@code DuplicateJobSubmissionException.ofGloballyTerminated(testJobID)} is expected to be
     * tolerated: the dispatcher reports the job as FINISHED with a successful result and the
     * application future completes normally.
     */
    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException
                                                        .ofGloballyTerminated(testJobID)))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * With a fixed job id and ZooKeeper HA, the submission is rejected as a globally terminated
     * duplicate and both the job status and job result requests fail with
     * {@link FlinkJobNotFoundException}. The application future is still expected to complete
     * normally.
     */
    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException
                                                        .ofGloballyTerminated(testJobID)))
                        .setRequestJobStatusFunction(
                                jobId ->
                                        FutureUtils.completedExceptionally(
                                                new FlinkJobNotFoundException(jobId)))
                        .setRequestJobResultFunction(
                                jobId ->
                                        FutureUtils.completedExceptionally(
                                                new FlinkJobNotFoundException(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * With a fixed job id and ZooKeeper HA, the submission is rejected as a globally terminated
     * duplicate and both the job status and job result requests fail with
     * {@link FlinkJobNotFoundException}. The application future is still expected to complete
     * normally.
     *
     * <p>NOTE(review): despite the "Attached" suffix, this test never sets
     * {@code DeploymentOptions.ATTACHED}; its setup matches
     * {@code testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult}. Confirm whether
     * attached mode was intended before changing the configuration.
     */
    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttached() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException
                                                        .ofGloballyTerminated(testJobID)))
                        .setRequestJobStatusFunction(
                                jobId ->
                                        FutureUtils.completedExceptionally(
                                                new FlinkJobNotFoundException(jobId)))
                        .setRequestJobResultFunction(
                                jobId ->
                                        FutureUtils.completedExceptionally(
                                                new FlinkJobNotFoundException(jobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);
        applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * With a fixed job id and ZooKeeper HA, a submission rejected with
     * {@code DuplicateJobSubmissionException.of(testJobID)} is expected to fail the application
     * future with an {@link ExecutionException} whose cause chain contains a
     * {@link DuplicateJobSubmissionException} that is not globally terminated.
     */
    @Test
    public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
        final JobID testJobID = new JobID(0L, 2L);
        final Configuration configurationUnderTest = getConfiguration();
        configurationUnderTest.set(
                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobID.toHexString());
        configurationUnderTest.set(
                HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException.of(testJobID)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configurationUnderTest, 1);

        final ExecutionException executionException =
                Assert.assertThrows(
                        ExecutionException.class,
                        () -> applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));

        final Optional<DuplicateJobSubmissionException> maybeDuplicate =
                ExceptionUtils.findThrowable(
                        executionException, DuplicateJobSubmissionException.class);
        Assert.assertTrue(maybeDuplicate.isPresent());
        Assert.assertFalse(maybeDuplicate.get().isGloballyTerminated());
    }

    /**
     * Runs the application with the default test configuration from {@code getConfiguration()}.
     *
     * @param dispatcherBuilder builder for the dispatcher gateway the bootstrap talks to
     * @param noOfJobs number of jobs passed to {@code getProgram}
     * @return the bootstrap's application completion future
     */
    private CompletableFuture<Void> runApplication(TestingDispatcherGateway.Builder dispatcherBuilder, int noOfJobs) throws FlinkException {
        final Configuration defaultConfiguration = getConfiguration();
        return runApplication(dispatcherBuilder, defaultConfiguration, noOfJobs);
    }

    /**
     * Runs the application with the given configuration against a dispatcher that acknowledges
     * every submission and reports every job as FINISHED with a successful result.
     *
     * @param configuration configuration handed to the bootstrap
     * @param noOfJobs number of jobs passed to {@code getProgram}
     * @return the bootstrap's application completion future
     */
    private CompletableFuture<Void> runApplication(Configuration configuration, int noOfJobs) throws Throwable {
        final TestingDispatcherGateway.Builder allJobsSucceed =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createSuccessfulJobResult(jobId)));

        return runApplication(allJobsSucceed, configuration, noOfJobs);
    }

    /**
     * Creates an {@link ApplicationDispatcherBootstrap} for a program with {@code noOfJobs} jobs,
     * using a no-op error handler, and returns its application completion future.
     *
     * @param dispatcherBuilder builder for the dispatcher gateway the bootstrap talks to
     * @param configuration configuration handed to the bootstrap
     * @param noOfJobs number of jobs passed to {@code getProgram}
     * @return the bootstrap's application completion future
     */
    private CompletableFuture<Void> runApplication(TestingDispatcherGateway.Builder dispatcherBuilder, Configuration configuration, int noOfJobs) throws FlinkException {
        final PackagedProgram packagedProgram = getProgram(noOfJobs);
        final ApplicationDispatcherBootstrap applicationBootstrap =
                new ApplicationDispatcherBootstrap(
                        packagedProgram,
                        Collections.emptyList(),
                        configuration,
                        dispatcherBuilder.build(),
                        scheduledExecutor,
                        ignored -> {});

        return applicationBootstrap.getApplicationCompletionFuture();
    }

    /**
     * Creates a bootstrap with a no-op fatal error handler; see the four-argument overload.
     *
     * @param noOfJobs number of jobs passed to {@code getProgram}
     * @param dispatcherGateway gateway the bootstrap talks to
     * @param scheduledExecutor executor handed to the bootstrap
     */
    private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs, DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) throws FlinkException {
        final FatalErrorHandler ignoreErrors = ignored -> {};
        return createApplicationDispatcherBootstrap(
                noOfJobs, dispatcherGateway, scheduledExecutor, ignoreErrors);
    }

    /**
     * Creates a bootstrap for a program with {@code noOfJobs} jobs, using the default test
     * configuration and no additional arguments beyond an empty list.
     *
     * @param noOfJobs number of jobs passed to {@code getProgram}
     * @param dispatcherGateway gateway the bootstrap talks to
     * @param scheduledExecutor executor handed to the bootstrap
     * @param errorHandler handler handed to the bootstrap for fatal errors
     */
    private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs, DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor, FatalErrorHandler errorHandler) throws FlinkException {
        final PackagedProgram packagedProgram = getProgram(noOfJobs);
        final Configuration defaultConfiguration = getConfiguration();

        return new ApplicationDispatcherBootstrap(
                packagedProgram,
                Collections.emptyList(),
                defaultConfiguration,
                dispatcherGateway,
                scheduledExecutor,
                errorHandler);
    }

    /**
     * Obtains the test program from {@code MultiExecuteJob.getProgram(noOfJobs, true)}.
     *
     * @param noOfJobs number of jobs forwarded to {@code MultiExecuteJob}
     */
    private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
        final PackagedProgram multiExecuteProgram = MultiExecuteJob.getProgram(noOfJobs, true);
        return multiExecuteProgram;
    }

    /**
     * Builds a job result for {@code jobId} with status {@link ApplicationStatus#UNKNOWN}, a net
     * runtime of 2 and a serialized {@link JobExecutionException}.
     */
    private static JobResult createUnknownJobResult(JobID jobId) {
        final Throwable failureCause = new JobExecutionException(jobId, "unknown bla bla bla");
        return new JobResult.Builder()
                .jobId(jobId)
                .netRuntime(2L)
                .applicationStatus(ApplicationStatus.UNKNOWN)
                .serializedThrowable(new SerializedThrowable(failureCause))
                .build();
    }

    /**
     * Builds a job result for {@code jobId} with status {@link ApplicationStatus#FAILED}, a net
     * runtime of 2 and a serialized {@link JobExecutionException}.
     */
    private static JobResult createFailedJobResult(JobID jobId) {
        final Throwable failureCause = new JobExecutionException(jobId, "bla bla bla");
        return new JobResult.Builder()
                .jobId(jobId)
                .netRuntime(2L)
                .applicationStatus(ApplicationStatus.FAILED)
                .serializedThrowable(new SerializedThrowable(failureCause))
                .build();
    }

    /**
     * Builds a job result for {@code jobId} with status {@link ApplicationStatus#SUCCEEDED} and a
     * net runtime of 2; no throwable is attached.
     */
    private static JobResult createSuccessfulJobResult(JobID jobId) {
        final JobResult.Builder resultBuilder =
                new JobResult.Builder()
                        .jobId(jobId)
                        .netRuntime(2L)
                        .applicationStatus(ApplicationStatus.SUCCEEDED);
        return resultBuilder.build();
    }

    /**
     * Builds a job result for {@code jobId} with status {@link ApplicationStatus#CANCELED}, a net
     * runtime of 2 and a serialized {@link JobCancellationException}.
     */
    private static JobResult createCancelledJobResult(JobID jobId) {
        final Throwable cancellation = new JobCancellationException(jobId, "Hello", null);
        return new JobResult.Builder()
                .jobId(jobId)
                .netRuntime(2L)
                .serializedThrowable(new SerializedThrowable(cancellation))
                .applicationStatus(ApplicationStatus.CANCELED)
                .build();
    }

    /**
     * Waits for {@code future} and asserts that it fails with an exception whose chain contains
     * an instance of {@code exceptionClass}.
     *
     * @param future future expected to complete exceptionally within the timeout
     * @param exceptionClass exception type searched for via {@code ExceptionUtils.findThrowable}
     * @return the matching exception found in the failure
     * @throws Exception the original failure if it does not contain {@code exceptionClass}, or a
     *     plain {@link Exception} if the future completed normally
     */
    private static <T, E extends Throwable> E assertException(CompletableFuture<T> future, Class<E> exceptionClass) throws Exception {
        try {
            future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Throwable e) {
            final Optional<E> maybeException = ExceptionUtils.findThrowable(e, exceptionClass);
            if (!maybeException.isPresent()) {
                // unrelated failure (including a timeout): surface it unchanged
                throw e;
            }
            return maybeException.get();
        }
        throw new Exception("Future should have completed exceptionally with " + exceptionClass.getCanonicalName() + ".");
    }

    /**
     * Creates the base configuration used by the tests: a fresh {@link Configuration} with
     * {@code DeploymentOptions.TARGET} set to {@code "embedded"}.
     */
    private Configuration getConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, "embedded");
        return configuration;
    }
}

