/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.aggregate;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowReference;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest;
import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;

/**
 * Tests that drive a {@link PassThroughPythonStreamGroupWindowAggregateOperator} through a
 * {@link OneInputStreamOperatorTestHarness}.
 *
 * <p>The operator under test (see {@link #getTestOperator(Configuration)}) is configured with an
 * event-time sliding window of 10000 ms that slides every 5000 ms. Input rows have the layout
 * {@code (f1, f2, f3, rowTime)}; expected output rows have the layout
 * {@code (f1, f2, windowStart, windowEnd)}.
 */
class PythonStreamGroupWindowAggregateOperatorTest
        extends AbstractPythonStreamAggregateOperatorTest {

    /** Zone id passed as the last constructor argument of the operator under test. */
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");

    /** Failure message used by every output comparison in this class. */
    private static final String OUTPUT_MISMATCH_MESSAGE = "Output was not correct.";

    /**
     * Feeds six records (two of them for key {@code c3}, the second one built with
     * {@code accumulateMsg == false}), advances the watermark to {@link Long#MAX_VALUE}, closes
     * the harness and compares the complete output.
     */
    @Test
    void testGroupWindowAggregateFunction() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                getTestHarness(new Configuration());
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();
        processSharedInput(testHarness, initialTime);
        testHarness.processElement(newRecord(true, initialTime + 5L, "c3", "c8", 3L, 0L));
        testHarness.processElement(newRecord(false, initialTime + 6L, "c3", "c8", 3L, 0L));
        testHarness.processWatermark(Long.MAX_VALUE);
        // The harness is closed before the comparison, exactly as in the other tests' final step.
        testHarness.close();

        // For key c3 only state-cleanup records are expected, no window result records.
        expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        expectedOutput.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        expectedOutput.add(newStateCleanupRecord(-5000L, 5000L, "c3"));
        expectedOutput.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        expectedOutput.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        expectedOutput.add(newWindowRecord(0L, 10000L, "c1", 0L));
        expectedOutput.add(newStateCleanupRecord(0L, 10000L, "c1"));
        expectedOutput.add(newWindowRecord(0L, 10000L, "c2", 3L));
        expectedOutput.add(newStateCleanupRecord(0L, 10000L, "c2"));
        expectedOutput.add(newStateCleanupRecord(0L, 10000L, "c3"));
        addExpectedLaterWindows(expectedOutput);
        expectedOutput.add(new Watermark(Long.MAX_VALUE));

        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
    }

    /**
     * Uses a bundle size (10) larger than the number of records fed (4) and then calls
     * {@code prepareSnapshotPreBarrier}; the output for watermark 10000 is expected to be
     * available right after that call.
     */
    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();
        processSharedInput(testHarness, initialTime);
        testHarness.processWatermark(new Watermark(10000L));
        testHarness.prepareSnapshotPreBarrier(0L);

        addExpectedEarlierWindows(expectedOutput);
        expectedOutput.add(new Watermark(10000L));
        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

        testHarness.processWatermark(20000L);
        testHarness.close();

        addExpectedLaterWindows(expectedOutput);
        expectedOutput.add(new Watermark(20000L));
        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
    }

    /**
     * Uses a bundle size (4) equal to the number of records fed; the output for watermark 10000
     * is expected without any explicit snapshot or processing-time step.
     */
    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 4);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();
        processSharedInput(testHarness, initialTime);
        testHarness.processWatermark(new Watermark(10000L));

        addExpectedEarlierWindows(expectedOutput);
        expectedOutput.add(new Watermark(10000L));
        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

        testHarness.processWatermark(20000L);
        testHarness.close();

        addExpectedLaterWindows(expectedOutput);
        expectedOutput.add(new Watermark(20000L));
        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
    }

    /**
     * Uses a bundle size (10) larger than the number of records fed (4) together with a maximum
     * bundle time of 1000 ms, then sets the harness processing time to 1000; all window output
     * up to watermark 20000 is expected after that.
     */
    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.open();
        processSharedInput(testHarness, initialTime);
        testHarness.processWatermark(new Watermark(20000L));
        testHarness.setProcessingTime(1000L);

        addExpectedEarlierWindows(expectedOutput);
        addExpectedLaterWindows(expectedOutput);
        expectedOutput.add(new Watermark(20000L));
        assertOutputEquals(OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Sends the four input records that every test in this class starts with: three for key
     * {@code c1} (row times 0, 6000 and 10000) and one for key {@code c2} (row time 0).
     *
     * @param testHarness the opened harness that receives the records
     * @param initialTime base value; the records get the timestamps {@code initialTime + 1}
     *     through {@code initialTime + 4}
     */
    private void processSharedInput(
            OneInputStreamOperatorTestHarness<RowData, RowData> testHarness, long initialTime)
            throws Exception {
        testHarness.processElement(newRecord(true, initialTime + 1L, "c1", "c2", 0L, 0L));
        testHarness.processElement(newRecord(true, initialTime + 2L, "c1", "c4", 1L, 6000L));
        testHarness.processElement(newRecord(true, initialTime + 3L, "c1", "c6", 2L, 10000L));
        testHarness.processElement(newRecord(true, initialTime + 4L, "c2", "c8", 3L, 0L));
    }

    /**
     * Appends the expected window and state-cleanup records of the windows
     * {@code (-5000, 5000)} and {@code (0, 10000)} for the keys {@code c1} and {@code c2}.
     *
     * @param expectedOutput queue the records are appended to
     */
    private void addExpectedEarlierWindows(ConcurrentLinkedQueue<Object> expectedOutput) {
        expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
        expectedOutput.add(newStateCleanupRecord(-5000L, 5000L, "c1"));
        expectedOutput.add(newWindowRecord(-5000L, 5000L, "c2", 3L));
        expectedOutput.add(newStateCleanupRecord(-5000L, 5000L, "c2"));
        expectedOutput.add(newWindowRecord(0L, 10000L, "c2", 3L));
        expectedOutput.add(newStateCleanupRecord(0L, 10000L, "c2"));
        expectedOutput.add(newWindowRecord(0L, 10000L, "c1", 0L));
        expectedOutput.add(newStateCleanupRecord(0L, 10000L, "c1"));
    }

    /**
     * Appends the expected window and state-cleanup records of the windows
     * {@code (5000, 15000)} and {@code (10000, 20000)} for key {@code c1}.
     *
     * @param expectedOutput queue the records are appended to
     */
    private void addExpectedLaterWindows(ConcurrentLinkedQueue<Object> expectedOutput) {
        expectedOutput.add(newWindowRecord(5000L, 15000L, "c1", 1L));
        expectedOutput.add(newStateCleanupRecord(5000L, 15000L, "c1"));
        expectedOutput.add(newWindowRecord(10000L, 20000L, "c1", 2L));
        expectedOutput.add(newStateCleanupRecord(10000L, 20000L, "c1"));
    }

    /**
     * Builds an input record with an explicit record timestamp.
     *
     * @param accumulateMsg forwarded unchanged to {@code newRow}
     * @param timestamp timestamp attached to the stream record
     * @param fields field values of the row, in input-type order
     * @return the stream record wrapping the new row
     */
    private StreamRecord<RowData> newRecord(boolean accumulateMsg, long timestamp, Object... fields) {
        return new StreamRecord<>(newRow(accumulateMsg, fields), timestamp);
    }

    /**
     * Builds an expected output record: the given fields followed by the window start and the
     * window end as {@link TimestampData}. The record carries no timestamp.
     *
     * @param start window start in epoch milliseconds
     * @param end window end in epoch milliseconds
     * @param fields leading field values of the expected row
     * @return the stream record wrapping the expected row
     */
    private StreamRecord<RowData> newWindowRecord(long start, long end, Object... fields) {
        Object[] rowFields = new Object[fields.length + 2];
        System.arraycopy(fields, 0, rowFields, 0, fields.length);
        rowFields[rowFields.length - 2] = TimestampData.fromEpochMillis(start);
        rowFields[rowFields.length - 1] = TimestampData.fromEpochMillis(end);
        return new StreamRecord<>(newRow(true, rowFields));
    }

    /**
     * Builds the expected state-cleanup record for a key and window: a row whose first field is
     * the marker text {@code "state_cleanup_triggered: <key> : TimeWindow{start=..., end=...}"},
     * followed by {@code 0L}, the window start and the window end.
     *
     * @param start window start in epoch milliseconds
     * @param end window end in epoch milliseconds
     * @param key grouping key rendered into the marker text
     * @return the stream record wrapping the expected row
     */
    private StreamRecord<RowData> newStateCleanupRecord(long start, long end, Object key) {
        String field =
                String.format(
                        "state_cleanup_triggered: %s : TimeWindow{start=%s, end=%s}",
                        key, start, end);
        return new StreamRecord<>(
                newRow(
                        true,
                        field,
                        0L,
                        TimestampData.fromEpochMillis(start),
                        TimestampData.fromEpochMillis(end)));
    }

    /** Returns the logical types {@code STRING} and {@code BIGINT}, in that order. */
    @Override
    public LogicalType[] getOutputLogicalType() {
        return new LogicalType[] {
            DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType()
        };
    }

    /** Returns the input row type {@code (f1 VARCHAR, f2 VARCHAR, f3 BIGINT, rowTime BIGINT)}. */
    @Override
    public RowType getInputType() {
        return new RowType(
                Arrays.asList(
                        new RowType.RowField("f1", new VarCharType()),
                        new RowType.RowField("f2", new VarCharType()),
                        new RowType.RowField("f3", new BigIntType()),
                        new RowType.RowField("rowTime", new BigIntType())));
    }

    /**
     * Returns the output row type
     * {@code (f1 VARCHAR, f2 BIGINT, windowStart TIMESTAMP(3), windowEnd TIMESTAMP(3))}.
     */
    @Override
    public RowType getOutputType() {
        return new RowType(
                Arrays.asList(
                        new RowType.RowField("f1", new VarCharType()),
                        new RowType.RowField("f2", new BigIntType()),
                        new RowType.RowField("windowStart", new TimestampType(3)),
                        new RowType.RowField("windowEnd", new TimestampType(3))));
    }

    /**
     * Creates the operator under test: a {@link PassThroughPythonStreamGroupWindowAggregateOperator}
     * with an event-time {@link SlidingWindowAssigner} of size 10000 ms and slide 5000 ms, one
     * aggregate function reading input index 2, and the named window properties {@code start}
     * and {@code end}.
     *
     * @param config configuration handed to the operator
     * @return the configured operator
     */
    // The raw return type mirrors the original declaration of this override and is kept so the
    // signature stays unchanged; the suppression is limited to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    OneInputStreamOperator getTestOperator(Configuration config) {
        long size = 10000L;
        long slide = 5000L;
        SlidingWindowAssigner windowAssigner =
                SlidingWindowAssigner.of(Duration.ofMillis(size), Duration.ofMillis(slide))
                        .withEventTime();
        // NOTE(review): windowRef is never used; WindowStart/WindowEnd below receive null.
        // Kept as-is to preserve behaviour - confirm whether it should be passed or removed.
        WindowReference windowRef = new WindowReference("w$", new TimestampType(3));
        return new PassThroughPythonStreamGroupWindowAggregateOperator(
                config,
                getInputType(),
                getOutputType(),
                new PythonAggregateFunctionInfo[] {
                    new PythonAggregateFunctionInfo(
                            PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE,
                            new Integer[] {2},
                            -1,
                            false)
                },
                getGrouping(),
                -1,
                false,
                false,
                3,
                windowAssigner,
                FlinkFnApi.GroupWindow.WindowType.SLIDING_GROUP_WINDOW,
                true,
                true,
                size,
                slide,
                0L,
                0L,
                new NamedWindowProperty[] {
                    new NamedWindowProperty("start", new WindowStart(null)),
                    new NamedWindowProperty("end", new WindowEnd(null))
                },
                UTC_ZONE_ID);
    }
}

