package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph.class */
public class ExecutionGraph implements Serializable {
    private static final long serialVersionUID = 42;
    // Atomic updater for the volatile "state" field; used by transitionState for compare-and-set.
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExecutionGraph.class);
    // Name under which the RestartTimeGauge is registered in the constructor.
    static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
    // Monitor used for synchronized sections and for wait()/notifyAll() (see waitUntilFinished).
    private final SerializableObject progressLock;
    private final JobID jobID;
    private final String jobName;
    private final Configuration jobConfiguration;
    // Starts as true; attachJobGraph sets it to false when an input vertex is not stoppable.
    private boolean isStoppable;
    // Job vertices keyed by ID; populated by attachJobGraph.
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    // Job vertices in the order attachJobGraph added them.
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    // Intermediate results keyed by data set ID; populated by attachJobGraph.
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    // Cleared by restart() and prepareForArchiving(); writers are not visible in this part of the file.
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    private final List<BlobKey> requiredJarFiles;
    private final List<URL> requiredClasspaths;
    private final List<ActorGateway> jobStatusListenerActors;
    private final List<ActorGateway> executionListenerActors;
    // One slot per JobStatus constant, indexed by JobStatus.ordinal(); written by transitionState.
    private final long[] stateTimestamps;
    // Passed to every ExecutionJobVertex created in attachJobGraph.
    private final FiniteDuration timeout;
    private SerializedValue<ExecutionConfig> serializedExecutionConfig;
    private boolean allowQueuedScheduling;
    private ScheduleMode scheduleMode;
    // Set to true at the end of prepareForArchiving().
    private boolean isArchived;
    // Current job status; changed only through STATE_UPDATER.compareAndSet in transitionState.
    private volatile JobStatus state;
    // Throwable recorded by suspend() and fail().
    private volatile Throwable failureCause;
    // Incremented under progressLock in jobVertexInFinalState; reset to 0 in restart().
    private volatile int numFinishedJobVertices;
    // Assigned in scheduleForExecution; nulled by prepareForArchiving().
    private Scheduler scheduler;
    private RestartStrategy restartStrategy;
    private ClassLoader userClassLoader;
    // Created by enableSnapshotCheckpointing; null while checkpointing is disabled.
    private CheckpointCoordinator checkpointCoordinator;
    // Created by enableSnapshotCheckpointing; transient, so it is not part of the serialized form.
    private transient SavepointCoordinator savepointCoordinator;
    private CheckpointStatsTracker checkpointStatsTracker;
    private ExecutionContext executionContext;
    private String jsonPlan;
    // Built in the constructor from the deserialized ExecutionConfig; stays null if that fails.
    private ExecutionConfigSummary executionConfigSummary;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraph$RestartTimeGauge.class */
    /**
     * Gauge that reports a duration measured from the enclosing graph's
     * {@code RESTARTING} timestamp.
     */
    private class RestartTimeGauge implements Gauge<Long> {
        private RestartTimeGauge() {
        }

        /**
         * Returns {@code 0} when no RESTARTING timestamp is recorded. Otherwise returns
         * the time from RESTARTING until RUNNING (if RUNNING was reached afterwards),
         * until the current state's timestamp (if that state is terminal), or until now.
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.metrics.Gauge
        /* renamed from: getValue */
        public Long mo2540getValue() {
            final long restartingSince = ExecutionGraph.this.stateTimestamps[JobStatus.RESTARTING.ordinal()];
            if (restartingSince <= 0) {
                return 0L;
            }
            if (ExecutionGraph.this.stateTimestamps[JobStatus.RUNNING.ordinal()] >= restartingSince) {
                // The job came back to RUNNING after the restart began.
                return Long.valueOf(ExecutionGraph.this.stateTimestamps[JobStatus.RUNNING.ordinal()] - restartingSince);
            }
            if (ExecutionGraph.this.state.isTerminalState()) {
                // The job ended while restarting; measure up to the terminal state's timestamp.
                return Long.valueOf(ExecutionGraph.this.stateTimestamps[ExecutionGraph.this.state.ordinal()] - restartingSince);
            }
            // Still restarting: measure up to the current wall-clock time.
            return Long.valueOf(System.currentTimeMillis() - restartingSince);
        }
    }

    /**
     * Convenience constructor that delegates to the full constructor with empty
     * (mutable) jar-file and classpath lists, this class's own class loader and an
     * {@link UnregisteredMetricsGroup}.
     */
    ExecutionGraph(ExecutionContext executionContext, JobID jobID, String str, Configuration configuration, SerializedValue<ExecutionConfig> serializedValue, FiniteDuration finiteDuration, RestartStrategy restartStrategy) {
        this(
                executionContext,
                jobID,
                str,
                configuration,
                serializedValue,
                finiteDuration,
                restartStrategy,
                new ArrayList<BlobKey>(),
                new ArrayList<URL>(),
                ExecutionGraph.class.getClassLoader(),
                new UnregisteredMetricsGroup());
    }

    /**
     * Creates an execution graph in state {@link JobStatus#CREATED}.
     *
     * <p>Registers a {@link RestartTimeGauge} on {@code metricGroup} under
     * {@link #RESTARTING_TIME_METRIC_NAME}, and tries to build an
     * {@link ExecutionConfigSummary} from {@code serializedValue}. A failure to
     * deserialize is logged and leaves the summary {@code null}.
     *
     * @param executionContext execution context; must not be null
     * @param jobID job ID; must not be null
     * @param str job name; must not be null
     * @param configuration job configuration; must not be null
     * @param serializedValue serialized execution config; must not be null
     * @param finiteDuration timeout handed to each ExecutionJobVertex created later
     * @param restartStrategy restart strategy stored for this job
     * @param list required jar files (stored by reference)
     * @param list2 required classpaths (stored by reference)
     * @param classLoader user class loader; must not be null
     * @param metricGroup metric group on which the restart-time gauge is registered
     * @throws NullPointerException if any argument documented as non-null is null
     */
    public ExecutionGraph(ExecutionContext executionContext, JobID jobID, String str, Configuration configuration, SerializedValue<ExecutionConfig> serializedValue, FiniteDuration finiteDuration, RestartStrategy restartStrategy, List<BlobKey> list, List<URL> list2, ClassLoader classLoader, MetricGroup metricGroup) {
        this.progressLock = new SerializableObject();
        this.isStoppable = true;
        this.allowQueuedScheduling = false;
        this.scheduleMode = ScheduleMode.FROM_SOURCES;
        this.isArchived = false;
        this.state = JobStatus.CREATED;
        Preconditions.checkNotNull(executionContext);
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(classLoader);
        this.executionContext = executionContext;
        this.jobID = jobID;
        this.jobName = str;
        this.jobConfiguration = configuration;
        this.userClassLoader = classLoader;
        this.tasks = new ConcurrentHashMap<>();
        this.intermediateResults = new ConcurrentHashMap<>();
        this.verticesInCreationOrder = new ArrayList<>();
        this.currentExecutions = new ConcurrentHashMap<>();
        this.jobStatusListenerActors = new CopyOnWriteArrayList<>();
        this.executionListenerActors = new CopyOnWriteArrayList<>();
        // One timestamp slot per JobStatus constant, indexed by ordinal.
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.requiredJarFiles = list;
        this.requiredClasspaths = list2;
        this.serializedExecutionConfig = Preconditions.checkNotNull(serializedValue);
        this.timeout = finiteDuration;
        this.restartStrategy = restartStrategy;
        // The gauge is passed as-is; the previous "(String)" cast on it was an impossible conversion.
        metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
        try {
            ExecutionConfig deserializeValue = serializedValue.deserializeValue(classLoader);
            if (deserializeValue != null) {
                this.executionConfigSummary = new ExecutionConfigSummary(deserializeValue);
            }
        } catch (IOException | ClassNotFoundException e) {
            // Best effort: the graph stays usable without a summary.
            LOG.error("Couldn't create ExecutionConfigSummary for job {} ", this.jobID, e);
        }
    }

    /** Returns how many job vertices have been attached to this graph. */
    public int getNumberOfExecutionJobVertices() {
        final List<ExecutionJobVertex> attached = verticesInCreationOrder;
        return attached.size();
    }

    /** Returns the flag that is passed to {@code scheduleAll} when vertices are scheduled. */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /**
     * Sets the flag that is passed to {@code scheduleAll} when vertices are scheduled.
     *
     * @param z the new flag value
     */
    public void setQueuedSchedulingAllowed(boolean z) {
        allowQueuedScheduling = z;
    }

    /**
     * Sets the schedule mode consulted by {@link #scheduleForExecution(Scheduler)}.
     *
     * @param scheduleMode the mode to store (no validation is performed here)
     */
    public void setScheduleMode(ScheduleMode scheduleMode) {
        final ScheduleMode requested = scheduleMode;
        this.scheduleMode = requested;
    }

    /** Returns the currently configured schedule mode. */
    public ScheduleMode getScheduleMode() {
        return scheduleMode;
    }

    /** Returns {@code true} once {@link #prepareForArchiving()} has completed. */
    public boolean isArchived() {
        return isArchived;
    }

    /**
     * Replaces any existing checkpoint/savepoint coordinators with new ones and
     * registers their activator/deactivator actors as job status listeners.
     *
     * <p>All arguments are validated before the existing coordinators are shut down,
     * so a rejected call leaves the current checkpointing setup untouched.
     *
     * @param j first numeric setting forwarded to both coordinators; must be at least 10
     *          (presumably the checkpoint interval in milliseconds; confirm against CheckpointCoordinator)
     * @param j2 second numeric setting forwarded to both coordinators; must be at least 10
     *           (presumably the checkpoint timeout in milliseconds; confirm against CheckpointCoordinator)
     * @param j3 forwarded to the checkpoint coordinator only
     * @param i forwarded to the checkpoint coordinator only
     * @param i2 forwarded to both coordinators
     * @param list first group of job vertices, expanded to their execution vertices
     * @param list2 second group of job vertices, expanded to their execution vertices
     * @param list3 third group of job vertices, expanded to their execution vertices
     * @param actorSystem actor system used to create the activator/deactivator actors
     * @param uuid forwarded to {@code createActivatorDeactivator}
     * @param checkpointIDCounter forwarded to both coordinators
     * @param completedCheckpointStore forwarded to the checkpoint coordinator
     * @param recoveryMode forwarded to the checkpoint coordinator
     * @param savepointStore forwarded to the savepoint coordinator
     * @param checkpointStatsTracker stats tracker; must not be null
     * @throws IllegalArgumentException if {@code j} or {@code j2} is below 10, or a vertex
     *         belongs to another graph
     * @throws IllegalStateException if the job is not in state CREATED
     * @throws NullPointerException if {@code checkpointStatsTracker} is null
     * @throws Exception if shutting down a previous coordinator fails
     */
    public void enableSnapshotCheckpointing(long j, long j2, long j3, int i, int i2, List<ExecutionJobVertex> list, List<ExecutionJobVertex> list2, List<ExecutionJobVertex> list3, ActorSystem actorSystem, UUID uuid, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, SavepointStore savepointStore, CheckpointStatsTracker checkpointStatsTracker) throws Exception {
        if (j < 10 || j2 < 10) {
            throw new IllegalArgumentException("The first two checkpointing settings must each be at least 10, but were " + j + " and " + j2);
        }
        if (this.state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }
        // Validate before tearing anything down, so a null tracker does not disable checkpointing.
        Objects.requireNonNull(checkpointStatsTracker, "Checkpoint stats tracker");
        ExecutionVertex[] collectExecutionVertices = collectExecutionVertices(list);
        ExecutionVertex[] collectExecutionVertices2 = collectExecutionVertices(list2);
        ExecutionVertex[] collectExecutionVertices3 = collectExecutionVertices(list3);
        // Shut down coordinators left over from an earlier call.
        disableSnaphotCheckpointing();
        this.checkpointStatsTracker = checkpointStatsTracker;
        this.checkpointCoordinator = new CheckpointCoordinator(this.jobID, j, j2, j3, i, i2, collectExecutionVertices, collectExecutionVertices2, collectExecutionVertices3, this.userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode, this.checkpointStatsTracker);
        registerJobStatusListener(this.checkpointCoordinator.createActivatorDeactivator(actorSystem, uuid));
        this.savepointCoordinator = new SavepointCoordinator(this.jobID, j, j2, i2, collectExecutionVertices, collectExecutionVertices2, collectExecutionVertices3, this.userClassLoader, checkpointIDCounter, savepointStore, this.checkpointStatsTracker);
        registerJobStatusListener(this.savepointCoordinator.createActivatorDeactivator(actorSystem, uuid));
    }

    /**
     * Shuts down and forgets the checkpoint and savepoint coordinators, if present.
     * The stats tracker reference is dropped together with the checkpoint coordinator.
     *
     * @throws IllegalStateException if the job is not in state CREATED
     * @throws Exception if a coordinator fails to shut down
     */
    public void disableSnaphotCheckpointing() throws Exception {
        if (state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }

        final boolean hasCheckpointCoordinator = checkpointCoordinator != null;
        if (hasCheckpointCoordinator) {
            checkpointCoordinator.shutdown();
            checkpointCoordinator = null;
            checkpointStatsTracker = null;
        }

        final boolean hasSavepointCoordinator = savepointCoordinator != null;
        if (hasSavepointCoordinator) {
            savepointCoordinator.shutdown();
            savepointCoordinator = null;
        }
    }

    /** Returns the checkpoint coordinator, or {@code null} if none is currently set. */
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /** Returns the savepoint coordinator, or {@code null} if none is currently set. */
    public SavepointCoordinator getSavepointCoordinator() {
        return savepointCoordinator;
    }

    /** Returns the restart strategy; {@code null} after {@link #prepareForArchiving()}. */
    public RestartStrategy getRestartStrategy() {
        return restartStrategy;
    }

    /** Returns the checkpoint stats tracker, or {@code null} if none is currently set. */
    public CheckpointStatsTracker getCheckpointStatsTracker() {
        return checkpointStatsTracker;
    }

    /**
     * Flattens the task vertices of the given job vertices into one array.
     *
     * <p>For a single job vertex, its own task-vertex array is returned directly
     * (no copy). Otherwise a new array is built in list order.
     *
     * @param list job vertices, all of which must belong to this graph
     * @return the execution vertices of all given job vertices
     * @throws IllegalArgumentException if a job vertex belongs to a different graph
     */
    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> list) {
        if (list.size() == 1) {
            // Fast path: hand back the vertex's own array without copying.
            final ExecutionJobVertex only = list.get(0);
            if (only.getGraph() == this) {
                return only.getTaskVertices();
            }
            throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
        }

        final List<ExecutionVertex> flattened = new ArrayList<>();
        for (final ExecutionJobVertex jobVertex : list) {
            if (jobVertex.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            Collections.addAll(flattened, jobVertex.getTaskVertices());
        }
        final ExecutionVertex[] result = new ExecutionVertex[flattened.size()];
        return flattened.toArray(result);
    }

    /** Returns the internal list of required jar files (not a copy). */
    public List<BlobKey> getRequiredJarFiles() {
        return requiredJarFiles;
    }

    /** Returns the internal list of required classpath URLs (not a copy). */
    public List<URL> getRequiredClasspaths() {
        return requiredClasspaths;
    }

    /**
     * Stores the JSON plan string for this job.
     *
     * @param str the plan to store
     */
    public void setJsonPlan(String str) {
        jsonPlan = str;
    }

    /** Returns the JSON plan string previously stored via {@link #setJsonPlan(String)}. */
    public String getJsonPlan() {
        return jsonPlan;
    }

    /** Returns the scheduler assigned by {@link #scheduleForExecution(Scheduler)}, or {@code null}. */
    public Scheduler getScheduler() {
        return scheduler;
    }

    /** Returns the ID of the job this graph belongs to. */
    public JobID getJobID() {
        return jobID;
    }

    /** Returns the name of the job this graph belongs to. */
    public String getJobName() {
        return jobName;
    }

    /** Returns {@code false} if any attached input vertex reported that it is not stoppable. */
    public boolean isStoppable() {
        return isStoppable;
    }

    /** Returns the job configuration passed at construction time. */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** Returns the user class loader; {@code null} after {@link #prepareForArchiving()}. */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** Returns the current job status (a volatile read). */
    public JobStatus getState() {
        return state;
    }

    /** Returns the throwable recorded by {@code suspend} or {@code fail}, or {@code null}. */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * Looks up an attached job vertex by ID.
     *
     * @param jobVertexID the ID to look up
     * @return the matching vertex, or {@code null} if none is registered under that ID
     */
    public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
        final ExecutionJobVertex found = tasks.get(jobVertexID);
        return found;
    }

    /** Returns a read-only view of all attached job vertices, keyed by ID. */
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        final Map<JobVertexID, ExecutionJobVertex> readOnlyView = Collections.unmodifiableMap(tasks);
        return readOnlyView;
    }

    /**
     * Returns an iterable over the job vertices in the order they were attached.
     *
     * <p>The number of vertices is captured when this method is called, so vertices
     * attached afterwards are not visited by iterators of the returned iterable.
     * The iterators do not support {@code remove()}.
     */
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // Snapshot of the vertex count at call time; bounds every iterator created below.
        final int limit = this.verticesInCreationOrder.size();
        return new Iterable<ExecutionJobVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.1
            @Override // java.lang.Iterable
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.1.1
                    private int cursor = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return cursor < limit;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public ExecutionJobVertex next() {
                        if (hasNext()) {
                            final int index = cursor;
                            cursor = index + 1;
                            return ExecutionGraph.this.verticesInCreationOrder.get(index);
                        }
                        throw new NoSuchElementException();
                    }

                    @Override // java.util.Iterator
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /** Returns a read-only view of all registered intermediate results, keyed by data set ID. */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        final Map<IntermediateDataSetID, IntermediateResult> readOnlyView = Collections.unmodifiableMap(intermediateResults);
        return readOnlyView;
    }

    /**
     * Returns an iterable over execution vertices, backed by an
     * {@code AllVerticesIterator} wrapped around {@link #getVerticesTopologically()}.
     * A fresh job-vertex iterator is obtained for every call to {@code iterator()}.
     */
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() { // from class: org.apache.flink.runtime.executiongraph.ExecutionGraph.2
            @Override // java.lang.Iterable
            public Iterator<ExecutionVertex> iterator() {
                final Iterator<ExecutionJobVertex> jobVertices = ExecutionGraph.this.getVerticesTopologically().iterator();
                return new AllVerticesIterator(jobVertices);
            }
        };
    }

    /**
     * Returns the timestamp recorded for the given status.
     *
     * @param jobStatus the status whose timestamp slot is read
     * @return the stored value for that status's slot
     */
    public long getStatusTimestamp(JobStatus jobStatus) {
        final int slot = jobStatus.ordinal();
        return stateTimestamps[slot];
    }

    /** Returns the execution context; {@code null} after {@link #prepareForArchiving()}. */
    public ExecutionContext getExecutionContext() {
        return executionContext;
    }

    /**
     * Collects the Flink accumulators of every execution vertex's current attempt.
     *
     * @return a new map from attempt ID to that attempt's accumulator map
     */
    public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
        final Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> byAttempt = new HashMap<>();
        final Iterator<ExecutionVertex> vertices = getAllExecutionVertices().iterator();
        while (vertices.hasNext()) {
            final ExecutionVertex vertex = vertices.next();
            byAttempt.put(
                    vertex.getCurrentExecutionAttempt().getAttemptId(),
                    vertex.getCurrentExecutionAttempt().getFlinkAccumulators());
        }
        return byAttempt;
    }

    /**
     * Merges the user accumulators of every execution vertex's current attempt.
     * Attempts that report {@code null} accumulators are skipped.
     *
     * @return a new map holding the merged accumulators by name
     */
    public Map<String, Accumulator<?, ?>> aggregateUserAccumulators() {
        final Map<String, Accumulator<?, ?>> merged = new HashMap<>();
        for (final ExecutionVertex vertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> perAttempt = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (perAttempt == null) {
                continue;
            }
            AccumulatorHelper.mergeInto(merged, perAttempt);
        }
        return merged;
    }

    /**
     * Aggregates the user accumulators and wraps each local value in a
     * {@link SerializedValue}.
     *
     * @return a new map from accumulator name to its serialized local value
     * @throws IOException if creating a {@code SerializedValue} fails
     */
    public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
        final Map<String, Accumulator<?, ?>> aggregated = aggregateUserAccumulators();
        final Map<String, SerializedValue<Object>> serialized = new HashMap<>();
        final Iterator<Map.Entry<String, Accumulator<?, ?>>> entries = aggregated.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<String, Accumulator<?, ?>> entry = entries.next();
            final String name = entry.getKey();
            serialized.put(name, new SerializedValue<Object>(entry.getValue().getLocalValue()));
        }
        return serialized;
    }

    /** Returns the aggregated user accumulators converted by {@code stringifyAccumulatorResults}. */
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        final Map<String, Accumulator<?, ?>> aggregated = aggregateUserAccumulators();
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregated);
    }

    /**
     * Creates an {@link ExecutionJobVertex} for each given job vertex, connects it to
     * its predecessors and registers it together with the data sets it produces.
     *
     * <p>If an input vertex is not stoppable, the whole job is marked as not stoppable.
     *
     * @param list job vertices to attach; the debug message describes them as
     *             topologically sorted, which this method does not verify
     * @throws JobException if a vertex ID or intermediate data set ID is already registered
     */
    public void attachJobGraph(List<JobVertex> list) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.", Integer.valueOf(list.size()), Integer.valueOf(this.tasks.size()), Integer.valueOf(this.intermediateResults.size())));
        }
        // One creation timestamp shared by every vertex attached in this call.
        long currentTimeMillis = System.currentTimeMillis();
        for (JobVertex jobVertex : list) {
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }
            ExecutionJobVertex executionJobVertex = new ExecutionJobVertex(this, jobVertex, 1, this.timeout, currentTimeMillis);
            executionJobVertex.connectToPredecessors(this.intermediateResults);
            ExecutionJobVertex previousVertex = this.tasks.putIfAbsent(jobVertex.getID(), executionJobVertex);
            if (previousVertex != null) {
                // Arguments follow the labels: the already-registered vertex first, then the new one.
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousVertex, executionJobVertex));
            }
            for (IntermediateResult intermediateResult : executionJobVertex.getProducedDataSets()) {
                IntermediateResult previousResult = this.intermediateResults.putIfAbsent(intermediateResult.getId(), intermediateResult);
                if (previousResult != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", intermediateResult.getId(), previousResult, intermediateResult));
                }
            }
            this.verticesInCreationOrder.add(executionJobVertex);
        }
    }

    /**
     * Moves the job from CREATED to RUNNING and schedules vertices according to the
     * configured schedule mode.
     *
     * <p>{@code FROM_SOURCES} schedules only input vertices; {@code ALL} schedules
     * every vertex in attachment order.
     *
     * @param scheduler the scheduler to use; must be the same one on every call
     * @throws IllegalArgumentException if {@code scheduler} is null or differs from the
     *         scheduler used before
     * @throws IllegalStateException if the job is not in state CREATED
     * @throws JobException if the schedule mode is BACKTRACKING or otherwise unsupported
     */
    public void scheduleForExecution(Scheduler scheduler) throws JobException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Scheduler must not be null.");
        }
        final Scheduler previous = this.scheduler;
        if (previous != null && previous != scheduler) {
            throw new IllegalArgumentException("Cannot use different schedulers for the same job");
        }
        final boolean nowRunning = transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        if (!nowRunning) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
        this.scheduler = scheduler;

        switch (this.scheduleMode) {
            case FROM_SOURCES:
                for (final ExecutionJobVertex vertex : this.tasks.values()) {
                    if (!vertex.getJobVertex().isInputVertex()) {
                        continue;
                    }
                    vertex.scheduleAll(scheduler, this.allowQueuedScheduling);
                }
                return;
            case ALL:
                for (final ExecutionJobVertex vertex : getVerticesTopologically()) {
                    vertex.scheduleAll(scheduler, this.allowQueuedScheduling);
                }
                return;
            case BACKTRACKING:
                throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
            default:
                throw new JobException("Schedule mode is invalid.");
        }
    }

    /**
     * Requests cancellation of the job.
     *
     * <p>Retries in a loop because each state change is a compare-and-set that can
     * lose against a concurrent transition. Behavior by observed state:
     * RUNNING/CREATED move to CANCELLING and cancel every job vertex; FAILING moves
     * to CANCELLING without cancelling vertices here; RESTARTING moves directly to
     * CANCELED under the progress lock; any other state returns without action.
     */
    public void cancel() {
        while (true) {
            // Snapshot the state once per attempt; a failed compare-and-set re-reads it.
            JobStatus jobStatus = this.state;
            if (jobStatus == JobStatus.RUNNING || jobStatus == JobStatus.CREATED) {
                if (transitionState(jobStatus, JobStatus.CANCELLING)) {
                    Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
                    while (it.hasNext()) {
                        it.next().cancel();
                    }
                    return;
                }
            } else if (jobStatus == JobStatus.FAILING) {
                // Only the state is switched here; fail() has already issued vertex cancellation.
                if (transitionState(jobStatus, JobStatus.CANCELLING)) {
                    return;
                }
            } else {
                if (jobStatus != JobStatus.RESTARTING) {
                    return;
                }
                // restart() also runs under this lock, so the transition cannot interleave with it.
                synchronized (this.progressLock) {
                    if (transitionState(jobStatus, JobStatus.CANCELED)) {
                        postRunCleanup();
                        // Wake threads blocked in waitUntilFinished().
                        this.progressLock.notifyAll();
                        LOG.info("Canceled during restart.");
                        return;
                    }
                }
            }
        }
    }

    /**
     * Calls {@code stop()} on every execution vertex that has no inputs.
     *
     * @throws StoppingException if the job is not stoppable
     */
    public void stop() throws StoppingException {
        if (!isStoppable) {
            throw new StoppingException("This job is not stoppable.");
        }
        final Iterator<ExecutionVertex> vertices = getAllExecutionVertices().iterator();
        while (vertices.hasNext()) {
            final ExecutionVertex vertex = vertices.next();
            final boolean hasNoInputs = vertex.getNumberOfInputs() == 0;
            if (hasNoInputs) {
                vertex.stop();
            }
        }
    }

    /**
     * Moves the job to SUSPENDED unless it is already in a globally terminal state,
     * then cancels all job vertices and runs the post-run cleanup.
     *
     * @param th the cause, recorded as the failure cause and passed to the state transition
     */
    public void suspend(Throwable th) {
        // Retry the compare-and-set until it succeeds or the job turns out to be terminal.
        while (true) {
            final JobStatus observed = this.state;
            if (observed.isGloballyTerminalState()) {
                return;
            }
            if (transitionState(observed, JobStatus.SUSPENDED, th)) {
                break;
            }
        }

        this.failureCause = th;
        for (final ExecutionJobVertex vertex : this.verticesInCreationOrder) {
            vertex.cancel();
        }

        synchronized (this.progressLock) {
            postRunCleanup();
            this.progressLock.notifyAll();
            LOG.info("Job {} has been suspended.", getJobID());
        }
    }

    /**
     * Fails the job with the given cause.
     *
     * <p>Does nothing if the job is already FAILING, SUSPENDED or globally terminal.
     * From RESTARTING the job goes straight to FAILED and is cleaned up. From any
     * other state it goes to FAILING and all job vertices are cancelled; with no
     * vertices attached it goes on to FAILED immediately.
     *
     * @param th the failure cause
     */
    public void fail(Throwable th) {
        JobStatus jobStatus;
        do {
            // Re-read the state on every attempt; a lost compare-and-set loops back here.
            jobStatus = this.state;
            if (jobStatus == JobStatus.FAILING || jobStatus == JobStatus.SUSPENDED || jobStatus.isGloballyTerminalState()) {
                return;
            }
            if (jobStatus == JobStatus.RESTARTING && transitionState(jobStatus, JobStatus.FAILED, th)) {
                synchronized (this.progressLock) {
                    postRunCleanup();
                    this.progressLock.notifyAll();
                    LOG.info("Job {} failed during restart.", getJobID());
                }
                // NOTE(review): failureCause is not assigned on this path — confirm that is intended.
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.FAILING, th));
        this.failureCause = th;
        if (this.verticesInCreationOrder.isEmpty()) {
            // No vertices will ever call jobVertexInFinalState, so finish the transition here.
            // NOTE(review): this path neither runs postRunCleanup() nor notifies progressLock waiters — confirm.
            transitionState(JobStatus.FAILING, JobStatus.FAILED, th);
            return;
        }
        Iterator<ExecutionJobVertex> it = this.verticesInCreationOrder.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }

    /**
     * Resets the graph and schedules it again; only valid from state RESTARTING.
     *
     * <p>Returns without action if the job was meanwhile CANCELED, FAILED or
     * SUSPENDED. Any throwable raised during the restart, including the
     * {@link IllegalStateException}s thrown below, is routed to {@link #fail(Throwable)}.
     */
    public void restart() {
        String savepointRestorePath;
        try {
            synchronized (this.progressLock) {
                JobStatus jobStatus = this.state;
                if (jobStatus == JobStatus.CANCELED) {
                    LOG.info("Canceled job during restart. Aborting restart.");
                    return;
                }
                if (jobStatus == JobStatus.FAILED) {
                    LOG.info("Failed job during restart. Aborting restart.");
                    return;
                }
                if (jobStatus == JobStatus.SUSPENDED) {
                    LOG.info("Suspended job during restart. Aborting restart.");
                    return;
                }
                if (jobStatus != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                if (this.scheduler == null) {
                    throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
                }
                this.currentExecutions.clear();
                // Tracks co-location groups already reset, since several vertices can share one group.
                HashSet hashSet = new HashSet();
                for (ExecutionJobVertex executionJobVertex : this.verticesInCreationOrder) {
                    CoLocationGroup coLocationGroup = executionJobVertex.getCoLocationGroup();
                    if (coLocationGroup != null && !hashSet.contains(coLocationGroup)) {
                        coLocationGroup.resetConstraints();
                        hashSet.add(coLocationGroup);
                    }
                    executionJobVertex.resetForNewExecution();
                }
                // Clear all timestamps except RESTARTING, which RestartTimeGauge reads.
                for (int i = 0; i < this.stateTimestamps.length; i++) {
                    if (i != JobStatus.RESTARTING.ordinal()) {
                        this.stateTimestamps[i] = 0;
                    }
                }
                this.numFinishedJobVertices = 0;
                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
                // Try the latest checkpoint first; fall back to the savepoint path only if that
                // returns false and a savepoint restore path is available.
                if (this.checkpointCoordinator != null && !this.checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false) && this.savepointCoordinator != null && (savepointRestorePath = this.savepointCoordinator.getSavepointRestorePath()) != null) {
                    this.savepointCoordinator.restoreSavepoint(getAllVertices(), savepointRestorePath);
                }
                scheduleForExecution(this.scheduler);
            }
        } catch (Throwable th) {
            fail(th);
        }
    }

    /**
     * Asks the checkpoint coordinator, if one exists, to restore the latest
     * checkpointed state into all job vertices. Runs under the progress lock.
     *
     * @throws Exception if the coordinator fails to restore the state
     */
    public void restoreLatestCheckpointedState() throws Exception {
        synchronized (progressLock) {
            if (checkpointCoordinator == null) {
                return;
            }
            checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
        }
    }

    /**
     * Restores the savepoint at the given path into all job vertices. Runs under
     * the progress lock.
     *
     * @param str the savepoint path handed to the savepoint coordinator
     * @throws IllegalStateException if no savepoint coordinator is set
     * @throws Exception if the coordinator fails to restore the savepoint
     */
    public void restoreSavepoint(String str) throws Exception {
        synchronized (progressLock) {
            if (savepointCoordinator != null) {
                LOG.info("Restoring savepoint: " + str + ".");
                savepointCoordinator.restoreSavepoint(getAllVertices(), str);
                return;
            }
            throw new IllegalStateException("Checkpointing disabled.");
        }
    }

    /**
     * Drops runtime-only references and clears runtime collections so the graph can
     * be kept as an archive, then marks it as archived.
     *
     * <p>A {@link FlinkUserCodeClassLoader} is closed before its reference is
     * dropped; a failure to close is logged and otherwise ignored.
     *
     * @throws IllegalStateException if the job is not in a globally terminal state
     */
    public void prepareForArchiving() {
        if (!state.isGloballyTerminalState()) {
            throw new IllegalStateException("Can only archive the job from a terminal state");
        }

        restartStrategy = null;
        scheduler = null;
        checkpointCoordinator = null;
        executionContext = null;

        for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
            vertex.prepareForArchiving();
        }

        intermediateResults.clear();
        currentExecutions.clear();
        requiredJarFiles.clear();
        jobStatusListenerActors.clear();
        executionListenerActors.clear();

        final ClassLoader loader = userClassLoader;
        if (loader instanceof FlinkUserCodeClassLoader) {
            final FlinkUserCodeClassLoader closeable = (FlinkUserCodeClassLoader) loader;
            try {
                closeable.close();
            } catch (IOException e) {
                LOG.warn("Failed to close the user classloader for job {}", jobID, e);
            }
        }
        userClassLoader = null;
        isArchived = true;
    }

    /** Returns the summary built in the constructor, or {@code null} if it could not be created. */
    public ExecutionConfigSummary getExecutionConfigSummary() {
        return executionConfigSummary;
    }

    /** Returns the serialized execution config passed at construction time. */
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
        return serializedExecutionConfig;
    }

    /**
     * Blocks the calling thread until the job reaches a globally terminal state.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitUntilFinished() throws InterruptedException {
        synchronized (progressLock) {
            // Re-check after every wake-up; a notification does not imply a terminal state.
            for (;;) {
                if (state.isGloballyTerminalState()) {
                    return;
                }
                progressLock.wait();
            }
        }
    }

    /**
     * Attempts a state transition without an associated error.
     *
     * @param jobStatus the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @return {@code true} if the transition was performed
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        final Throwable noCause = null;
        return transitionState(jobStatus, jobStatus2, noCause);
    }

    /**
     * Atomically switches the job state if it currently equals {@code jobStatus}.
     * On success, the new state's timestamp is recorded and job status listeners
     * are notified.
     *
     * @param jobStatus the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @param th error passed on to the status-change notification; may be null
     * @return {@code true} if the state was switched, {@code false} if the current
     *         state did not match {@code jobStatus}
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2, Throwable th) {
        final boolean switched = STATE_UPDATER.compareAndSet(this, jobStatus, jobStatus2);
        if (switched) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} switched from {} to {}.", getJobName(), jobStatus, jobStatus2);
            }
            final int slot = jobStatus2.ordinal();
            this.stateTimestamps[slot] = System.currentTimeMillis();
            notifyJobStatusChange(jobStatus2, th);
        }
        return switched;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records that one more job vertex has reached a final state. Once all vertices
     * have done so, the job is moved to its follow-up state and threads waiting on
     * the progress lock are notified.
     *
     * <p>Follow-up by current state: RUNNING to FINISHED; CANCELLING to CANCELED;
     * FAILING to RESTARTING (when restarts are allowed and the strategy can restart)
     * or FAILED. SUSPENDED and globally terminal states are left unchanged. Any
     * other state is treated as an error and fails the job.
     *
     * @throws IllegalStateException if all vertices were already counted as finished
     */
    public void jobVertexInFinalState() {
        synchronized (this.progressLock) {
            if (this.numFinishedJobVertices >= this.verticesInCreationOrder.size()) {
                throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
            }
            this.numFinishedJobVertices++;
            if (this.numFinishedJobVertices == this.verticesInCreationOrder.size()) {
                // Retry only when a compare-and-set loses a race; every resolved case leaves the loop.
                while (true) {
                    JobStatus jobStatus = this.state;
                    if (jobStatus == JobStatus.RUNNING) {
                        if (transitionState(jobStatus, JobStatus.FINISHED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (jobStatus == JobStatus.CANCELLING) {
                        if (transitionState(jobStatus, JobStatus.CANCELED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (jobStatus == JobStatus.FAILING) {
                        // A SuppressRestartsException as failure cause vetoes any restart.
                        boolean z = !(this.failureCause instanceof SuppressRestartsException);
                        if (z && this.restartStrategy.canRestart() && transitionState(jobStatus, JobStatus.RESTARTING)) {
                            this.restartStrategy.restart(this);
                            break;
                        } else if ((!z || !this.restartStrategy.canRestart()) && transitionState(jobStatus, JobStatus.FAILED, this.failureCause)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (jobStatus == JobStatus.SUSPENDED) {
                        // suspend() already ran postRunCleanup(); nothing left to do.
                        break;
                    } else if (jobStatus.isGloballyTerminalState()) {
                        LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
                        break;
                    } else {
                        fail(new Exception("ExecutionGraph went into final state from state " + jobStatus));
                        break;
                    }
                }
                // Wake threads blocked in waitUntilFinished().
                this.progressLock.notifyAll();
            }
        }
    }

    /**
     * Detaches the checkpoint coordinator and the savepoint coordinator from this
     * graph and stops each of them.
     *
     * <p>A coordinator is shut down when the job status is globally terminal and
     * suspended otherwise. The two coordinators are handled independently: an
     * exception raised while handling one is logged and does not prevent the other
     * from being handled.
     */
    private void postRunCleanup() {
        try {
            final CheckpointCoordinator detachedCheckpoints = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (detachedCheckpoints != null) {
                if (!this.state.isGloballyTerminalState()) {
                    detachedCheckpoints.suspend();
                } else {
                    detachedCheckpoints.shutdown();
                }
            }
        } catch (Exception checkpointCleanupError) {
            LOG.error("Error while cleaning up after execution", checkpointCleanupError);
        }

        try {
            final SavepointCoordinator detachedSavepoints = this.savepointCoordinator;
            this.savepointCoordinator = null;
            if (detachedSavepoints != null) {
                if (!this.state.isGloballyTerminalState()) {
                    detachedSavepoints.suspend();
                } else {
                    detachedSavepoints.shutdown();
                }
            }
        } catch (Exception savepointCleanupError) {
            LOG.error("Error while cleaning up after execution", savepointCleanupError);
        }
    }

    /**
     * Applies a reported task state to the matching registered execution.
     *
     * @param taskExecutionState the state update; its ID selects the execution
     * @return {@code false} if no execution is registered under the ID or the reported
     *         state is not one of RUNNING, FINISHED, CANCELED or FAILED; for RUNNING
     *         the result of {@code switchToRunning()}; {@code true} otherwise
     */
    public boolean updateState(TaskExecutionState taskExecutionState) {
        final Execution attempt = this.currentExecutions.get(taskExecutionState.getID());
        if (attempt == null) {
            return false;
        }

        final ExecutionState reported = taskExecutionState.getExecutionState();
        boolean accepted = true;
        switch (reported) {
            case RUNNING:
                accepted = attempt.switchToRunning();
                break;
            case FINISHED:
                try {
                    final AccumulatorSnapshot snapshot = taskExecutionState.getAccumulators();
                    final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = snapshot.deserializeFlinkAccumulators();
                    final Map<String, Accumulator<?, ?>> userAccumulators = snapshot.deserializeUserAccumulators(this.userClassLoader);
                    attempt.markFinished(flinkAccumulators, userAccumulators);
                } catch (Exception e) {
                    // Any failure on the way to markFinished turns the attempt into a failed one.
                    LOG.error("Failed to deserialize final accumulator results.", e);
                    attempt.markFailed(e);
                }
                break;
            case CANCELED:
                attempt.cancelingComplete();
                break;
            case FAILED:
                attempt.markFailed(taskExecutionState.getError(this.userClassLoader));
                break;
            default:
                attempt.fail(new Exception("TaskManager sent illegal state update: " + reported));
                accepted = false;
                break;
        }
        return accepted;
    }

    /**
     * Forwards a schedule-or-update request for the given result partition to the
     * vertex of the execution that produces it.
     *
     * <p>The producing execution is looked up by the partition's producer ID. If no
     * such execution is registered, or it has no vertex, the job is failed with an
     * {@link IllegalStateException} that names the producer ID used for the lookup.
     *
     * @param resultPartitionID identifies the partition and the execution producing it
     */
    public void scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID) {
        final Execution producer = this.currentExecutions.get(resultPartitionID.getProducerId());
        if (producer == null) {
            // Report the producer ID: that is the key the lookup was performed with.
            fail(new IllegalStateException("Cannot find execution for execution ID " + resultPartitionID.getProducerId()));
        } else if (producer.getVertex() == null) {
            fail(new IllegalStateException("Execution with execution ID " + resultPartitionID.getProducerId() + " has no vertex assigned."));
        } else {
            producer.getVertex().scheduleOrUpdateConsumers(resultPartitionID);
        }
    }

    /**
     * Returns the currently registered executions keyed by attempt ID.
     *
     * @return an unmodifiable view of the registered executions
     */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        final Map<ExecutionAttemptID, Execution> readOnlyView = Collections.unmodifiableMap(this.currentExecutions);
        return readOnlyView;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the execution under its attempt ID.
     *
     * <p>If another execution is already registered under that ID, the existing
     * entry is kept and the job is failed.
     *
     * @param execution the execution to register
     */
    public void registerExecution(Execution execution) {
        final Execution alreadyRegistered = this.currentExecutions.putIfAbsent(execution.getAttemptId(), execution);
        if (alreadyRegistered == null) {
            return;
        }
        fail(new Exception("Trying to register execution " + execution + " for already used ID " + execution.getAttemptId()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes whatever execution is registered under the given execution's attempt ID.
     *
     * <p>If an entry was removed and it is not the very same instance as
     * {@code execution}, the job is failed.
     *
     * @param execution the execution to de-register
     */
    public void deregisterExecution(Execution execution) {
        final Execution evicted = this.currentExecutions.remove(execution.getAttemptId());
        final boolean removedSomethingElse = evicted != null && evicted != execution;
        if (removedSomethingElse) {
            fail(new Exception("De-registering execution " + execution + " failed. Found for same ID execution " + evicted));
        }
    }

    /**
     * Deserializes the accumulators in the snapshot and stores them on the execution
     * the snapshot belongs to.
     *
     * <p>A snapshot for an execution that is not registered is logged as a warning.
     * Any exception raised along the way is logged as an error and not propagated.
     *
     * @param accumulatorSnapshot the serialized accumulators of one execution attempt
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
            final Map<String, Accumulator<?, ?>> userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution target = this.currentExecutions.get(attemptId);
            if (target == null) {
                LOG.warn("Received accumulator result for unknown execution {}.", attemptId);
                return;
            }
            target.setAccumulators(flinkAccumulators, userAccumulators);
        } catch (Exception e) {
            LOG.error("Cannot update accumulators for job {}.", this.jobID, e);
        }
    }

    /**
     * Adds a listener that is told about job status changes.
     *
     * @param actorGateway the listener to add; {@code null} is ignored
     */
    public void registerJobStatusListener(ActorGateway actorGateway) {
        if (actorGateway == null) {
            return;
        }
        this.jobStatusListenerActors.add(actorGateway);
    }

    /**
     * Adds a listener that is told about execution state changes.
     *
     * @param actorGateway the listener to add; {@code null} is ignored
     */
    public void registerExecutionListener(ActorGateway actorGateway) {
        if (actorGateway == null) {
            return;
        }
        this.executionListenerActors.add(actorGateway);
    }

    /**
     * Tells every registered job status listener about the new job status.
     *
     * <p>Nothing is built or sent when no listener is registered. Otherwise one
     * message carrying the job ID, the new status, the current time and the error
     * (wrapped in a {@code SerializedThrowable}, or {@code null}) is sent to all.
     *
     * @param jobStatus the status the job has switched to
     * @param th the error associated with the switch; may be {@code null}
     */
    private void notifyJobStatusChange(JobStatus jobStatus, Throwable th) {
        if (this.jobStatusListenerActors.size() <= 0) {
            return;
        }

        final long timestamp = System.currentTimeMillis();
        final SerializedThrowable serializedError = (th != null) ? new SerializedThrowable(th) : null;
        final ExecutionGraphMessages.JobStatusChanged message =
                new ExecutionGraphMessages.JobStatusChanged(this.jobID, jobStatus, timestamp, serializedError);

        for (ActorGateway listener : this.jobStatusListenerActors) {
            listener.tell(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells every registered execution listener about an execution's new state and
     * fails the job when that state is {@code FAILED}.
     *
     * <p>The message is only built when at least one listener is registered. The
     * check for {@code FAILED} happens regardless of whether listeners exist.
     *
     * @param jobVertexID ID of the job vertex the execution belongs to
     * @param i value passed through to the message after the vertex parallelism
     *          (presumably the subtask index; confirm against callers)
     * @param executionAttemptID ID of the execution attempt
     * @param executionState the state the execution has switched to
     * @param th the error associated with the switch; may be {@code null}
     */
    public void notifyExecutionChange(JobVertexID jobVertexID, int i, ExecutionAttemptID executionAttemptID, ExecutionState executionState, Throwable th) {
        final ExecutionJobVertex vertex = getJobVertex(jobVertexID);

        final boolean hasListeners = this.executionListenerActors.size() > 0;
        if (hasListeners) {
            final String vertexName = vertex.getJobVertex().getName();
            final int parallelism = vertex.getParallelism();
            final long timestamp = System.currentTimeMillis();
            final String errorText = (th != null) ? ExceptionUtils.stringifyException(th) : null;
            final ExecutionGraphMessages.ExecutionStateChanged message =
                    new ExecutionGraphMessages.ExecutionStateChanged(
                            this.jobID, jobVertexID, vertexName, parallelism, i,
                            executionAttemptID, executionState, timestamp, errorText);

            for (ActorGateway listener : this.executionListenerActors) {
                listener.tell(message);
            }
        }

        if (executionState == ExecutionState.FAILED) {
            fail(th);
        }
    }
}
