package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/TimersSavepointITCase.class */
public class TimersSavepointITCase {
    // Parallelism of the test job; also used below as the slot count of the single TaskManager.
    private static final int PARALLELISM = 4;
    // Triggered by TimersProcessFunction#onTimer and awaited in verifySavepoint.
    private static final OneShotLatch resultLatch = new OneShotLatch();

    // Class-scoped temporary folder; getJobGraph creates the checkpoint and savepoint
    // directories inside it.
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    // Name of the savepoint resource: resolved on the classpath when verifying, and the same
    // name ends the "src/test/resources/..." target path used when a savepoint is taken.
    public static final String SAVEPOINT_FILE_NAME = "legacy-raw-state-heap-timers-rocks-db-1.12";
    // Selects which branch testSavepointWithTimers runs. It is a final field fixed to
    // VERIFY_SAVEPOINT, so the PERFORM_SAVEPOINT branch only runs after editing this line.
    // NOTE(review): presumably flipped by hand to regenerate the savepoint -- confirm.
    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;

    // Per-test mini cluster: one TaskManager offering PARALLELISM slots.
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());

    /**
     * The two modes {@code testSavepointWithTimers} can run in, chosen by the
     * {@code executionMode} field.
     */
    public enum ExecutionMode {
        /** Run the job, trigger a savepoint and move it under {@code src/test/resources}. */
        PERFORM_SAVEPOINT,
        /** Restore the job from the savepoint resource and wait for the result latch. */
        VERIFY_SAVEPOINT
    }

    /**
     * Checkpointed test source that emits integers until cancelled.
     *
     * <p>On a fresh start it emits {@code 0} once and then {@code 1} repeatedly, so the emitted
     * value never exceeds 1. When started from restored state it instead counts upwards from
     * {@code 2}. NOTE(review): since {@code getJobGraph} uses the emitted value as the event
     * timestamp and {@code TimersProcessFunction} registers its timer at timestamp 2, this
     * presumably keeps the timer pending until the job has been restored -- confirm.
     */
    public static class Source implements SourceFunction<Integer>, CheckpointedFunction {
        /** Loop flag; cleared by {@link #cancel()}. */
        private volatile boolean running = true;

        /** Drives the emit pattern: 0 = nothing emitted yet, 1 = stuck at 1, 2 or more = counting up. */
        private int emittedCount;

        /** Operator list state registered under the name {@code "emittedCount"}. */
        private ListState<Integer> state;

        private Source() {
        }

        /**
         * Emits one element per iteration while holding the checkpoint lock, sleeping 1 ms
         * between iterations, until {@link #cancel()} is called.
         *
         * @param sourceContext context used to emit elements and obtain the checkpoint lock
         * @throws Exception if emitting fails or the sleep is interrupted
         */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    if (emittedCount == 0) {
                        // First element of a fresh run.
                        sourceContext.collect(0);
                        emittedCount = 1;
                    } else if (emittedCount == 1) {
                        // Deliberately not incremented: a fresh run keeps emitting 1.
                        sourceContext.collect(emittedCount);
                    } else {
                        // Only reachable after a restore set the counter to 2.
                        sourceContext.collect(emittedCount++);
                    }
                }
                Thread.sleep(1L);
            }
        }

        /** Asks the emit loop in {@link #run} to stop. */
        public void cancel() {
            running = false;
        }

        /**
         * Appends the current counter to the list state. The state is not cleared first, so
         * entries accumulate across snapshots; {@link #initializeState} never reads them.
         *
         * @param functionSnapshotContext snapshot context (unused)
         * @throws Exception if the state update fails
         */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            state.add(emittedCount);
        }

        /**
         * Registers the list state and, when restoring, switches the source to counting mode by
         * setting the counter to 2 regardless of the restored state's contents.
         *
         * @param functionInitializationContext gives access to the operator state store and the
         *     restored flag
         * @throws Exception if the state cannot be obtained
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            state = functionInitializationContext.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("emittedCount", IntSerializer.INSTANCE));
            if (functionInitializationContext.isRestored()) {
                emittedCount = 2;
            }
        }
    }

    /**
     * Keyed function that registers a single event-time timer and signals the test when it
     * fires.
     *
     * <p>The synthetic bridge overload {@code processElement(Object, Context, Collector)} that
     * the decompiler emitted is intentionally not declared here: the compiler generates it, so
     * it must not appear in source.
     */
    public static class TimersProcessFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
        private TimersProcessFunction() {
        }

        /**
         * Registers an event-time timer at timestamp 2 when the element {@code 0} arrives;
         * every other element is ignored. Nothing is emitted from here.
         *
         * @param num the incoming element
         * @param context gives access to the timer service
         * @param collector output collector (unused)
         * @throws Exception if timer registration fails
         */
        public void processElement(Integer num, KeyedProcessFunction<Integer, Integer, Integer>.Context context, Collector<Integer> collector) throws Exception {
            if (num == 0) {
                context.timerService().registerEventTimeTimer(2L);
            }
        }

        /**
         * Emits {@code 1} and triggers the shared result latch that {@code verifySavepoint}
         * waits on.
         *
         * @param j timestamp of the firing timer (unused)
         * @param onTimerContext timer context (unused)
         * @param collector output collector
         * @throws Exception if emitting fails
         */
        public void onTimer(long j, KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext onTimerContext, Collector<Integer> collector) throws Exception {
            collector.collect(1);
            TimersSavepointITCase.resultLatch.trigger();
        }
    }

    /**
     * Either regenerates the savepoint resource or verifies that the job restores from it,
     * depending on {@code executionMode}.
     *
     * <p>The cluster client is managed with try-with-resources: it is closed on every path, and
     * a failure in {@code close()} is attached as a suppressed exception instead of replacing
     * the exception thrown by the test body.
     *
     * @throws IllegalStateException if {@code executionMode} is neither known mode
     * @throws Exception if taking or verifying the savepoint fails
     */
    @Test(timeout = 60000)
    public void testSavepointWithTimers() throws Exception {
        try (ClusterClient<?> clusterClient = this.miniClusterResource.getClusterClient()) {
            if (this.executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
                takeSavepoint("src/test/resources/" + SAVEPOINT_FILE_NAME, clusterClient);
            } else if (this.executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
                verifySavepoint(getResourceFilename(SAVEPOINT_FILE_NAME), clusterClient);
            } else {
                throw new IllegalStateException("Unknown ExecutionMode " + this.executionMode);
            }
        }
    }

    /**
     * Submits the test job configured with heap timers, restoring it from the given savepoint,
     * then blocks until the result latch has been triggered.
     *
     * @param str path of the savepoint to restore from
     * @param clusterClient client used to submit the job
     * @throws IOException if the job graph cannot be built
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if the submission future completes exceptionally
     */
    private void verifySavepoint(String str, ClusterClient<?> clusterClient) throws IOException, InterruptedException, ExecutionException {
        final JobGraph restoredJob = getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
        final SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(str);
        restoredJob.setSavepointRestoreSettings(restoreSettings);

        // Wait for the submission future first, then for the timer to fire in the restored job.
        clusterClient.submitJob(restoredJob).get();
        resultLatch.await();
    }

    /**
     * Runs the test job configured with RocksDB timers, triggers a savepoint once all tasks are
     * running, and moves the resulting savepoint directory to the given location.
     *
     * @param str destination directory for the savepoint
     * @param clusterClient client used to submit the job and trigger the savepoint
     * @throws Exception if submission, the savepoint (2 second timeout) or the move fails
     */
    private void takeSavepoint(String str, ClusterClient<?> clusterClient) throws Exception {
        final JobGraph savepointJob = getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
        clusterClient.submitJob(savepointJob).get();
        CommonTestUtils.waitForAllTaskRunning(this.miniClusterResource.getMiniCluster(), savepointJob.getJobID(), false);

        // The savepoint location comes back as a URI string; only its path part is used.
        final String reportedLocation = (String) clusterClient.triggerSavepoint(savepointJob.getJobID(), (String) null).get(2L, TimeUnit.SECONDS);
        final File savepointDirectory = new File(new URI(reportedLocation).getPath());
        final File destination = new File(str);
        FileUtils.moveDirectory(savepointDirectory, destination);
    }

    /**
     * Builds the job graph of the test pipeline: {@code Source} -> timestamps/watermarks ->
     * keyBy(identity) -> {@code TimersProcessFunction} -> discarding sink, using the RocksDB
     * state backend with the requested timer-service implementation.
     *
     * <p>Each call creates two new directories under {@code TMP_FOLDER}, one for checkpoints
     * and one for savepoints.
     *
     * @param priorityQueueStateType value to set for {@code RocksDBOptions.TIMER_SERVICE_FACTORY}
     * @return the job graph of the configured pipeline
     * @throws IOException if a temporary directory cannot be created
     */
    public JobGraph getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        env.addSource(new Source())
                // The explicit <Integer> witness is required: without it the strategy is
                // inferred as WatermarkStrategy<Object> and the assigner lambda does not
                // type-check. Each element's value is used as its event timestamp.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((value, recordTimestamp) -> value))
                .keyBy(value -> value)
                .process(new TimersProcessFunction())
                .addSink(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, TMP_FOLDER.newFolder().toURI().toString());
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, TMP_FOLDER.newFolder().toURI().toString());
        configuration.set(RocksDBOptions.TIMER_SERVICE_FACTORY, priorityQueueStateType);
        env.configure(configuration, getClass().getClassLoader());

        return env.getStreamGraph(false).getJobGraph();
    }

    /**
     * Resolves a classpath resource through this class's class loader and returns the file
     * part of its URL.
     *
     * @param str resource name to look up
     * @return the value of {@code URL.getFile()} for the located resource
     * @throws NullPointerException if no such resource is found
     */
    private static String getResourceFilename(String str) {
        final ClassLoader loader = TimersSavepointITCase.class.getClassLoader();
        final URL location = loader.getResource(str);
        if (location != null) {
            return location.getFile();
        }
        throw new NullPointerException("Missing snapshot resource.");
    }

    // NOTE(review): this appears to be the decompiled form of the compiler-generated
    // lambda-deserialization hook (see the "synthetic" marker and the SerializedLambda
    // parameter). Its two branches recreate the same lambda bodies that getJobGraph passes to
    // withTimestampAssigner and keyBy.
    //
    // As written it is not valid Java source: `boolean z = -1` and a `switch` over a boolean do
    // not compile, and a method returning Object gives the lambdas below no functional-interface
    // target type. Presumably it should be deleted before this file is recompiled, because the
    // compiler emits its own version for serializable lambdas -- TODO confirm.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Step 1: map the lambda's implementation method name to a branch selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 840424647:
                if (implMethodName.equals("lambda$getJobGraph$da9be2d1$1")) {
                    z = false;
                    break;
                }
                break;
            case 1199761615:
                if (implMethodName.equals("lambda$getJobGraph$ade061ba$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Step 2: check the full serialized descriptor, then recreate the matching lambda.
        switch (z) {
            case false:
                // Timestamp assigner: (Integer, long) -> long.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/TimersSavepointITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;J)J")) {
                    return (num, j) -> {
                        return num.intValue();
                    };
                }
                break;
            case true:
                // Key selector: Integer -> Integer (identity).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/TimersSavepointITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
