/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.aggregate;

import java.io.IOException;
import java.io.OutputStream;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.ProctimeAttribute;
import org.apache.flink.table.runtime.groupwindow.RowtimeAttribute;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupWindowAggregateOperator;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
extends PythonStreamGroupWindowAggregateOperator<K, TimeWindow> {
    private final MockPythonWindowOperator<K> mockPythonWindowOperator = new MockPythonWindowOperator();
    private final int[] grouping;
    private final PythonAggregateFunctionInfo aggregateFunction;
    private FlinkFnApi.GroupWindow.WindowProperty[] namedProperties;
    private InternalTimerServiceImpl<K, TimeWindow> mockPythonInternalService;
    private Map<String, Map<TimeWindow, List<RowData>>> windowAccumulateData;
    private Map<String, Map<TimeWindow, List<RowData>>> windowRetractData;
    private transient UpdatableRowData reusePythonRowData;
    private transient UpdatableRowData reusePythonTimerRowData;
    private transient UpdatableRowData reusePythonTimerData;
    private transient LinkedBlockingQueue<Tuple2<String, byte[]>> resultBuffer;
    private Projection<RowData, BinaryRowData> groupKeyProjection;
    private Function<RowData, RowData> aggExtracter;
    private Function<TimeWindow, RowData> windowExtractor;
    private JoinedRowData reuseJoinedRow;
    private JoinedRowData windowAggResult;
    private transient ByteArrayOutputStreamWithPos windowBaos;
    private transient DataOutputViewStreamWrapper windowBaosWrapper;

    protected PassThroughPythonStreamGroupWindowAggregateOperator(Configuration config, RowType inputType, RowType outputType, PythonAggregateFunctionInfo[] aggregateFunctions, int[] grouping, int indexOfCountStar, boolean generateUpdateBefore, boolean countStarInserted, int inputTimeFieldIndex, WindowAssigner<TimeWindow> windowAssigner, FlinkFnApi.GroupWindow.WindowType windowType, boolean isRowTime, boolean isTimeWindow, long size, long slide, long gap, long allowedLateness, NamedWindowProperty[] namedProperties, ZoneId shiftTimeZone) {
        super(config, inputType, outputType, aggregateFunctions, new DataViewSpec[0][0], grouping, indexOfCountStar, generateUpdateBefore, countStarInserted, inputTimeFieldIndex, windowAssigner, windowType, isRowTime, isTimeWindow, size, slide, gap, allowedLateness, namedProperties, shiftTimeZone);
        this.aggregateFunction = aggregateFunctions[0];
        this.grouping = grouping;
        this.buildWindow(namedProperties);
    }

    public void open() throws Exception {
        super.open();
        this.windowBaos = new ByteArrayOutputStreamWithPos();
        this.windowBaosWrapper = new DataOutputViewStreamWrapper((OutputStream)this.windowBaos);
        this.reusePythonRowData = new UpdatableRowData((RowData)GenericRowData.of((Object[])new Object[]{(byte)0, null, null}), 3);
        this.reusePythonTimerRowData = new UpdatableRowData((RowData)GenericRowData.of((Object[])new Object[]{(byte)1, null, null}), 3);
        this.reusePythonTimerData = new UpdatableRowData((RowData)GenericRowData.of((Object[])new Object[]{0, null, null, null}), 4);
        this.reuseJoinedRow = new JoinedRowData();
        this.windowAggResult = new JoinedRowData();
        this.reusePythonTimerRowData.setField(2, (Object)this.reusePythonTimerData);
        this.windowAccumulateData = new HashMap<String, Map<TimeWindow, List<RowData>>>();
        this.windowRetractData = new HashMap<String, Map<TimeWindow, List<RowData>>>();
        this.mockPythonInternalService = (InternalTimerServiceImpl)this.getInternalTimerService("python-window-timers", this.windowSerializer, this.mockPythonWindowOperator);
        this.groupKeyProjection = this.createProjection("GroupKey", this.grouping);
        int inputFieldIndex = (Integer)this.aggregateFunction.getInputs()[0];
        this.aggExtracter = input -> {
            GenericRowData aggResult = new GenericRowData(1);
            aggResult.setField(0, (Object)input.getLong(inputFieldIndex));
            return aggResult;
        };
        this.windowExtractor = window -> {
            GenericRowData windowProperty = new GenericRowData(this.namedProperties.length);
            block6: for (int i = 0; i < this.namedProperties.length; ++i) {
                switch (this.namedProperties[i]) {
                    case WINDOW_START: {
                        windowProperty.setField(i, (Object)this.getShiftEpochMills(window.getStart()));
                        continue block6;
                    }
                    case WINDOW_END: {
                        windowProperty.setField(i, (Object)this.getShiftEpochMills(window.getEnd()));
                        continue block6;
                    }
                    case ROW_TIME_ATTRIBUTE: {
                        windowProperty.setField(i, (Object)this.getShiftEpochMills(window.getEnd() - 1L));
                        continue block6;
                    }
                    case PROC_TIME_ATTRIBUTE: {
                        windowProperty.setField(i, (Object)-1L);
                    }
                }
            }
            return windowProperty;
        };
    }

    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner(this.getRuntimeContext().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), this.userDefinedFunctionInputType, this.userDefinedFunctionOutputType, "flink:transform:stream_group_window_aggregate:v1", this.getUserDefinedFunctionsProto(), PythonTestUtils.createMockFlinkMetricContainer(), this.getKeyedStateBackend(), this.getKeySerializer(), this);
    }

    private void buildWindow(NamedWindowProperty[] namedProperties) {
        this.namedProperties = new FlinkFnApi.GroupWindow.WindowProperty[namedProperties.length];
        for (int i = 0; i < namedProperties.length; ++i) {
            WindowProperty namedProperty = namedProperties[i].getProperty();
            if (namedProperty instanceof WindowStart) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
                continue;
            }
            if (namedProperty instanceof WindowEnd) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
                continue;
            }
            if (namedProperty instanceof RowtimeAttribute) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.ROW_TIME_ATTRIBUTE;
                continue;
            }
            if (namedProperty instanceof ProctimeAttribute) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.PROC_TIME_ATTRIBUTE;
                continue;
            }
            throw new RuntimeException("Unexpected property " + namedProperty);
        }
    }

    public void processPythonElement(byte[] inputBytes) {
        try {
            RowData input = (RowData)this.udfInputTypeSerializer.deserialize((DataInputView)new DataInputDeserializer(inputBytes));
            if (input.getByte(0) == 0) {
                RowData inputRow = input.getRow(1, this.inputType.getFieldCount());
                BinaryRowData key = ((BinaryRowData)this.groupKeyProjection.apply(inputRow)).copy();
                Map curKeyWindowAccumulateData = this.windowAccumulateData.computeIfAbsent(key.getString(0).toString(), k -> new HashMap());
                Map curKeyWindowRetractData = this.windowRetractData.computeIfAbsent(key.getString(0).toString(), k -> new HashMap());
                long watermark = input.getLong(3);
                this.mockPythonInternalService.advanceWatermark(watermark);
                long timestamp = inputRow.getLong(this.inputTimeFieldIndex);
                timestamp = TimeWindowUtil.toUtcTimestampMills((long)timestamp, (ZoneId)this.shiftTimeZone);
                Collection elementWindows = this.windowAssigner.assignWindows(inputRow, timestamp);
                for (TimeWindow window : elementWindows) {
                    List currentWindowDatas;
                    if (RowDataUtil.isAccumulateMsg((RowData)inputRow)) {
                        currentWindowDatas = curKeyWindowAccumulateData.computeIfAbsent(window, k -> new LinkedList());
                        currentWindowDatas.add(inputRow);
                        continue;
                    }
                    currentWindowDatas = curKeyWindowRetractData.computeIfAbsent(window, k -> new LinkedList());
                    currentWindowDatas.add(inputRow);
                }
                ArrayList<TimeWindow> actualWindows = new ArrayList<TimeWindow>(elementWindows.size());
                for (TimeWindow window : elementWindows) {
                    if (this.isWindowLate(window)) continue;
                    actualWindows.add(window);
                }
                for (TimeWindow window : actualWindows) {
                    boolean triggerResult = this.onElement(key, window);
                    if (triggerResult) {
                        this.triggerWindowProcess((RowData)key, window);
                    }
                    this.registerCleanupTimer((RowData)key, window);
                }
            } else {
                RowData timerData = input.getRow(4, 3);
                long timestamp = input.getLong(2);
                RowData key = timerData.getRow(1, this.getKeyType().getFieldCount());
                byte[] encodedNamespace = timerData.getBinary(2);
                this.bais.setBuffer(encodedNamespace, 0, encodedNamespace.length);
                TimeWindow window = (TimeWindow)this.windowSerializer.deserialize((DataInputView)this.baisWrapper);
                if (timestamp == window.maxTimestamp()) {
                    this.triggerWindowProcess(key, window);
                }
                this.cleanWindowIfNeeded(key, window, timestamp);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void setResultBuffer(LinkedBlockingQueue<Tuple2<String, byte[]>> resultBuffer) {
        this.resultBuffer = resultBuffer;
    }

    private boolean isWindowLate(TimeWindow window) {
        return this.windowAssigner.isEventTime() && this.cleanupTime(window) <= this.mockPythonInternalService.currentWatermark();
    }

    private long cleanupTime(TimeWindow window) {
        long windowMaxTs = TimeWindowUtil.toEpochMillsForTimer((long)window.maxTimestamp(), (ZoneId)this.shiftTimeZone);
        if (this.windowAssigner.isEventTime()) {
            long cleanupTime = Math.max(0L, windowMaxTs + this.allowedLateness);
            return cleanupTime >= windowMaxTs ? cleanupTime : Long.MAX_VALUE;
        }
        return Math.max(0L, windowMaxTs);
    }

    private boolean onElement(BinaryRowData key, TimeWindow window) throws IOException {
        if (window.maxTimestamp() <= this.mockPythonInternalService.currentWatermark()) {
            return true;
        }
        if (this.windowAssigner.isEventTime()) {
            this.registerEventTimeTimer((RowData)key, window);
        } else {
            this.registerProcessingTimeTimer((RowData)key, window);
        }
        return false;
    }

    private void triggerWindowProcess(RowData key, TimeWindow window) throws Exception {
        DataOutputSerializer output = new DataOutputSerializer(1);
        Iterable currentWindowAccumulateData = this.windowAccumulateData.get(key.getString(0).toString()).get(window);
        Iterable currentWindowRetractData = this.windowRetractData.get(key.getString(0).toString()).get(window);
        if (currentWindowAccumulateData != null) {
            for (RowData accumulateData : currentWindowAccumulateData) {
                if (this.hasRetractData(accumulateData, currentWindowRetractData)) continue;
                RowData aggResult = this.aggExtracter.apply(accumulateData);
                RowData windowProperty = this.windowExtractor.apply(window);
                this.windowAggResult.replace(key, aggResult);
                this.reuseJoinedRow.replace((RowData)this.windowAggResult, windowProperty);
                this.reusePythonRowData.setField(1, (Object)this.reuseJoinedRow);
                this.udfOutputTypeSerializer.serialize((Object)this.reusePythonRowData, (DataOutputView)output);
                this.resultBuffer.add((Tuple2<String, byte[]>)Tuple2.of((Object)"", (Object)output.getCopyOfBuffer()));
                break;
            }
        }
    }

    private boolean hasRetractData(RowData accumulateData, Iterable<RowData> currentWindowRetractData) {
        if (currentWindowRetractData != null) {
            for (RowData retractData : currentWindowRetractData) {
                if (retractData.getRowKind() == RowKind.UPDATE_BEFORE) {
                    retractData.setRowKind(RowKind.UPDATE_AFTER);
                } else {
                    retractData.setRowKind(RowKind.INSERT);
                }
                if (!accumulateData.equals(retractData)) continue;
                return true;
            }
        }
        return false;
    }

    private Projection<RowData, BinaryRowData> createProjection(String name, int[] fields) {
        RowType forwardedFieldType = new RowType(Arrays.stream(fields).mapToObj(i -> (RowType.RowField)this.inputType.getFields().get(i)).collect(Collectors.toList()));
        GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection((CodeGeneratorContext)new CodeGeneratorContext((ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader()), (String)name, (RowType)this.inputType, (RowType)forwardedFieldType, (int[])fields);
        return (Projection)generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
    }

    private void registerCleanupTimer(RowData key, TimeWindow window) throws IOException {
        long cleanupTime = this.cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.registerEventTimeTimer(key, window);
        } else {
            this.registerProcessingTimeTimer(key, window);
        }
    }

    private void registerEventTimeTimer(RowData key, TimeWindow window) throws IOException {
        this.emitTimerData(key, window, (byte)0);
    }

    private void deleteEventTimeTimer(RowData key, TimeWindow window) throws IOException {
        this.emitTimerData(key, window, (byte)2);
    }

    private void registerProcessingTimeTimer(RowData key, TimeWindow window) throws IOException {
        this.emitTimerData(key, window, (byte)1);
    }

    private void deleteProcessingTimeTimer(RowData key, TimeWindow window) throws IOException {
        this.emitTimerData(key, window, (byte)3);
    }

    private void emitTimerData(RowData key, TimeWindow window, byte timerOperand) throws IOException {
        this.reusePythonTimerData.setByte(0, timerOperand);
        this.reusePythonTimerData.setField(1, (Object)key);
        this.reusePythonTimerData.setLong(2, window.maxTimestamp());
        this.windowSerializer.serialize((Object)window, (DataOutputView)this.windowBaosWrapper);
        this.reusePythonTimerData.setField(3, (Object)this.windowBaos.toByteArray());
        this.windowBaos.reset();
        DataOutputSerializer output = new DataOutputSerializer(1);
        this.udfOutputTypeSerializer.serialize((Object)this.reusePythonTimerRowData, (DataOutputView)output);
        this.resultBuffer.add((Tuple2<String, byte[]>)Tuple2.of((Object)"", (Object)output.getCopyOfBuffer()));
    }

    private void cleanWindowIfNeeded(RowData key, TimeWindow window, long currentTime) throws IOException {
        if (currentTime == this.cleanupTime(window)) {
            DataOutputSerializer output = new DataOutputSerializer(1);
            RowData windowProperty = this.windowExtractor.apply(window);
            this.windowAggResult.replace((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)("state_cleanup_triggered: " + key.getString(0).toString() + " : " + window))}), (RowData)GenericRowData.of((Object[])new Object[]{0L}));
            this.reuseJoinedRow.replace((RowData)this.windowAggResult, windowProperty);
            this.reusePythonRowData.setField(1, (Object)this.reuseJoinedRow);
            this.udfOutputTypeSerializer.serialize((Object)this.reusePythonRowData, (DataOutputView)output);
            this.resultBuffer.add((Tuple2<String, byte[]>)Tuple2.of((Object)"", (Object)output.getCopyOfBuffer()));
            if (this.windowAssigner.isEventTime()) {
                this.deleteEventTimeTimer(key, window);
            } else {
                this.deleteProcessingTimeTimer(key, window);
            }
        }
    }

    private static class MockPythonWindowOperator<K>
    implements Triggerable<K, TimeWindow> {
        MockPythonWindowOperator() {
        }

        public void onEventTime(InternalTimer<K, TimeWindow> timer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<K, TimeWindow> timer) throws Exception {
        }
    }
}

