package org.apache.flink.test.scheduling;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration tests that run streaming jobs on a mini cluster configured with {@link
 * SchedulerExecutionMode#REACTIVE} and check how the parallelism of the source vertex follows the
 * number of slots offered by the connected TaskManagers.
 *
 * <p>The cluster starts with {@link #INITIAL_NUMBER_TASK_MANAGERS} TaskManager(s) offering {@link
 * #NUMBER_SLOTS_PER_TASK_MANAGER} slots each; individual tests add or remove TaskManagers.
 */
public class ReactiveModeITCase extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;

    /** Max parallelism imposed on the source in {@link #testScaleLimitByMaxParallelism()}. */
    private static final int LIMITED_MAX_PARALLELISM = 1;

    /** Upper bound for every polling wait performed by this test class. */
    private static final Duration WAIT_TIMEOUT = Duration.ofSeconds(10L);

    private static final Configuration configuration = getReactiveModeConfiguration();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /** Source that emits the string {@code "test"} roughly every 10 ms until cancelled. */
    private static class DummySource implements SourceFunction<String> {
        // volatile: cancel() is presumably invoked from a different thread than run().
        private volatile boolean running = true;

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect("test");
                }
                Thread.sleep(10L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /**
     * Parallel-capable source that refuses to start when it is deployed with more than {@link
     * #LIMITED_MAX_PARALLELISM} subtask(s); otherwise it emits {@code "test"} roughly every 100 ms
     * until cancelled.
     */
    private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
        // volatile: cancel() is presumably invoked from a different thread than run().
        private volatile boolean running = true;

        /**
         * Fails the task if the job was scaled beyond the allowed parallelism.
         *
         * @param parameters unused
         * @throws IllegalStateException if more than {@link #LIMITED_MAX_PARALLELISM} parallel
         *     subtasks are reported by the runtime context
         */
        public void open(Configuration parameters) throws Exception {
            if (getRuntimeContext().getNumberOfParallelSubtasks() > LIMITED_MAX_PARALLELISM) {
                throw new IllegalStateException("This is not supposed to be executed in parallel, despite extending the right base class.");
            }
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect("test");
                }
                Thread.sleep(100L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /** Builds the cluster configuration with the scheduler mode set to reactive. */
    private static Configuration getReactiveModeConfiguration() {
        final Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
        return conf;
    }

    /**
     * With two TaskManagers available, a source whose max parallelism is limited to {@link
     * #LIMITED_MAX_PARALLELISM} must still end up running with exactly that parallelism.
     */
    @Test
    public void testScaleLimitByMaxParallelism() throws Exception {
        // Offer more slots than the source is allowed to use.
        startAdditionalTaskManager();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FailOnParallelExecutionSource())
                .setMaxParallelism(LIMITED_MAX_PARALLELISM)
                .addSink(new DiscardingSink<>());

        final JobClient jobClient = env.executeAsync();

        waitUntilParallelismForVertexReached(
                this.miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                LIMITED_MAX_PARALLELISM);
    }

    /**
     * The job starts on the initial TaskManager(s) and is expected to use the slots of an
     * additional TaskManager once it has been started.
     */
    @Test
    public void testScaleUpOnAdditionalTaskManager() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new DummySource()).addSink(new DiscardingSink<>());

        final JobClient jobClient = env.executeAsync();

        waitUntilParallelismForVertexReached(
                this.miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);

        // Scale up by one TaskManager.
        this.miniClusterResource.getMiniCluster().startTaskManager();

        waitUntilParallelismForVertexReached(
                this.miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
    }

    /**
     * The job starts on two TaskManagers and is expected to shrink to the slots of the single
     * remaining TaskManager after the other one has been terminated.
     */
    @Test
    public void testScaleDownOnTaskManagerLoss() throws Exception {
        // Preparation: make sure two TaskManagers are connected before submitting.
        startAdditionalTaskManager();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Allow a single restart attempt with no delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        env.addSource(new DummySource()).addSink(new DiscardingSink<>());

        final JobClient jobClient = env.executeAsync();

        waitUntilParallelismForVertexReached(
                this.miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));

        // Scale down to a single TaskManager.
        this.miniClusterResource.getMiniCluster().terminateTaskManager(0).get();

        // Only one TaskManager's worth of slots is left. Waiting for the previous (larger)
        // parallelism here would be satisfied immediately and would not verify the scale-down.
        waitUntilParallelismForVertexReached(
                this.miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                NUMBER_SLOTS_PER_TASK_MANAGER);
    }

    /** Returns the number of TaskManagers the mini cluster currently reports as connected. */
    private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
        return this.miniClusterResource
                .getMiniCluster()
                .requestClusterOverview()
                .get()
                .getNumTaskManagersConnected();
    }

    /**
     * Starts one more TaskManager and waits (up to {@link #WAIT_TIMEOUT}) until the cluster
     * reports one TaskManager more than the initial number as connected.
     */
    private void startAdditionalTaskManager() throws Exception {
        this.miniClusterResource.getMiniCluster().startTaskManager();
        CommonTestUtils.waitUntilCondition(
                () -> getNumberOfConnectedTaskManagers() == INITIAL_NUMBER_TASK_MANAGERS + 1,
                Deadline.fromNow(WAIT_TIMEOUT));
    }

    /**
     * Polls the job details until some vertex whose name contains {@code "Source:"} reports the
     * requested parallelism, bounded by a deadline of {@link #WAIT_TIMEOUT}.
     *
     * @param restClusterClient client used to query the job details
     * @param jobID job to inspect
     * @param targetParallelism parallelism the source vertex is expected to reach
     * @throws Exception if querying the job details fails, or as raised by {@link
     *     CommonTestUtils#waitUntilCondition} when the condition is not met in time
     */
    public static void waitUntilParallelismForVertexReached(
            RestClusterClient<?> restClusterClient, JobID jobID, int targetParallelism)
            throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> {
                    final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobID).get();
                    for (JobDetailsInfo.JobVertexDetailsInfo vertex :
                            jobDetails.getJobVertexInfos()) {
                        if (vertex.getName().contains("Source:")
                                && vertex.getParallelism() == targetParallelism) {
                            return true;
                        }
                    }
                    return false;
                },
                Deadline.fromNow(WAIT_TIMEOUT));
    }
}
