package org.apache.flink.test.checkpointing;

import java.io.File;
import java.time.Duration;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogRecoverySwitchStateBackendITCase.class */
/**
 * Integration tests that run a job, then restore it from the retained checkpoint with the
 * changelog state backend switched on or off (see {@code enableChangelogStateBackend} in the
 * {@code getEnv} helpers below).
 *
 * <p>The actual job topology, failure injection and verification live in the base classes
 * ({@code testSwitchEnv}, {@code buildJobGraph}, {@code ArtificialFailure}); they are not visible
 * from this file.
 */
public class ChangelogRecoverySwitchStateBackendITCase extends ChangelogRecoverySwitchEnvTestBase {

    /**
     * @param abstractStateBackend state backend handed to the base class; it is later read back
     *     through {@code delegatedStateBackend} when building environments
     */
    public ChangelogRecoverySwitchStateBackendITCase(AbstractStateBackend abstractStateBackend) {
        super(abstractStateBackend);
    }

    /**
     * Starts a mini cluster with one task manager and four slots, retaining a single checkpoint
     * and using a 20-byte small-file threshold.
     *
     * @throws Exception if the temporary folder cannot be created or the cluster fails to start
     */
    @Override
    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        // Use the typed setter for both options so the two lines read consistently.
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20b"));
        // NOTE(review): the meaning of the trailing (1 minute, 10) arguments is defined by
        // FsStateChangelogStorageFactory.configure and is not visible here - confirm.
        FsStateChangelogStorageFactory.configure(
                configuration, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1L), 10);

        MiniClusterResourceConfiguration clusterConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(configuration)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4)
                        .build();
        this.cluster = new MiniClusterWithClientResource(clusterConfiguration);
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    /** Runs {@code testSwitchEnv} going from changelog enabled to changelog disabled. */
    @Test
    public void testSwitchFromEnablingToDisabling() throws Exception {
        testSwitchEnv(getEnv(true), getEnv(false));
    }

    /** Same switch as above, with parallelism raised from 2 to 4 between the two environments. */
    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingOut() throws Exception {
        testSwitchEnv(getEnv(true, 2), getEnv(false, 4));
    }

    /** Same switch as above, with parallelism lowered from 4 to 2 between the two environments. */
    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception {
        testSwitchEnv(getEnv(true, 4), getEnv(false, 2));
    }

    /**
     * Runs three jobs in sequence: the first with changelog disabled, the second and third with
     * changelog enabled. Each later job is restored in {@link RestoreMode#CLAIM} mode from the
     * latest completed checkpoint of the job before it.
     *
     * @throws Exception if a job fails for a reason other than the expected artificial failure,
     *     or if a checkpoint path cannot be obtained
     */
    @Test
    public void testSwitchFromDisablingToEnablingInClaimMode() throws Exception {
        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        MiniCluster miniCluster = this.cluster.getMiniCluster();
        SharedReference<MiniCluster> miniClusterRef = this.sharedObjects.add(miniCluster);

        // NOTE(review): the numeric buildJobGraph arguments are interpreted by the base class
        // (not visible here); the values are kept exactly as in the original test.

        // Step 1: changelog disabled.
        StreamExecutionEnvironment firstEnv =
                getEnv((StateBackend) this.delegatedStateBackend, firstCheckpointFolder, false, 100L, -1L);
        JobGraph firstJobGraph = buildJobGraph(firstEnv, 2000, 2500, miniClusterRef);
        runToleratingArtificialFailure(miniCluster, firstJobGraph);
        String firstRestorePath =
                (String)
                        CommonTestUtils.getLatestCompletedCheckpointPath(
                                        firstJobGraph.getJobID(), miniCluster)
                                .get();

        // Step 2: changelog enabled, claiming the checkpoint produced by step 1.
        StreamExecutionEnvironment secondEnv =
                getEnv(
                        (StateBackend) this.delegatedStateBackend,
                        TEMPORARY_FOLDER.newFolder(),
                        true,
                        100L,
                        -1L);
        JobGraph secondJobGraph = buildJobGraph(secondEnv, 3333, 5000, miniClusterRef);
        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
        runToleratingArtificialFailure(miniCluster, secondJobGraph);
        String secondRestorePath =
                (String)
                        CommonTestUtils.getLatestCompletedCheckpointPath(
                                        secondJobGraph.getJobID(), miniCluster)
                                .get();

        // Step 3: changelog still enabled, claiming the checkpoint produced by step 2. Unlike the
        // first two runs, any exception here propagates and fails the test.
        StreamExecutionEnvironment thirdEnv =
                getEnv(
                        (StateBackend) this.delegatedStateBackend,
                        TEMPORARY_FOLDER.newFolder(),
                        true,
                        100L,
                        1000L);
        JobGraph thirdJobGraph = buildJobGraph(thirdEnv, 10000, 6666, miniClusterRef);
        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
        miniCluster.submitJob(thirdJobGraph).get();
        miniCluster.requestJobResult(thirdJobGraph.getJobID()).get();
    }

    /**
     * Submits the job and waits for its result, tolerating an exception only when its cause chain
     * contains {@code ArtificialFailure}.
     *
     * @param miniCluster cluster to run the job on
     * @param jobGraph job to submit
     * @throws IllegalStateException if the job fails with any other exception; the original
     *     exception is attached as the cause so the real failure stays visible in test output
     */
    private void runToleratingArtificialFailure(MiniCluster miniCluster, JobGraph jobGraph) {
        try {
            miniCluster.submitJob(jobGraph).get();
            miniCluster.requestJobResult(jobGraph.getJobID()).get();
        } catch (Exception e) {
            boolean expectedFailure =
                    ExceptionUtils.findThrowable(e, ChangelogRecoveryITCaseBase.ArtificialFailure.class)
                            .isPresent();
            if (!expectedFailure) {
                throw new IllegalStateException(
                        "Job "
                                + jobGraph.getJobID()
                                + " failed with an exception that does not contain the expected ArtificialFailure",
                        e);
            }
        }
    }

    /** Builds an environment with parallelism 4 and the changelog backend toggled as given. */
    private StreamExecutionEnvironment getEnv(boolean changelogEnabled) {
        return getEnv(changelogEnabled, 4);
    }

    /**
     * Builds an environment from the base-class factory, then sets the changelog toggle and the
     * parallelism, and retains externalized checkpoints on cancellation.
     *
     * @param changelogEnabled value passed to {@code enableChangelogStateBackend}
     * @param parallelism value passed to {@code setParallelism}
     */
    private StreamExecutionEnvironment getEnv(boolean changelogEnabled, int parallelism) {
        // NOTE(review): the (100, 0, 500, 0) arguments are interpreted by the inherited getEnv
        // overload, which is not visible here - confirm their meaning there.
        StreamExecutionEnvironment env =
                getEnv((StateBackend) this.delegatedStateBackend, 100L, 0, 500L, 0);
        env.enableChangelogStateBackend(changelogEnabled);
        env.setParallelism(parallelism);
        env.getCheckpointConfig()
                .setExternalizedCheckpointCleanup(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    /**
     * Builds an environment from the file-based base-class factory, then sets the changelog toggle
     * and retains externalized checkpoints on cancellation.
     *
     * @param stateBackend state backend forwarded to the inherited factory
     * @param folder folder forwarded to the inherited factory
     * @param changelogEnabled value passed to {@code enableChangelogStateBackend}
     * @param checkpointInterval forwarded as the third argument of the inherited factory;
     *     presumably the checkpoint interval - confirm against the base class
     * @param materializationInterval forwarded as the fifth argument of the inherited factory;
     *     presumably the materialization interval - confirm against the base class
     */
    private StreamExecutionEnvironment getEnv(
            StateBackend stateBackend,
            File folder,
            boolean changelogEnabled,
            long checkpointInterval,
            long materializationInterval) {
        StreamExecutionEnvironment env =
                getEnv(stateBackend, folder, checkpointInterval, 0, materializationInterval, 0);
        env.enableChangelogStateBackend(changelogEnabled);
        env.getCheckpointConfig()
                .setExternalizedCheckpointCleanup(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    /**
     * Makes the job restore from {@code restorePath} in {@link RestoreMode#CLAIM} mode, passing
     * {@code false} for the second {@code forPath} argument as the original test does.
     */
    private void setSavepointRestoreSettings(JobGraph jobGraph, String restorePath) {
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(restorePath, false, RestoreMode.CLAIM));
    }
}
