package org.apache.flink.runtime.checkpoint.savepoint;

import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.class */
public class SavepointCoordinator extends CheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SavepointCoordinator.class);
    private final SavepointStore savepointStore;
    private final Map<Long, Promise<String>> savepointPromises;
    private volatile String savepointRestorePath;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator$IgnoreCheckpointsStore.class */
    private static class IgnoreCheckpointsStore implements CompletedCheckpointStore {
        private static final CompletedCheckpointStore INSTANCE = new IgnoreCheckpointsStore();

        private IgnoreCheckpointsStore() {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void recover() throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            return null;
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void shutdown() throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void suspend() throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }
    }

    public SavepointCoordinator(JobID jobID, long j, long j2, int i, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, ClassLoader classLoader, CheckpointIDCounter checkpointIDCounter, SavepointStore savepointStore, CheckpointStatsTracker checkpointStatsTracker) throws Exception {
        super(jobID, j, j2, 0L, Integer.MAX_VALUE, i, executionVertexArr, executionVertexArr2, executionVertexArr3, classLoader, checkpointIDCounter, IgnoreCheckpointsStore.INSTANCE, RecoveryMode.STANDALONE, checkpointStatsTracker);
        this.savepointStore = (SavepointStore) Preconditions.checkNotNull(savepointStore);
        this.savepointPromises = new ConcurrentHashMap();
    }

    public String getSavepointRestorePath() {
        return this.savepointRestorePath;
    }

    public Future<String> triggerSavepoint(long j) throws Exception {
        long andIncrementCheckpointId;
        Promise.DefaultPromise defaultPromise = new Promise.DefaultPromise();
        try {
            andIncrementCheckpointId = getAndIncrementCheckpointId();
        } catch (Throwable th) {
            defaultPromise.failure(new Exception("Failed to trigger savepoint", th));
        }
        if (andIncrementCheckpointId == -1) {
            throw new IllegalStateException("Failed to get checkpoint Id");
        }
        LOG.info("Triggering savepoint with ID " + andIncrementCheckpointId);
        if (this.savepointPromises.put(Long.valueOf(andIncrementCheckpointId), defaultPromise) != null) {
            throw new IllegalStateException("Duplicate checkpoint ID");
        }
        boolean z = false;
        try {
            z = triggerCheckpoint(j, andIncrementCheckpointId);
            if (!z) {
                this.savepointPromises.remove(Long.valueOf(andIncrementCheckpointId));
                defaultPromise.failure(new Exception("Failed to trigger savepoint"));
            }
            return defaultPromise.future();
        } catch (Throwable th2) {
            if (!z) {
                this.savepointPromises.remove(Long.valueOf(andIncrementCheckpointId));
                defaultPromise.failure(new Exception("Failed to trigger savepoint"));
            }
            throw th2;
        }
    }

    public void restoreSavepoint(Map<JobVertexID, ExecutionJobVertex> map, String str) throws Exception {
        Preconditions.checkNotNull(str, "Savepoint path");
        synchronized (this.lock) {
            if (isShutdown()) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            LOG.info("Rolling back to savepoint '{}'.", str);
            Savepoint loadSavepoint = this.savepointStore.loadSavepoint(str);
            for (TaskState taskState : loadSavepoint.getTaskStates()) {
                ExecutionJobVertex executionJobVertex = map.get(taskState.getJobVertexID());
                if (executionJobVertex == null) {
                    throw new IllegalStateException(String.format("Failed to rollback to savepoint %s. Cannot map old state for task %s to the new program. This indicates that the program has been changed in a non-compatible way  after the savepoint.", str, taskState.getJobVertexID()));
                }
                if (executionJobVertex.getParallelism() != taskState.getParallelism()) {
                    throw new IllegalStateException(String.format("Failed to rollback to savepoint %s. Parallelism mismatch between savepoint state and new program. Cannot map operator %s with parallelism %d to new program with parallelism %d. This indicates that the program has been changed in a non-compatible way after the savepoint.", loadSavepoint, taskState.getJobVertexID(), Integer.valueOf(taskState.getParallelism()), Integer.valueOf(executionJobVertex.getParallelism())));
                }
                List<Set<Integer>> createKeyGroupPartitions = createKeyGroupPartitions(this.numberKeyGroups, executionJobVertex.getParallelism());
                for (int i = 0; i < executionJobVertex.getTaskVertices().length; i++) {
                    SubtaskState state = taskState.getState(i);
                    SerializedValue<StateHandle<?>> serializedValue = null;
                    if (state != null) {
                        serializedValue = state.getState();
                    }
                    executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().setInitialState(serializedValue, taskState.getUnwrappedKvStates(createKeyGroupPartitions.get(i)));
                }
            }
            long checkpointId = loadSavepoint.getCheckpointId() + 1;
            this.checkpointIdCounter.start();
            this.checkpointIdCounter.setCount(checkpointId);
            LOG.info("Reset the checkpoint ID to {}", Long.valueOf(checkpointId));
            if (this.savepointRestorePath == null) {
                this.savepointRestorePath = str;
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onShutdown() {
        Iterator<scala.concurrent.Promise<String>> it = this.savepointPromises.values().iterator();
        while (it.hasNext()) {
            it.next().failure(new Exception("Checkpoint coordinator shutdown"));
        }
        this.savepointPromises.clear();
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onCancelCheckpoint(long j) {
        LOG.info("Cancelling savepoint with checkpoint ID " + j);
        scala.concurrent.Promise<String> remove = this.savepointPromises.remove(Long.valueOf(j));
        if (remove != null) {
            remove.failure(new Exception("Savepoint expired before completing"));
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint completedCheckpoint) {
        scala.concurrent.Promise<String> remove = this.savepointPromises.remove(Long.valueOf(completedCheckpoint.getCheckpointID()));
        if (remove == null) {
            LOG.warn("Pending savepoint with ID " + completedCheckpoint.getCheckpointID() + "  has been removed before receiving acknowledgment.");
            return;
        }
        if (remove.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }
        try {
            remove.success(this.savepointStore.storeSavepoint(new SavepointV0(completedCheckpoint.getCheckpointID(), completedCheckpoint.getTaskStates().values())));
        } catch (Exception e) {
            LOG.warn("Failed to store savepoint.", (Throwable) e);
            remove.failure(e);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID uuid) {
        ActorGateway jobStatusListener;
        synchronized (this.lock) {
            if (isShutdown()) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (getJobStatusListener() == null) {
                setJobStatusListener(new AkkaActorGateway(actorSystem.actorOf(Props.create((Class<?>) SavepointCoordinatorDeActivator.class, this, uuid)), uuid));
            }
            jobStatusListener = getJobStatusListener();
        }
        return jobStatusListener;
    }
}
