/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.operators.base;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Operator base for a co-group whose user function receives each input as one complete iterable.
 *
 * <p>In the collection-based execution implemented by {@link #executeOnCollections}, each input
 * list is sorted ascending on its key columns and the two sorted inputs are handed to the
 * {@link CoGroupFunction} in a single {@code coGroup} call.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type of the output
 * @param <FT> type of the user-defined co-group function
 */
@Internal
public class CoGroupRawOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>>
        extends DualInputOperator<IN1, IN2, OUT, FT> {

    /** Group order for input 0; {@code null} until set. Only stored and returned by this class. */
    private Ordering groupOrder1;

    /** Group order for input 1; {@code null} until set. Only stored and returned by this class. */
    private Ordering groupOrder2;

    /** Combinable flag for the first input. Only stored and returned by this class. */
    private boolean combinableFirst = false;

    /** Combinable flag for the second input. Only stored and returned by this class. */
    private boolean combinableSecond = false;

    /**
     * Creates the operator from an already wrapped user function.
     *
     * @param udf wrapper around the user-defined function
     * @param operatorInfo type information for both inputs and the output
     * @param keyPositions1 key columns of the first input
     * @param keyPositions2 key columns of the second input
     * @param name name of the operator
     */
    public CoGroupRawOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
        super(udf, operatorInfo, keyPositions1, keyPositions2, name);
    }

    /**
     * Creates the operator from a user function instance, wrapping it in a
     * {@link UserCodeObjectWrapper}.
     *
     * @param udf the user-defined function object
     * @param operatorInfo type information for both inputs and the output
     * @param keyPositions1 key columns of the first input
     * @param keyPositions2 key columns of the second input
     * @param name name of the operator
     */
    public CoGroupRawOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
        this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
    }

    /**
     * Creates the operator from a user function class, wrapping it in a
     * {@link UserCodeClassWrapper}.
     *
     * @param udf the class of the user-defined function
     * @param operatorInfo type information for both inputs and the output
     * @param keyPositions1 key columns of the first input
     * @param keyPositions2 key columns of the second input
     * @param name name of the operator
     */
    public CoGroupRawOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
        this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
    }

    /**
     * Sets the group order for one of the two inputs.
     *
     * @param inputNum {@code 0} for the first input, {@code 1} for the second
     * @param order the order to store
     * @throws IndexOutOfBoundsException if {@code inputNum} is neither 0 nor 1
     */
    public void setGroupOrder(int inputNum, Ordering order) {
        if (inputNum == 0) {
            this.groupOrder1 = order;
        } else if (inputNum == 1) {
            this.groupOrder2 = order;
        } else {
            throw new IndexOutOfBoundsException("Input number must be 0 or 1, but was " + inputNum);
        }
    }

    /**
     * Sets the group order for the first input.
     *
     * @param order the order to store
     */
    public void setGroupOrderForInputOne(Ordering order) {
        this.setGroupOrder(0, order);
    }

    /**
     * Sets the group order for the second input.
     *
     * @param order the order to store
     */
    public void setGroupOrderForInputTwo(Ordering order) {
        this.setGroupOrder(1, order);
    }

    /**
     * Returns the group order stored for one of the two inputs.
     *
     * @param inputNum {@code 0} for the first input, {@code 1} for the second
     * @return the stored order, or {@code null} if none was set
     * @throws IndexOutOfBoundsException if {@code inputNum} is neither 0 nor 1
     */
    public Ordering getGroupOrder(int inputNum) {
        if (inputNum == 0) {
            return this.groupOrder1;
        }
        if (inputNum == 1) {
            return this.groupOrder2;
        }
        throw new IndexOutOfBoundsException("Input number must be 0 or 1, but was " + inputNum);
    }

    /**
     * Returns the group order stored for the first input.
     *
     * @return the stored order, or {@code null} if none was set
     */
    public Ordering getGroupOrderForInputOne() {
        return this.getGroupOrder(0);
    }

    /**
     * Returns the group order stored for the second input.
     *
     * @return the stored order, or {@code null} if none was set
     */
    public Ordering getGroupOrderForInputTwo() {
        return this.getGroupOrder(1);
    }

    /**
     * Returns the combinable flag of the first input.
     *
     * @return the stored flag; {@code false} unless changed
     */
    public boolean isCombinableFirst() {
        return this.combinableFirst;
    }

    /**
     * Sets the combinable flag of the first input.
     *
     * @param combinableFirst the flag value to store
     */
    public void setCombinableFirst(boolean combinableFirst) {
        this.combinableFirst = combinableFirst;
    }

    /**
     * Returns the combinable flag of the second input.
     *
     * @return the stored flag; {@code false} unless changed
     */
    public boolean isCombinableSecond() {
        return this.combinableSecond;
    }

    /**
     * Sets the combinable flag of the second input.
     *
     * @param combinableSecond the flag value to store
     */
    public void setCombinableSecond(boolean combinableSecond) {
        this.combinableSecond = combinableSecond;
    }

    /**
     * Runs the co-group on in-memory lists.
     *
     * <p>Both input lists are sorted <em>in place</em>, ascending on their key columns, and then
     * passed as complete iterables to one {@code coGroup} call. Elements handed to the function
     * and elements collected into the result are serializer copies.
     *
     * @param input1 elements of the first input; reordered by this call
     * @param input2 elements of the second input; reordered by this call
     * @param ctx runtime context handed to the user function
     * @param executionConfig configuration used to create serializers and comparators
     * @return the elements emitted by the user function
     * @throws InvalidProgramException if one of the input types is not a {@link CompositeType}
     * @throws Exception if the user function fails while opening, running, or closing
     */
    @Override
    protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
        // The explicit cast mirrors the original code; it is harmless if the accessor already
        // returns the binary variant.
        @SuppressWarnings("unchecked")
        BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo =
                (BinaryOperatorInformation<IN1, IN2, OUT>) this.getOperatorInfo();

        TypeInformation<IN1> inputType1 = operatorInfo.getFirstInputType();
        TypeInformation<IN2> inputType2 = operatorInfo.getSecondInputType();

        int[] inputKeys1 = this.getKeyColumns(0);
        int[] inputKeys2 = this.getKeyColumns(1);

        boolean[] inputSortDirections1 = ascending(inputKeys1.length);
        boolean[] inputSortDirections2 = ascending(inputKeys2.length);

        TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(executionConfig);
        TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(executionConfig);

        // Both comparators are built before either input is sorted, so an invalid input type
        // is reported without having reordered the caller's lists.
        TypeComparator<IN1> inputComparator1 = this.getTypeComparator(executionConfig, inputType1, inputKeys1, inputSortDirections1);
        TypeComparator<IN2> inputComparator2 = this.getTypeComparator(executionConfig, inputType2, inputKeys2, inputSortDirections2);

        SimpleListIterable<IN1> iterator1 = new SimpleListIterable<IN1>(input1, inputComparator1, inputSerializer1);
        SimpleListIterable<IN2> iterator2 = new SimpleListIterable<IN2>(input2, inputComparator2, inputSerializer2);

        CoGroupFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
        FunctionUtils.setFunctionRuntimeContext(function, ctx);
        FunctionUtils.openFunction(function, this.parameters);

        List<OUT> result = new ArrayList<OUT>();
        try {
            CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, operatorInfo.getOutputType().createSerializer(executionConfig));
            function.coGroup(iterator1, iterator2, resultCollector);
        } finally {
            // Close the function even when coGroup fails, so an opened function is never leaked.
            FunctionUtils.closeFunction(function);
        }
        return result;
    }

    /**
     * Builds a sort-direction array of the given length with every entry set to ascending.
     *
     * @param length number of key fields
     * @return a new array filled with {@code true}
     */
    private static boolean[] ascending(int length) {
        boolean[] directions = new boolean[length];
        Arrays.fill(directions, true);
        return directions;
    }

    /**
     * Creates a comparator over the given key fields of a composite input type.
     *
     * @param executionConfig configuration passed to the comparator factory
     * @param inputType type information of the input; must be a {@link CompositeType}
     * @param inputKeys key field positions to compare on
     * @param inputSortDirections sort direction per key field
     * @param <T> element type of the input
     * @return a comparator created by the composite type with field offset 0
     * @throws InvalidProgramException if {@code inputType} is not a {@link CompositeType}
     */
    private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
        if (!(inputType instanceof CompositeType)) {
            throw new InvalidProgramException("Input types of coGroup must be composite types.");
        }
        return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
    }

    /**
     * An {@link Iterable} over a list that is sorted once at construction time and whose
     * iterators return serializer copies of the elements.
     *
     * @param <IN> element type
     */
    public static class SimpleListIterable<IN> implements Iterable<IN> {

        /** The backing list; this is the caller's list, sorted in place by the constructor. */
        private final List<IN> values;

        /** Serializer used to copy every element returned by an iterator. */
        private final TypeSerializer<IN> serializer;

        /**
         * Wraps the given list and sorts it in place with the given comparator.
         *
         * @param values the list to iterate over; it is reordered, not copied
         * @param comparator defines the sort order
         * @param serializer used to copy elements handed out by iterators
         * @throws IOException declared for compatibility; nothing in this constructor throws it
         */
        public SimpleListIterable(List<IN> values, final TypeComparator<IN> comparator, TypeSerializer<IN> serializer) throws IOException {
            this.values = values;
            this.serializer = serializer;
            Collections.sort(values, new Comparator<IN>() {
                @Override
                public int compare(IN o1, IN o2) {
                    return comparator.compare(o1, o2);
                }
            });
        }

        /**
         * Returns a fresh iterator positioned at the first element of the sorted list.
         *
         * @return a new iterator over the sorted elements
         */
        @Override
        public Iterator<IN> iterator() {
            return new SimpleListIterator<IN>(this.values, this.serializer);
        }

        /**
         * Index-based iterator that returns a serializer copy of each list element.
         *
         * @param <E> element type
         */
        protected class SimpleListIterator<E> implements Iterator<E> {
            private final List<E> values;
            private final TypeSerializer<E> serializer;

            /** Index of the element the next call to {@link #next()} returns. */
            private int pos = 0;

            /**
             * Creates an iterator over the given list.
             *
             * @param values the list to iterate over
             * @param serializer used to copy each returned element
             */
            public SimpleListIterator(List<E> values, TypeSerializer<E> serializer) {
                this.values = values;
                this.serializer = serializer;
            }

            @Override
            public boolean hasNext() {
                return this.pos < this.values.size();
            }

            /**
             * Returns a copy of the next element and advances the iterator.
             *
             * @return a serializer copy of the next element
             * @throws NoSuchElementException if the iteration has no more elements
             */
            @Override
            public E next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                E current = this.values.get(this.pos++);
                return this.serializer.copy(current);
            }

            /**
             * Does nothing; the backing list is left unchanged.
             *
             * <p>NOTE(review): kept as a silent no-op so existing callers are not broken;
             * consider throwing {@link UnsupportedOperationException} instead.
             */
            @Override
            public void remove() {
            }
        }
    }
}

