package org.apache.flink.state.api.utils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.AbstractID;

/* loaded from: input_file:org/apache/flink/state/api/utils/SavepointTestBase.class */
public abstract class SavepointTestBase extends AbstractTestBase {
    public String takeSavepoint(StreamExecutionEnvironment streamExecutionEnvironment) {
        StreamExecutionEnvironment.getExecutionEnvironment().getConfig().disableClosureCleaner();
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        JobID jobID = jobGraph.getJobID();
        ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        try {
            try {
                JobID jobID2 = (JobID) clusterClient.submitJob(jobGraph).get();
                waitForAllRunningOrSomeTerminal(jobID2, MINI_CLUSTER_RESOURCE);
                String str = triggerSavepoint(clusterClient, jobID2).get(5L, TimeUnit.MINUTES);
                clusterClient.cancel(jobID);
                return str;
            } catch (Exception e) {
                throw new RuntimeException("Failed to take savepoint", e);
            }
        } catch (Throwable th) {
            clusterClient.cancel(jobID);
            throw th;
        }
    }

    public static void waitForAllRunningOrSomeTerminal(JobID jobID, MiniClusterWithClientResource miniClusterWithClientResource) throws Exception {
        while (true) {
            Set set = (Set) ((JobDetailsInfo) miniClusterWithClientResource.getRestClusterClient().getJobDetails(jobID).get()).getJobVertexInfos().stream().map((v0) -> {
                return v0.getExecutionState();
            }).collect(Collectors.toSet());
            if (set.equals(EnumSet.of(ExecutionState.RUNNING)) || set.stream().anyMatch((v0) -> {
                return v0.isTerminal();
            })) {
                return;
            } else {
                Thread.sleep(500L);
            }
        }
    }

    public <T> SourceFunction<T> createSource(T[] tArr) {
        return createSource(Arrays.asList(tArr));
    }

    public <T> SourceFunction<T> createSource(Collection<T> collection) {
        T next = collection.iterator().next();
        if (next == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        TypeInformation forObject = TypeExtractor.getForObject(next);
        try {
            return new WaitingSource(new FromElementsFunction(forObject.createSerializer(new ExecutionConfig()), collection), forObject);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private CompletableFuture<String> triggerSavepoint(ClusterClient<?> clusterClient, JobID jobID) throws RuntimeException {
        try {
            return clusterClient.triggerSavepoint(jobID, getTempDirPath(new AbstractID().toHexString()), SavepointFormatType.CANONICAL);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
