package org.apache.flink.table.runtime.operators.deduplicate.window;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Harness tests for the window deduplicate operator built by
 * {@link RowTimeWindowDeduplicateOperatorBuilder}.
 *
 * <p>Input rows have three fields: a string key ({@code f0}), a BIGINT that the operator is told
 * to use as row time ({@code f1}, index {@link #ROWTIME_INDEX}) and a BIGINT that it is told to
 * use as window end ({@code f2}, index {@link #WINDOW_END_INDEX}). Every test runs once per time
 * zone returned by {@link #runMode()}.
 *
 * <p>Each test feeds the same out-of-order input, advances the watermark window by window,
 * snapshots the operator, restores it into a fresh harness, and finally checks that a record
 * arriving behind the watermark is counted as a dropped late record.
 */
@RunWith(Parameterized.class)
public class RowTimeWindowDeduplicateOperatorTest {

    /** Position of the row-time field inside an input row. */
    private static final int ROWTIME_INDEX = 1;

    /** Position of the window-end field inside an input row. */
    private static final int WINDOW_END_INDEX = 2;

    private static final RowType INPUT_ROW_TYPE =
            new RowType(
                    Arrays.asList(
                            new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
                            new RowType.RowField("f1", new BigIntType()),
                            new RowType.RowField("f2", new BigIntType())));

    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);

    /** Keys every row by its first field ({@code f0}). */
    private static final RowDataKeySelector KEY_SELECTOR =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));

    // toSerializer() is declared to return the general TypeSerializer, while the builder's
    // keySerializer(...) is fed a PagedTypeSerializer, hence the explicit downcast.
    private static final PagedTypeSerializer<RowData> KEY_SER =
            (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();

    private static final LogicalType[] OUTPUT_TYPES = {
        new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType()
    };

    private static final TypeSerializer<RowData> OUT_SERIALIZER =
            new RowDataSerializer(OUTPUT_TYPES);

    /** Compares harness output after sorting on field 0 (the key). */
    private static final RowDataHarnessAssertor ASSERTER =
            new RowDataHarnessAssertor(
                    OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));

    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");

    /** Time zone handed to the operator and used to convert the window-end values of the rows. */
    private final ZoneId shiftTimeZone;

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param zoneId time zone under test, supplied by {@link #runMode()}
     */
    public RowTimeWindowDeduplicateOperatorTest(ZoneId zoneId) {
        this.shiftTimeZone = zoneId;
    }

    /**
     * Supplies the parameter sets of the suite: one run with UTC and one with Asia/Shanghai.
     *
     * @return one single-element argument array per time zone
     */
    @Parameterized.Parameters(name = "TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
    }

    /**
     * Wraps the given operator in a keyed one-input test harness that uses {@link #KEY_SELECTOR}.
     *
     * @param slicingWindowOperator operator under test
     * @return a harness that has not been set up or opened yet
     * @throws Exception if the harness cannot be constructed
     */
    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            SlicingWindowOperator<RowData, ?> slicingWindowOperator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                slicingWindowOperator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
    }

    /**
     * Keep-first-row mode: for every key and window the emitted row is the one with the smallest
     * row time among the rows fed by {@link #processOutOfOrderInput}.
     */
    @Test
    public void testRowTimeWindowDeduplicateKeepFirstRow() throws Exception {
        SlicingWindowOperator<RowData, ?> operator = createOperator(false);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        OperatorSubtaskState snapshot;

        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(operator)) {
            harness.setup(OUT_SERIALIZER);
            harness.open();
            processOutOfOrderInput(harness);

            harness.processWatermark(new Watermark(999L));
            expectRow(expected, "key1", 1L, 999L);
            expectRow(expected, "key2", 1L, 999L);
            expected.add(new Watermark(999L));
            assertOutput(expected, harness);

            harness.processWatermark(new Watermark(1999L));
            expectRow(expected, "key1", 1004L, 1999L);
            expectRow(expected, "key2", 1002L, 1999L);
            expected.add(new Watermark(1999L));
            assertOutput(expected, harness);

            // Snapshot before the harness is closed; the window ending at 3999 is still pending.
            harness.prepareSnapshotPreBarrier(0L);
            snapshot = harness.snapshot(0L, 0L);
        }
        // The restored harness starts with an empty output queue.
        expected.clear();

        try (OneInputStreamOperatorTestHarness<RowData, RowData> restored =
                createTestHarness(operator)) {
            restored.setup(OUT_SERIALIZER);
            restored.initializeState(snapshot);
            restored.open();

            // The pending window must be emitted from the restored state.
            restored.processWatermark(new Watermark(3999L));
            expectRow(expected, "key2", 3001L, 3999L);
            expected.add(new Watermark(3999L));
            assertOutput(expected, restored);

            assertLateRecordIsDropped(operator, restored, expected);
        }
    }

    /**
     * Keep-last-row mode: for every key and window the emitted row is the one with the largest
     * row time among the rows fed by {@link #processOutOfOrderInput}.
     */
    @Test
    public void testRowTimeWindowDeduplicateKeepLastRow() throws Exception {
        SlicingWindowOperator<RowData, ?> operator = createOperator(true);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        OperatorSubtaskState snapshot;

        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(operator)) {
            harness.setup(OUT_SERIALIZER);
            harness.open();
            processOutOfOrderInput(harness);

            harness.processWatermark(new Watermark(999L));
            expectRow(expected, "key1", 3L, 999L);
            expectRow(expected, "key2", 5L, 999L);
            expected.add(new Watermark(999L));
            assertOutput(expected, harness);

            harness.processWatermark(new Watermark(1999L));
            expectRow(expected, "key1", 1007L, 1999L);
            expectRow(expected, "key2", 1002L, 1999L);
            expected.add(new Watermark(1999L));
            assertOutput(expected, harness);

            // Snapshot before the harness is closed; the window ending at 3999 is still pending.
            harness.prepareSnapshotPreBarrier(0L);
            snapshot = harness.snapshot(0L, 0L);
        }
        // The restored harness starts with an empty output queue.
        expected.clear();

        try (OneInputStreamOperatorTestHarness<RowData, RowData> restored =
                createTestHarness(operator)) {
            restored.setup(OUT_SERIALIZER);
            restored.initializeState(snapshot);
            restored.open();

            // The pending window must be emitted from the restored state.
            restored.processWatermark(new Watermark(3999L));
            expectRow(expected, "key2", 3008L, 3999L);
            expected.add(new Watermark(3999L));
            assertOutput(expected, restored);

            assertLateRecordIsDropped(operator, restored, expected);
        }
    }

    /**
     * Builds the operator under test for the current time zone.
     *
     * @param keepLastRow value passed to the builder's {@code keepLastRow} option
     * @return the configured operator
     */
    private SlicingWindowOperator<RowData, ?> createOperator(boolean keepLastRow) {
        return RowTimeWindowDeduplicateOperatorBuilder.builder()
                .inputSerializer(INPUT_ROW_SER)
                .shiftTimeZone(shiftTimeZone)
                .keySerializer(KEY_SER)
                .keepLastRow(keepLastRow)
                .rowtimeIndex(ROWTIME_INDEX)
                .windowEndIndex(WINDOW_END_INDEX)
                .build();
    }

    /**
     * Feeds the input shared by both tests: rows for {@code key2} and {@code key1} whose row
     * times are deliberately not in ascending order, spread over the window ends 999, 1999 and
     * 3999 (only {@code key2} has rows for 3999).
     */
    private void processOutOfOrderInput(OneInputStreamOperatorTestHarness<RowData, RowData> harness)
            throws Exception {
        processRow(harness, "key2", 1L, 999L);
        processRow(harness, "key2", 4L, 999L);
        processRow(harness, "key2", 5L, 999L);
        processRow(harness, "key2", 3L, 999L);
        processRow(harness, "key2", 1002L, 1999L);
        processRow(harness, "key2", 3007L, 3999L);
        processRow(harness, "key2", 3008L, 3999L);
        processRow(harness, "key2", 3001L, 3999L);

        processRow(harness, "key1", 2L, 999L);
        processRow(harness, "key1", 1L, 999L);
        processRow(harness, "key1", 3L, 999L);
        processRow(harness, "key1", 3L, 999L);
        processRow(harness, "key1", 1004L, 1999L);
        processRow(harness, "key1", 1006L, 1999L);
        processRow(harness, "key1", 1007L, 1999L);
    }

    /**
     * Sends a record whose window (end 3500) lies behind the watermark already processed by the
     * caller (3999), then verifies that it produces no output row and that the operator's
     * late-record counter reads exactly one.
     */
    private void assertLateRecordIsDropped(
            SlicingWindowOperator<RowData, ?> operator,
            OneInputStreamOperatorTestHarness<RowData, RowData> harness,
            Collection<Object> expected)
            throws Exception {
        processRow(harness, "key2", 3001L, 3500L);

        harness.processWatermark(new Watermark(4999L));
        expected.add(new Watermark(4999L));
        assertOutput(expected, harness);

        Assertions.assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
    }

    /** Pushes one insert row {@code (key, rowtime, converted windowEnd)} into the harness. */
    private void processRow(
            OneInputStreamOperatorTestHarness<RowData, RowData> harness,
            String key,
            long rowtime,
            long windowEnd)
            throws Exception {
        harness.processElement(
                StreamRecordUtils.insertRecord(key, rowtime, shiftedWindowEnd(windowEnd)));
    }

    /** Appends one insert row {@code (key, rowtime, converted windowEnd)} to the expectation. */
    private void expectRow(Collection<Object> expected, String key, long rowtime, long windowEnd) {
        expected.add(StreamRecordUtils.insertRecord(key, rowtime, shiftedWindowEnd(windowEnd)));
    }

    /**
     * Converts a window-end value with {@link TimeWindowUtil#toUtcTimestampMills} using the time
     * zone of the current run, the same conversion applied to input and expected rows alike.
     */
    private long shiftedWindowEnd(long windowEnd) {
        return TimeWindowUtil.toUtcTimestampMills(windowEnd, shiftTimeZone);
    }

    /** Compares the harness output collected so far with the expectation, ignoring order. */
    private static void assertOutput(
            Collection<Object> expected,
            OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());
    }
}
