/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.application;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.testjar.BlockingJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class ApplicationDispatcherBootstrapITCase
extends TestLogger {
    private static final Duration TIMEOUT = Duration.ofMinutes(10L);

    private static Supplier<DispatcherResourceManagerComponentFactory> createApplicationModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration, PackagedProgram program) {
        return () -> {
            ApplicationDispatcherLeaderProcessFactoryFactory applicationDispatcherLeaderProcessFactoryFactory = ApplicationDispatcherLeaderProcessFactoryFactory.create((Configuration)new Configuration(configuration), (DispatcherFactory)SessionDispatcherFactory.INSTANCE, (PackagedProgram)program);
            return new DefaultDispatcherResourceManagerComponentFactory((DispatcherRunnerFactory)new DefaultDispatcherRunnerFactory((DispatcherLeaderProcessFactoryFactory)applicationDispatcherLeaderProcessFactoryFactory), (ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance(), (RestEndpointFactory)JobRestEndpointFactory.INSTANCE);
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exception {
        String blockId = UUID.randomUUID().toString();
        Deadline deadline = Deadline.fromNow((Duration)TIMEOUT);
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, (Object)HighAvailabilityMode.ZOOKEEPER.name());
        configuration.set(DeploymentOptions.TARGET, (Object)"embedded");
        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, (Object)Duration.ofMillis(100L));
        TestingMiniClusterConfiguration clusterConfiguration = TestingMiniClusterConfiguration.newBuilder().setConfiguration(configuration).build();
        EmbeddedHaServicesWithLeadershipControl haServices = new EmbeddedHaServicesWithLeadershipControl((Executor)TestingUtils.defaultExecutor());
        TestingMiniCluster.Builder clusterBuilder = TestingMiniCluster.newBuilder((TestingMiniClusterConfiguration)clusterConfiguration).setHighAvailabilityServicesSupplier(() -> haServices).setDispatcherResourceManagerComponentFactorySupplier(ApplicationDispatcherBootstrapITCase.createApplicationModeDispatcherResourceManagerComponentFactorySupplier((Configuration)clusterConfiguration.getConfiguration(), BlockingJob.getProgram(blockId)));
        try (TestingMiniCluster cluster = clusterBuilder.build();){
            cluster.start();
            ApplicationDispatcherBootstrapITCase.awaitJobStatus((MiniCluster)cluster, ApplicationDispatcherBootstrap.ZERO_JOB_ID, JobStatus.RUNNING, deadline);
            BlockingJob.awaitRunning(blockId);
            CompletableFuture firstJobResult = cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
            haServices.revokeDispatcherLeadership();
            Assertions.assertEquals((Object)ApplicationStatus.UNKNOWN, (Object)((JobResult)firstJobResult.get()).getApplicationStatus());
            haServices.grantDispatcherLeadership();
            ApplicationDispatcherBootstrapITCase.awaitJobStatus((MiniCluster)cluster, ApplicationDispatcherBootstrap.ZERO_JOB_ID, JobStatus.RUNNING, deadline);
            BlockingJob.unblock(blockId);
            CompletableFuture secondJobResult = cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
            Assertions.assertTrue((boolean)((JobResult)secondJobResult.get()).isSuccess());
            Assertions.assertEquals((Object)ApplicationStatus.SUCCEEDED, (Object)((JobResult)secondJobResult.get()).getApplicationStatus());
            ApplicationDispatcherBootstrapITCase.awaitClusterStopped((MiniCluster)cluster, deadline);
        }
        finally {
            BlockingJob.cleanUp(blockId);
        }
    }

    private static void awaitClusterStopped(MiniCluster cluster, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), (Deadline)deadline);
    }

    private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            try {
                return cluster.getJobStatus(jobId).get() == status;
            }
            catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }, (Deadline)deadline);
    }
}

