/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.utils;

import com.google.protobuf.GeneratedMessageV3;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;

public class PassThroughPythonAggregateFunctionRunner
extends BeamTablePythonFunctionRunner {
    private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;
    private final List<byte[]> buffer = new LinkedList<byte[]>();
    private final ArrowSerializer arrowSerializer;
    private final boolean isBatchOverWindow;
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;

    public PassThroughPythonAggregateFunctionRunner(String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, String functionUrn, FlinkFnApi.UserDefinedFunctions userDefinedFunctions, FlinkMetricContainer flinkMetricContainer, boolean isBatchOverWindow) {
        super(taskName, environmentManager, functionUrn, (GeneratedMessageV3)userDefinedFunctions, flinkMetricContainer, null, null, null, null, 0.0, ProtoUtils.createArrowTypeCoderInfoDescriptorProto((RowType)inputType, (FlinkFnApi.CoderInfoDescriptor.Mode)FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, (boolean)false), ProtoUtils.createArrowTypeCoderInfoDescriptorProto((RowType)outputType, (FlinkFnApi.CoderInfoDescriptor.Mode)FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, (boolean)false));
        this.isBatchOverWindow = isBatchOverWindow;
        this.arrowSerializer = new ArrowSerializer(inputType, outputType);
    }

    public void open(ReadableConfig config) throws Exception {
        super.open(config);
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper((InputStream)this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.arrowSerializer.open((InputStream)this.bais, (OutputStream)this.baos);
    }

    protected void startBundle() {
        super.startBundle();
        this.mainInputReceiver = input -> {
            byte[] data = (byte[])input.getValue();
            this.bais.setBuffer(data, 0, data.length);
            if (this.isBatchOverWindow) {
                int windowSize = windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                ArrayList<Integer> lowerBoundarys = new ArrayList<Integer>();
                for (int i = 0; i < windowSize; ++i) {
                    int windowLength = windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                    for (int j = 0; j < windowLength; ++j) {
                        if (j % 2 == 0) {
                            lowerBoundarys.add(windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper));
                            continue;
                        }
                        windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                    }
                }
                this.arrowSerializer.load();
                for (Integer lowerBoundary : lowerBoundarys) {
                    RowData firstData = this.arrowSerializer.read(lowerBoundary.intValue());
                    this.arrowSerializer.write(firstData);
                }
                this.arrowSerializer.resetReader();
            } else {
                this.arrowSerializer.load();
                this.arrowSerializer.write(this.arrowSerializer.read(0));
                this.arrowSerializer.resetReader();
            }
            this.arrowSerializer.finishCurrentBatch();
            this.buffer.add(this.baos.toByteArray());
            this.baos.reset();
            this.arrowSerializer.resetWriter();
        };
    }

    public void flush() throws Exception {
        super.flush();
        this.resultBuffer.addAll(this.buffer.stream().map(b -> Tuple2.of((Object)"", (Object)b)).collect(Collectors.toList()));
        this.buffer.clear();
    }

    public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) {
        return PythonTestUtils.createMockJobBundleFactory();
    }
}

