package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.class */
/**
 * Tests for {@link RowTimeRangeBoundedPrecedingFunction}: state cleanup after the watermark
 * advances, and the counter the function exposes via {@code getCounter()}.
 *
 * <p>Every test harness is managed with try-with-resources so the operator is closed even when
 * an assertion fails.
 */
public class RowTimeRangeBoundedPrecedingFunctionTest extends RowTimeOverWindowTestBase {

    /**
     * Feeds three records, advances the watermark to 1000 and then 4000, and asserts that the
     * keyed state backend reports zero key/value state entries afterwards.
     *
     * @throws Exception if the harness fails to open, process input, or close
     */
    @Test
    public void testStateCleanup() throws Exception {
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        new RowTimeRangeBoundedPrecedingFunction(
                                aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L, 2));

        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(operator)) {
            harness.open();

            // NOTE(review): the explicit cast is harmless if getKeyedStateBackend() already
            // returns AbstractKeyedStateBackend and required if it returns a wider type.
            AbstractKeyedStateBackend<?> stateBackend =
                    (AbstractKeyedStateBackend<?>) operator.getKeyedStateBackend();
            Assert.assertEquals(
                    "Initial state is not empty", 0L, stateBackend.numKeyValueStateEntries());

            processInitialRecords(harness);

            harness.processWatermark(new Watermark(1000L));
            harness.processWatermark(new Watermark(4000L));

            // Must be checked before the harness is closed, hence inside the try block.
            Assert.assertEquals(
                    "State has not been cleaned up", 0L, stateBackend.numKeyValueStateEntries());
        }
    }

    /**
     * Feeds three records, advances the watermark to 500, then sends one more record whose last
     * field (400) is below that watermark, and asserts the function's counter reads exactly 1.
     *
     * @throws Exception if the harness fails to open, process input, or close
     */
    @Test
    public void testLateRecordMetrics() throws Exception {
        RowTimeRangeBoundedPrecedingFunction function =
                new RowTimeRangeBoundedPrecedingFunction(
                        aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L, 2);

        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(new KeyedProcessOperator<>(function))) {
            harness.open();

            // The counter is fetched only after open(), matching the original call order.
            Counter counter = function.getCounter();

            processInitialRecords(harness);
            harness.processWatermark(new Watermark(500L));

            // Presumably treated as late because 400 < 500 (the current watermark) — the
            // assertion below expects exactly one counted record.
            harness.processElement(StreamRecordUtils.insertRecord("key", 1L, 400L));

            Assert.assertEquals(1L, counter.getCount());
        }
    }

    /**
     * Sends the three records shared by both tests: two with a last field of 100 and one with a
     * last field of 500, all under the key {@code "key"}.
     *
     * @param harness an opened harness to push the records into
     * @throws Exception if the harness rejects a record
     */
    private static void processInitialRecords(
            OneInputStreamOperatorTestHarness<RowData, RowData> harness) throws Exception {
        harness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        harness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        harness.processElement(StreamRecordUtils.insertRecord("key", 1L, 500L));
    }
}
