package org.apache.flink.runtime.leaderelection;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.utils.JobResultUtils;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.class */
public class LeaderChangeClusterComponentsTest extends TestLogger {
    private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2);
    private static final int SLOTS_PER_TM = 2;
    private static final int NUM_TMS = 2;
    public static final int PARALLELISM = 4;
    private static TestingMiniCluster miniCluster;
    private static EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;
    private static Properties sysProps;
    private JobGraph jobGraph;
    private JobID jobId;

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest$BlockingOperator.class */
    public static class BlockingOperator extends AbstractInvokable {
        static boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            if (!isBlocking) {
                return;
            }
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        }
    }

    @BeforeClass
    public static void setupClass() throws Exception {
        sysProps = System.getProperties();
        System.setProperty("flink.tests.enable-rm-multi-leader-session", "");
        highAvailabilityServices = new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
        miniCluster = TestingMiniCluster.newBuilder(TestingMiniClusterConfiguration.newBuilder().setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build()).setHighAvailabilityServicesSupplier(() -> {
            return highAvailabilityServices;
        }).build();
        miniCluster.start();
    }

    @Before
    public void setup() throws Exception {
        this.jobGraph = createJobGraph(4);
        this.jobId = this.jobGraph.getJobID();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (miniCluster != null) {
            miniCluster.close();
        }
        System.setProperties(sysProps);
    }

    @Test
    public void testReelectionOfDispatcher() throws Exception {
        miniCluster.submitJob(this.jobGraph).get();
        CompletableFuture requestJobResult = miniCluster.requestJobResult(this.jobId);
        highAvailabilityServices.revokeDispatcherLeadership().get();
        Assert.assertEquals(((JobResult) requestJobResult.get()).getApplicationStatus(), ApplicationStatus.UNKNOWN);
        highAvailabilityServices.grantDispatcherLeadership();
        BlockingOperator.isBlocking = false;
        miniCluster.submitJob(this.jobGraph).get();
        JobResultUtils.assertSuccess((JobResult) miniCluster.requestJobResult(this.jobId).get());
    }

    @Test
    public void testReelectionOfJobMaster() throws Exception {
        miniCluster.submitJob(this.jobGraph).get();
        CompletableFuture requestJobResult = miniCluster.requestJobResult(this.jobId);
        CommonTestUtils.waitUntilJobManagerIsInitialized(() -> {
            return (JobStatus) miniCluster.getJobStatus(this.jobId).get();
        });
        highAvailabilityServices.revokeJobMasterLeadership(this.jobId).get();
        JobResultUtils.assertIncomplete(requestJobResult);
        BlockingOperator.isBlocking = false;
        highAvailabilityServices.grantJobMasterLeadership(this.jobId);
        JobResultUtils.assertSuccess((JobResult) requestJobResult.get());
    }

    @Test
    public void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception {
        Deadline fromNow = Deadline.fromNow(TESTING_TIMEOUT);
        waitUntilTaskExecutorsHaveConnected(2, fromNow);
        highAvailabilityServices.revokeResourceManagerLeadership().get();
        highAvailabilityServices.grantResourceManagerLeadership();
        Assert.assertThat(LeaderRetrievalUtils.retrieveLeaderConnectionInfo(highAvailabilityServices.getResourceManagerLeaderRetriever(), TESTING_TIMEOUT).getLeaderSessionId(), Matchers.is(Matchers.notNullValue()));
        waitUntilTaskExecutorsHaveConnected(2, fromNow);
    }

    private void waitUntilTaskExecutorsHaveConnected(int i, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(((ClusterOverview) miniCluster.requestClusterOverview().get()).getNumTaskManagersConnected() == i);
        }, deadline, 10L);
    }

    private JobGraph createJobGraph(int i) {
        BlockingOperator.isBlocking = true;
        JobVertex jobVertex = new JobVertex("blocking operator");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(BlockingOperator.class);
        return JobGraphTestUtils.streamingJobGraph(jobVertex);
    }
}
