package org.apache.flink.client.deployment.application;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.testjar.BlockingJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.class */
public class ApplicationDispatcherBootstrapITCase extends TestLogger {
    private static final Duration TIMEOUT = Duration.ofMinutes(10);

    private static Supplier<DispatcherResourceManagerComponentFactory> createApplicationModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration, PackagedProgram packagedProgram) {
        return () -> {
            return new DefaultDispatcherResourceManagerComponentFactory(new DefaultDispatcherRunnerFactory(ApplicationDispatcherLeaderProcessFactoryFactory.create(new Configuration(configuration), SessionDispatcherFactory.INSTANCE, packagedProgram)), StandaloneResourceManagerFactory.getInstance(), JobRestEndpointFactory.INSTANCE);
        };
    }

    @Test
    public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exception {
        String uuid = UUID.randomUUID().toString();
        Deadline fromNow = Deadline.fromNow(TIMEOUT);
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
        configuration.set(DeploymentOptions.TARGET, "embedded");
        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100L));
        TestingMiniClusterConfiguration build = TestingMiniClusterConfiguration.newBuilder().setConfiguration(configuration).build();
        EmbeddedHaServicesWithLeadershipControl embeddedHaServicesWithLeadershipControl = new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
        try {
            TestingMiniCluster build2 = TestingMiniCluster.newBuilder(build).setHighAvailabilityServicesSupplier(() -> {
                return embeddedHaServicesWithLeadershipControl;
            }).setDispatcherResourceManagerComponentFactorySupplier(createApplicationModeDispatcherResourceManagerComponentFactorySupplier(build.getConfiguration(), BlockingJob.getProgram(uuid))).build();
            Throwable th = null;
            try {
                try {
                    build2.start();
                    awaitJobStatus(build2, ApplicationDispatcherBootstrap.ZERO_JOB_ID, JobStatus.RUNNING, fromNow);
                    BlockingJob.awaitRunning(uuid);
                    CompletableFuture requestJobResult = build2.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
                    embeddedHaServicesWithLeadershipControl.revokeDispatcherLeadership();
                    Assertions.assertEquals(ApplicationStatus.UNKNOWN, ((JobResult) requestJobResult.get()).getApplicationStatus());
                    embeddedHaServicesWithLeadershipControl.grantDispatcherLeadership();
                    awaitJobStatus(build2, ApplicationDispatcherBootstrap.ZERO_JOB_ID, JobStatus.RUNNING, fromNow);
                    BlockingJob.unblock(uuid);
                    CompletableFuture requestJobResult2 = build2.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
                    Assertions.assertTrue(((JobResult) requestJobResult2.get()).isSuccess());
                    Assertions.assertEquals(ApplicationStatus.SUCCEEDED, ((JobResult) requestJobResult2.get()).getApplicationStatus());
                    awaitClusterStopped(build2, fromNow);
                    if (build2 != null) {
                        if (0 != 0) {
                            try {
                                build2.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            build2.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            BlockingJob.cleanUp(uuid);
        }
    }

    private static void awaitClusterStopped(MiniCluster miniCluster, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(!miniCluster.isRunning());
        }, deadline);
    }

    private static void awaitJobStatus(MiniCluster miniCluster, JobID jobID, JobStatus jobStatus, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            try {
                return Boolean.valueOf(miniCluster.getJobStatus(jobID).get() == jobStatus);
            } catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }, deadline);
    }
}
