package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SelectTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds a {@link StreamGraph} from a list of {@link StreamTransformation}s.
 *
 * <p>Each transformation is translated by {@link #transform(StreamTransformation)}, which first
 * recursively translates the transformation's inputs and then registers nodes, virtual nodes
 * and edges on the {@link StreamGraph}. The node IDs produced for a transformation are cached
 * in {@code alreadyTransformed}, so a transformation that is reachable through several paths
 * (or through a feedback edge) is only translated once.
 *
 * <p>Instances are created only through {@link #generate(StreamExecutionEnvironment, List)}.
 */
@Internal
public class StreamGraphGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);

    /** Slot sharing group used when none is specified and none can be derived from the inputs. */
    private static final String DEFAULT_SLOT_SHARING_GROUP = "default";

    /**
     * Counter backing {@link #getNewIterationNodeId()}. It starts at zero and is only ever
     * decremented, so the IDs handed out are negative.
     */
    protected static Integer iterationIdCounter = 0;

    /** The graph being populated by this generator. */
    private final StreamGraph streamGraph;

    private final StreamExecutionEnvironment env;

    /** Node IDs already produced for a transformation; doubles as the recursion guard. */
    private final Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;

    /**
     * Decrements the shared iteration counter and returns its new value.
     *
     * <p>The update is a plain read-modify-write on a static field without any synchronization.
     *
     * @return the next iteration node ID (-1 on the first call, then -2, ...)
     */
    public static int getNewIterationNodeId() {
        iterationIdCounter--;
        return iterationIdCounter;
    }

    /**
     * Creates a generator with a fresh {@link StreamGraph} that takes over the chaining flag
     * and the state backend of the given environment.
     */
    private StreamGraphGenerator(StreamExecutionEnvironment environment) {
        this.streamGraph = new StreamGraph(environment);
        this.streamGraph.setChaining(environment.isChainingEnabled());
        this.streamGraph.setStateBackend(environment.getStateBackend());
        this.env = environment;
        this.alreadyTransformed = new HashMap<StreamTransformation<?>, Collection<Integer>>();
    }

    /**
     * Generates a {@link StreamGraph} for the given transformations.
     *
     * @param streamExecutionEnvironment environment supplying chaining, state backend and config
     * @param list the transformations to translate, processed in list order
     * @return the populated stream graph
     */
    public static StreamGraph generate(StreamExecutionEnvironment streamExecutionEnvironment, List<StreamTransformation<?>> list) {
        return new StreamGraphGenerator(streamExecutionEnvironment).generateInternal(list);
    }

    /** Translates every transformation in order and returns the resulting graph. */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation : transformations) {
            transform(transformation);
        }
        return streamGraph;
    }

    /**
     * Translates one transformation, dispatching on its concrete type.
     *
     * @param transformation the transformation to translate
     * @return the node IDs that downstream transformations must connect to; the cached result
     *     is returned if this transformation was translated before
     * @throws IllegalStateException if the transformation is of none of the handled types
     */
    private Collection<Integer> transform(StreamTransformation<?> transformation) {
        if (alreadyTransformed.containsKey(transformation)) {
            return alreadyTransformed.get(transformation);
        }

        LOG.debug("Transforming {}", transformation);

        // Result is intentionally discarded.
        // NOTE(review): presumably invoked so that missing type information fails here
        // rather than later - confirm against StreamTransformation#getOutputType.
        transformation.getOutputType();

        Collection<Integer> transformedIds;
        if (transformation instanceof OneInputTransformation) {
            transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transformation);
        } else if (transformation instanceof TwoInputTransformation) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transformation);
        } else if (transformation instanceof SourceTransformation) {
            transformedIds = transformSource((SourceTransformation<?>) transformation);
        } else if (transformation instanceof SinkTransformation) {
            transformedIds = transformSink((SinkTransformation<?>) transformation);
        } else if (transformation instanceof UnionTransformation) {
            transformedIds = transformUnion((UnionTransformation<?>) transformation);
        } else if (transformation instanceof SplitTransformation) {
            transformedIds = transformSplit((SplitTransformation<?>) transformation);
        } else if (transformation instanceof SelectTransformation) {
            transformedIds = transformSelect((SelectTransformation<?>) transformation);
        } else if (transformation instanceof FeedbackTransformation) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transformation);
        } else if (transformation instanceof CoFeedbackTransformation) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transformation);
        } else if (transformation instanceof PartitionTransformation) {
            transformedIds = transformPartition((PartitionTransformation<?>) transformation);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transformation);
        }

        // The feedback translations register themselves before recursing into their feedback
        // edges; do not overwrite the entry they stored.
        if (!alreadyTransformed.containsKey(transformation)) {
            alreadyTransformed.put(transformation, transformedIds);
        }

        if (transformation.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transformation.getId(), transformation.getBufferTimeout());
        }
        if (transformation.getUid() != null) {
            streamGraph.setTransformationId(transformation.getId(), transformation.getUid());
        }

        return transformedIds;
    }

    /**
     * Translates a union by translating all of its inputs.
     *
     * @return the concatenated node IDs of all inputs; no node is added for the union itself
     */
    private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
        List<Integer> resultIds = new ArrayList<Integer>();
        for (StreamTransformation<T> input : union.getInputs()) {
            resultIds.addAll(transform(input));
        }
        return resultIds;
    }

    /**
     * Translates a partitioning by adding one virtual partition node per input node ID.
     *
     * @return the IDs of the newly created virtual nodes
     */
    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        List<Integer> resultIds = new ArrayList<Integer>();
        for (Integer inputId : transform(partition.getInput())) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualPartitionNode(inputId, virtualId, partition.getPartitioner());
            resultIds.add(virtualId);
        }
        return resultIds;
    }

    /**
     * Translates a split by attaching its output selector to every input node.
     *
     * @return the input node IDs, unchanged; no node is added for the split itself
     */
    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
        Collection<Integer> resultIds = transform(split.getInput());

        // Translating the input may already have translated this transformation recursively.
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }
        return resultIds;
    }

    /**
     * Translates a select by adding one virtual select node per input node ID.
     *
     * @return the IDs of the newly created virtual nodes
     */
    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        Collection<Integer> inputIds = transform(select.getInput());

        // Translating the input may already have translated this transformation recursively.
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualIds = new ArrayList<Integer>();
        for (int inputId : inputIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }

    /**
     * Translates an iteration: creates the iteration source/sink node pair and connects every
     * feedback edge to the iteration sink.
     *
     * @return the node IDs of the iteration input plus the ID of the iteration source
     * @throws IllegalStateException if the iteration has no feedback edges
     */
    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
        if (iterate.getFeedbackEdges().isEmpty()) {
            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
        }

        // Translate the regular input first; its IDs are part of the result.
        List<Integer> resultIds = new ArrayList<Integer>();
        resultIds.addAll(transform(iterate.getInput()));

        // Translating the input may already have translated this transformation recursively.
        if (alreadyTransformed.containsKey(iterate)) {
            return alreadyTransformed.get(iterate);
        }

        // Allocated in this order on purpose: the first ID is passed as source, the second as sink.
        int iterationSourceId = getNewIterationNodeId();
        int iterationSinkId = getNewIterationNodeId();
        Tuple2<StreamNode, StreamNode> sourceAndSink = streamGraph.createIterationSourceAndSink(
                iterate.getId(),
                iterationSourceId,
                iterationSinkId,
                iterate.getWaitTime(),
                iterate.getParallelism());
        StreamNode iterationSource = sourceAndSink.f0;
        StreamNode iterationSink = sourceAndSink.f1;

        // Only the last serializer argument is set for the source, only the first for the sink.
        streamGraph.setSerializers(iterationSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
        streamGraph.setSerializers(iterationSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);

        resultIds.add(iterationSource.getId());

        // Register before translating the feedback edges so that the recursion stops when it
        // reaches this transformation again.
        alreadyTransformed.put(iterate, resultIds);

        // Collected so that the slot sharing group can be derived from all feedback edges.
        List<Integer> allFeedbackIds = new ArrayList<Integer>();
        for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
            Collection<Integer> feedbackIds = transform(feedbackEdge);
            allFeedbackIds.addAll(feedbackIds);
            for (Integer feedbackId : feedbackIds) {
                streamGraph.addEdge(feedbackId, iterationSink.getId(), 0);
            }
        }

        String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
        iterationSink.setSlotSharingGroup(slotSharingGroup);
        iterationSource.setSlotSharingGroup(slotSharingGroup);

        return resultIds;
    }

    /**
     * Translates a co-iteration: creates the iteration source/sink node pair and connects every
     * feedback edge to the iteration sink. Unlike {@link #transformFeedback}, no input is
     * translated here and an empty feedback edge list is not rejected.
     *
     * @return a singleton containing the ID of the iteration source
     */
    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
        // Allocated in this order on purpose: the first ID is passed as source, the second as sink.
        int iterationSourceId = getNewIterationNodeId();
        int iterationSinkId = getNewIterationNodeId();
        Tuple2<StreamNode, StreamNode> sourceAndSink = streamGraph.createIterationSourceAndSink(
                coIterate.getId(),
                iterationSourceId,
                iterationSinkId,
                coIterate.getWaitTime(),
                coIterate.getParallelism());
        StreamNode iterationSource = sourceAndSink.f0;
        StreamNode iterationSink = sourceAndSink.f1;

        // Only the last serializer argument is set for the source, only the first for the sink.
        streamGraph.setSerializers(iterationSource.getId(), null, null, coIterate.getOutputType().createSerializer(env.getConfig()));
        streamGraph.setSerializers(iterationSink.getId(), coIterate.getOutputType().createSerializer(env.getConfig()), null, null);

        Collection<Integer> resultIds = Collections.<Integer>singleton(iterationSource.getId());

        // Register before translating the feedback edges so that the recursion stops when it
        // reaches this transformation again.
        alreadyTransformed.put(coIterate, resultIds);

        // Collected so that the slot sharing group can be derived from all feedback edges.
        List<Integer> allFeedbackIds = new ArrayList<Integer>();
        for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
            Collection<Integer> feedbackIds = transform(feedbackEdge);
            allFeedbackIds.addAll(feedbackIds);
            for (Integer feedbackId : feedbackIds) {
                streamGraph.addEdge(feedbackId, iterationSink.getId(), 0);
            }
        }

        String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
        iterationSink.setSlotSharingGroup(slotSharingGroup);
        iterationSource.setSlotSharingGroup(slotSharingGroup);

        return resultIds;
    }

    /**
     * Translates a source by adding a source node named {@code "Source: " + name}.
     *
     * @return a singleton containing the source's transformation ID
     */
    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
        // A source has no inputs to derive a group from, hence the empty ID list.
        String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), new ArrayList<Integer>());
        streamGraph.addSource(
                source.getId(),
                slotSharingGroup,
                source.getOperator(),
                null,
                source.getOutputType(),
                "Source: " + source.getName());

        if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
            InputFormatSourceFunction<?> inputFormatFunction =
                    (InputFormatSourceFunction<?>) source.getOperator().getUserFunction();
            streamGraph.setInputFormat(source.getId(), inputFormatFunction.getFormat());
        }

        streamGraph.setParallelism(source.getId(), source.getParallelism());
        return Collections.<Integer>singleton(source.getId());
    }

    /**
     * Translates a sink by adding a sink node named {@code "Sink: " + name} and connecting all
     * input nodes to it.
     *
     * @return an empty collection; nothing can connect downstream of a sink
     */
    private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
        Collection<Integer> inputIds = transform(sink.getInput());

        String slotSharingGroup = determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);
        streamGraph.addSink(
                sink.getId(),
                slotSharingGroup,
                sink.getOperator(),
                sink.getInput().getOutputType(),
                null,
                "Sink: " + sink.getName());
        streamGraph.setParallelism(sink.getId(), sink.getParallelism());

        for (Integer inputId : inputIds) {
            streamGraph.addEdge(inputId, sink.getId(), 0);
        }

        if (sink.getStateKeySelector() != null) {
            streamGraph.setOneInputStateKey(
                    sink.getId(),
                    sink.getStateKeySelector(),
                    sink.getStateKeyType().createSerializer(env.getConfig()));
        }

        return Collections.<Integer>emptyList();
    }

    /**
     * Translates a one-input operator by adding an operator node and connecting all input
     * nodes to it.
     *
     * @return a singleton containing the operator's transformation ID
     */
    private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> operator) {
        Collection<Integer> inputIds = transform(operator.getInput());

        // Translating the input may already have translated this transformation recursively.
        if (alreadyTransformed.containsKey(operator)) {
            return alreadyTransformed.get(operator);
        }

        String slotSharingGroup = determineSlotSharingGroup(operator.getSlotSharingGroup(), inputIds);
        streamGraph.addOperator(
                operator.getId(),
                slotSharingGroup,
                operator.getOperator(),
                operator.getInputType(),
                operator.getOutputType(),
                operator.getName());

        if (operator.getStateKeySelector() != null) {
            streamGraph.setOneInputStateKey(
                    operator.getId(),
                    operator.getStateKeySelector(),
                    operator.getStateKeyType().createSerializer(env.getConfig()));
        }

        streamGraph.setParallelism(operator.getId(), operator.getParallelism());

        for (Integer inputId : inputIds) {
            streamGraph.addEdge(inputId, operator.getId(), 0);
        }

        return Collections.<Integer>singleton(operator.getId());
    }

    /**
     * Translates a two-input operator by adding a co-operator node and connecting the nodes of
     * the first input with type number 1 and the nodes of the second input with type number 2.
     *
     * @return a singleton containing the operator's transformation ID
     */
    private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> operator) {
        Collection<Integer> firstInputIds = transform(operator.getInput1());
        Collection<Integer> secondInputIds = transform(operator.getInput2());

        // Translating the inputs may already have translated this transformation recursively.
        if (alreadyTransformed.containsKey(operator)) {
            return alreadyTransformed.get(operator);
        }

        // The slot sharing group is derived from the nodes of both inputs.
        List<Integer> allInputIds = new ArrayList<Integer>();
        allInputIds.addAll(firstInputIds);
        allInputIds.addAll(secondInputIds);
        String slotSharingGroup = determineSlotSharingGroup(operator.getSlotSharingGroup(), allInputIds);

        streamGraph.addCoOperator(
                operator.getId(),
                slotSharingGroup,
                operator.getOperator(),
                operator.getInputType1(),
                operator.getInputType2(),
                operator.getOutputType(),
                operator.getName());

        // Only the first selector is null-checked; the second one is passed along as-is.
        if (operator.getStateKeySelector1() != null) {
            streamGraph.setTwoInputStateKey(
                    operator.getId(),
                    operator.getStateKeySelector1(),
                    operator.getStateKeySelector2(),
                    operator.getStateKeyType().createSerializer(env.getConfig()));
        }

        streamGraph.setParallelism(operator.getId(), operator.getParallelism());

        for (Integer inputId : firstInputIds) {
            streamGraph.addEdge(inputId, operator.getId(), 1);
        }
        for (Integer inputId : secondInputIds) {
            streamGraph.addEdge(inputId, operator.getId(), 2);
        }

        return Collections.<Integer>singleton(operator.getId());
    }

    /**
     * Chooses the slot sharing group for a node.
     *
     * <p>An explicitly specified group always wins. Otherwise the group shared by all input
     * nodes is used; if the inputs disagree, or no group can be derived from them, the result
     * is {@code "default"}.
     *
     * @param specifiedGroup the group requested by the user, or {@code null} if none
     * @param inputIds the IDs of the input nodes
     * @return the slot sharing group to use, never {@code null}
     */
    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        }

        String inputGroup = null;
        for (int inputId : inputIds) {
            String candidate = streamGraph.getSlotSharingGroup(inputId);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }
}
