/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.chain;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.chain.PythonOperatorChainingOptimizer;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunction;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator;
import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.functions.python.PythonFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class PythonOperatorChainingOptimizerTest {
    PythonOperatorChainingOptimizerTest() {
    }

    @Test
    void testChainedTransformationPropertiesCorrectlySet() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.STRING());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation keyedProcessTransformation = new OneInputTransformation(sourceTransformation, "keyedProcess", keyedProcessOperator, keyedProcessOperator.getProducedType(), 2);
        keyedProcessTransformation.setUid("uid");
        keyedProcessTransformation.setSlotSharingGroup("group");
        keyedProcessTransformation.setCoLocationGroupKey("col");
        keyedProcessTransformation.setMaxParallelism(64);
        keyedProcessTransformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 5);
        keyedProcessTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        keyedProcessTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.STATE_BACKEND);
        keyedProcessTransformation.setBufferTimeout(1000L);
        keyedProcessTransformation.setChainingStrategy(ChainingStrategy.HEAD);
        OneInputTransformation processTransformation = new OneInputTransformation((Transformation)keyedProcessTransformation, "process", processOperator, processOperator.getProducedType(), 2);
        processTransformation.setSlotSharingGroup("group");
        processTransformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 10);
        processTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        processTransformation.setMaxParallelism(64);
        processTransformation.setBufferTimeout(500L);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(keyedProcessTransformation);
        transformations.add(processTransformation);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(2);
        OneInputTransformation chainedTransformation = (OneInputTransformation)optimized.get(1);
        Assertions.assertThat((int)chainedTransformation.getParallelism()).isEqualTo(2);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation.getInputType());
        Assertions.assertThat((Object)processOperator.getProducedType()).isEqualTo((Object)chainedTransformation.getOutputType());
        Assertions.assertThat((String)keyedProcessTransformation.getUid()).isEqualTo(chainedTransformation.getUid());
        Assertions.assertThat((String)((SlotSharingGroup)chainedTransformation.getSlotSharingGroup().get()).getName()).isEqualTo("group");
        Assertions.assertThat((String)chainedTransformation.getCoLocationGroupKey()).isEqualTo("col");
        Assertions.assertThat((int)chainedTransformation.getMaxParallelism()).isEqualTo(64);
        Assertions.assertThat((long)chainedTransformation.getBufferTimeout()).isEqualTo(500L);
        Assertions.assertThat((int)chainedTransformation.getManagedMemoryOperatorScopeUseCaseWeights().getOrDefault(ManagedMemoryUseCase.OPERATOR, 0)).isEqualTo(15);
        Assertions.assertThat((Comparable)chainedTransformation.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.HEAD);
        Assertions.assertThat((Collection)chainedTransformation.getManagedMemorySlotScopeUseCases()).contains((Object[])new ManagedMemoryUseCase[]{ManagedMemoryUseCase.PYTHON});
        Assertions.assertThat((Collection)chainedTransformation.getManagedMemorySlotScopeUseCases()).contains((Object[])new ManagedMemoryUseCase[]{ManagedMemoryUseCase.STATE_BACKEND});
        OneInputStreamOperator chainedOperator = chainedTransformation.getOperator();
        Assertions.assertThat((Object)chainedOperator).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator).getPythonFunctionInfo(), "f2", "f1");
    }

    @Test
    void testChainingMultipleOperators() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.LONG());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", Types.LONG(), Types.INT());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation keyedProcessTransformation = new OneInputTransformation(sourceTransformation, "keyedProcess", keyedProcessOperator, keyedProcessOperator.getProducedType(), 2);
        OneInputTransformation processTransformation1 = new OneInputTransformation((Transformation)keyedProcessTransformation, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(keyedProcessTransformation);
        transformations.add(processTransformation1);
        transformations.add(processTransformation2);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(2);
        OneInputTransformation chainedTransformation = (OneInputTransformation)optimized.get(1);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation.getInputType());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation.getOutputType());
        OneInputStreamOperator chainedOperator = chainedTransformation.getOperator();
        Assertions.assertThat((Object)chainedOperator).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator).getPythonFunctionInfo(), "f3", "f2", "f1");
    }

    @Test
    void testChainingNonKeyedOperators() {
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.INT());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation processTransformation1 = new OneInputTransformation(sourceTransformation, "Process1", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process2", processOperator2, processOperator2.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(processTransformation1);
        transformations.add(processTransformation2);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(2);
        OneInputTransformation chainedTransformation = (OneInputTransformation)optimized.get(1);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation.getInputType());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation.getOutputType());
        OneInputStreamOperator chainedOperator = chainedTransformation.getOperator();
        Assertions.assertThat((Object)chainedOperator).isInstanceOf(ExternalPythonProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonProcessOperator)chainedOperator).getPythonFunctionInfo(), "f2", "f1");
    }

    @Test
    void testContinuousKeyedOperators() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator1 = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}));
        ExternalPythonKeyedProcessOperator keyedProcessOperator2 = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f2", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation processTransformation1 = new OneInputTransformation(sourceTransformation, "KeyedProcess1", keyedProcessOperator1, keyedProcessOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "KeyedProcess2", keyedProcessOperator2, keyedProcessOperator2.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(processTransformation1);
        transformations.add(processTransformation2);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(3);
        Assertions.assertThat(optimized.get(1)).isEqualTo((Object)processTransformation1);
        Assertions.assertThat(optimized.get(2)).isEqualTo((Object)processTransformation2);
    }

    @Test
    void testMultipleChainedOperators() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator1 = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.LONG());
        ExternalPythonKeyedProcessOperator keyedProcessOperator2 = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f4", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator3 = PythonOperatorChainingOptimizerTest.createProcessOperator("f5", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation keyedProcessTransformation1 = new OneInputTransformation(sourceTransformation, "keyedProcess", keyedProcessOperator1, keyedProcessOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation1 = new OneInputTransformation((Transformation)keyedProcessTransformation1, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        OneInputTransformation keyedProcessTransformation2 = new OneInputTransformation((Transformation)processTransformation2, "keyedProcess", keyedProcessOperator2, keyedProcessOperator2.getProducedType(), 2);
        OneInputTransformation processTransformation3 = new OneInputTransformation((Transformation)keyedProcessTransformation2, "process", processOperator3, processOperator3.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(keyedProcessTransformation1);
        transformations.add(processTransformation1);
        transformations.add(processTransformation2);
        transformations.add(keyedProcessTransformation2);
        transformations.add(processTransformation3);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(3);
        OneInputTransformation chainedTransformation1 = (OneInputTransformation)optimized.get(1);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation1.getInputType());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation1.getOutputType());
        OneInputTransformation chainedTransformation2 = (OneInputTransformation)optimized.get(2);
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation2.getInputType());
        Assertions.assertThat((Object)processOperator3.getProducedType()).isEqualTo((Object)chainedTransformation2.getOutputType());
        OneInputStreamOperator chainedOperator1 = chainedTransformation1.getOperator();
        Assertions.assertThat((Object)chainedOperator1).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator1).getPythonFunctionInfo(), "f3", "f2", "f1");
        OneInputStreamOperator chainedOperator2 = chainedTransformation2.getOperator();
        Assertions.assertThat((Object)chainedOperator2).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator2).getPythonFunctionInfo(), "f5", "f4");
    }

    @Test
    void testChainingTwoInputOperators() {
        ExternalPythonKeyedCoProcessOperator keyedCoProcessOperator1 = PythonOperatorChainingOptimizerTest.createCoKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.STRING()}), new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.LONG());
        ExternalPythonKeyedProcessOperator keyedProcessOperator2 = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f4", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator3 = PythonOperatorChainingOptimizerTest.createProcessOperator("f5", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        Transformation sourceTransformation1 = (Transformation)Mockito.mock(SourceTransformation.class);
        Transformation sourceTransformation2 = (Transformation)Mockito.mock(SourceTransformation.class);
        TwoInputTransformation keyedCoProcessTransformation = new TwoInputTransformation(sourceTransformation1, sourceTransformation2, "keyedCoProcess", keyedCoProcessOperator1, keyedCoProcessOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation1 = new OneInputTransformation((Transformation)keyedCoProcessTransformation, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        OneInputTransformation keyedProcessTransformation = new OneInputTransformation((Transformation)processTransformation2, "keyedProcess", keyedProcessOperator2, keyedProcessOperator2.getProducedType(), 2);
        OneInputTransformation processTransformation3 = new OneInputTransformation((Transformation)keyedProcessTransformation, "process", processOperator3, processOperator3.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation1);
        transformations.add(sourceTransformation2);
        transformations.add(keyedCoProcessTransformation);
        transformations.add(processTransformation1);
        transformations.add(processTransformation2);
        transformations.add(keyedProcessTransformation);
        transformations.add(processTransformation3);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(4);
        TwoInputTransformation chainedTransformation1 = (TwoInputTransformation)optimized.get(2);
        Assertions.assertThat((Object)sourceTransformation1.getOutputType()).isEqualTo((Object)chainedTransformation1.getInputType1());
        Assertions.assertThat((Object)sourceTransformation2.getOutputType()).isEqualTo((Object)chainedTransformation1.getInputType2());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation1.getOutputType());
        OneInputTransformation chainedTransformation2 = (OneInputTransformation)optimized.get(3);
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation2.getInputType());
        Assertions.assertThat((Object)processOperator3.getProducedType()).isEqualTo((Object)chainedTransformation2.getOutputType());
        TwoInputStreamOperator chainedOperator1 = chainedTransformation1.getOperator();
        Assertions.assertThat((Object)chainedOperator1).isInstanceOf(ExternalPythonKeyedCoProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedCoProcessOperator)chainedOperator1).getPythonFunctionInfo(), "f3", "f2", "f1");
        OneInputStreamOperator chainedOperator2 = chainedTransformation2.getOperator();
        Assertions.assertThat((Object)chainedOperator2).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator2).getPythonFunctionInfo(), "f5", "f4");
    }

    @Test
    void testChainingUnorderedTransformations() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.LONG());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", Types.LONG(), Types.INT());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation keyedProcessTransformation = new OneInputTransformation(sourceTransformation, "keyedProcess", keyedProcessOperator, keyedProcessOperator.getProducedType(), 2);
        OneInputTransformation processTransformation1 = new OneInputTransformation((Transformation)keyedProcessTransformation, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        ArrayList<Object> transformations = new ArrayList<Object>();
        transformations.add(sourceTransformation);
        transformations.add(processTransformation2);
        transformations.add(processTransformation1);
        transformations.add(keyedProcessTransformation);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(2);
        OneInputTransformation chainedTransformation = (OneInputTransformation)optimized.get(1);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation.getInputType());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation.getOutputType());
        OneInputStreamOperator chainedOperator = chainedTransformation.getOperator();
        Assertions.assertThat((Object)chainedOperator).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator).getPythonFunctionInfo(), "f3", "f2", "f1");
    }

    @Test
    void testSingleTransformation() {
        ExternalPythonKeyedProcessOperator keyedProcessOperator = PythonOperatorChainingOptimizerTest.createKeyedProcessOperator("f1", new RowTypeInfo(new TypeInformation[]{Types.INT(), Types.INT()}), Types.STRING());
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.LONG());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", Types.LONG(), Types.INT());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation keyedProcessTransformation = new OneInputTransformation(sourceTransformation, "keyedProcess", keyedProcessOperator, keyedProcessOperator.getProducedType(), 2);
        OneInputTransformation processTransformation1 = new OneInputTransformation((Transformation)keyedProcessTransformation, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        ArrayList<OneInputTransformation> transformations = new ArrayList<OneInputTransformation>();
        transformations.add(processTransformation2);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(2);
        OneInputTransformation chainedTransformation = (OneInputTransformation)optimized.get(0);
        Assertions.assertThat((Object)sourceTransformation.getOutputType()).isEqualTo((Object)chainedTransformation.getInputType());
        Assertions.assertThat((Object)processOperator2.getProducedType()).isEqualTo((Object)chainedTransformation.getOutputType());
        OneInputStreamOperator chainedOperator = chainedTransformation.getOperator();
        Assertions.assertThat((Object)chainedOperator).isInstanceOf(ExternalPythonKeyedProcessOperator.class);
        this.validateChainedPythonFunctions(((ExternalPythonKeyedProcessOperator)chainedOperator).getPythonFunctionInfo(), "f3", "f2", "f1");
    }

    @Test
    void testTransformationWithMultipleOutputs() {
        ExternalPythonProcessOperator processOperator1 = PythonOperatorChainingOptimizerTest.createProcessOperator("f1", Types.STRING(), Types.LONG());
        ExternalPythonProcessOperator processOperator2 = PythonOperatorChainingOptimizerTest.createProcessOperator("f2", Types.STRING(), Types.LONG());
        ExternalPythonProcessOperator processOperator3 = PythonOperatorChainingOptimizerTest.createProcessOperator("f3", Types.LONG(), Types.INT());
        Transformation sourceTransformation = (Transformation)Mockito.mock(SourceTransformation.class);
        OneInputTransformation processTransformation1 = new OneInputTransformation(sourceTransformation, "process", processOperator1, processOperator1.getProducedType(), 2);
        OneInputTransformation processTransformation2 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator2, processOperator2.getProducedType(), 2);
        OneInputTransformation processTransformation3 = new OneInputTransformation((Transformation)processTransformation1, "process", processOperator3, processOperator3.getProducedType(), 2);
        ArrayList<OneInputTransformation> transformations = new ArrayList<OneInputTransformation>();
        transformations.add(processTransformation2);
        transformations.add(processTransformation3);
        List optimized = PythonOperatorChainingOptimizer.optimize(transformations);
        Assertions.assertThat((List)optimized).hasSize(4);
    }

    private void validateChainedPythonFunctions(DataStreamPythonFunctionInfo pythonFunctionInfo, String ... expectedChainedPythonFunctions) {
        for (String expectedPythonFunction : expectedChainedPythonFunctions) {
            Assertions.assertThat((byte[])pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()).isEqualTo((Object)expectedPythonFunction.getBytes());
            Object[] inputs = pythonFunctionInfo.getInputs();
            if (inputs.length > 0) {
                Assertions.assertThat((Object[])inputs).hasSize(1);
                pythonFunctionInfo = (DataStreamPythonFunctionInfo)inputs[0];
                continue;
            }
            pythonFunctionInfo = null;
        }
        Assertions.assertThat((Object)pythonFunctionInfo).isNull();
    }

    private static <OUT> ExternalPythonKeyedProcessOperator<OUT> createKeyedProcessOperator(String functionContent, RowTypeInfo inputTypeInfo, TypeInformation<OUT> outputTypeInfo) {
        return new ExternalPythonKeyedProcessOperator(new Configuration(), new DataStreamPythonFunctionInfo((PythonFunction)new DataStreamPythonFunction(functionContent.getBytes(), null), -1), inputTypeInfo, outputTypeInfo);
    }

    private static <OUT> ExternalPythonKeyedCoProcessOperator<OUT> createCoKeyedProcessOperator(String functionContent, RowTypeInfo inputTypeInfo1, RowTypeInfo inputTypeInfo2, TypeInformation<OUT> outputTypeInfo) {
        return new ExternalPythonKeyedCoProcessOperator(new Configuration(), new DataStreamPythonFunctionInfo((PythonFunction)new DataStreamPythonFunction(functionContent.getBytes(), null), -1), (TypeInformation)inputTypeInfo1, (TypeInformation)inputTypeInfo2, outputTypeInfo);
    }

    private static <IN, OUT> ExternalPythonProcessOperator<IN, OUT> createProcessOperator(String functionContent, TypeInformation<IN> inputTypeInfo, TypeInformation<OUT> outputTypeInfo) {
        return new ExternalPythonProcessOperator(new Configuration(), new DataStreamPythonFunctionInfo((PythonFunction)new DataStreamPythonFunction(functionContent.getBytes(), null), -1), inputTypeInfo, outputTypeInfo);
    }
}

