package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.types.Row;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.class */
public class CommonExecSinkITCase extends AbstractTestBase {
    // Execution environment used by every test method; (re)assigned in before().
    private StreamExecutionEnvironment env;

    // JUnit rule from which the tests obtain SharedReference handles (via add(...)) to the
    // lists that the test sinks write into.
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$RecordWriter.class */
    public static class RecordWriter extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<RowData>> rows;

        private RecordWriter(SharedReference<List<RowData>> sharedReference) {
            this.rows = sharedReference;
        }

        public void write(RowData rowData, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.rows, rowData);
            super.write(rowData, context);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TestSource.class */
    private static class TestSource implements SourceFunction<RowData> {
        private final List<Row> rows;
        private final DynamicTableSource.DataStructureConverter converter;

        public TestSource(List<Row> list, DynamicTableSource.DataStructureConverter dataStructureConverter) {
            this.rows = list;
            this.converter = dataStructureConverter;
        }

        public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
            Stream<R> map = this.rows.stream().map(row -> {
                return (RowData) this.converter.toInternal(row);
            });
            sourceContext.getClass();
            map.forEach((v1) -> {
                r1.collect(v1);
            });
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TestTimestampWriter.class */
    private static class TestTimestampWriter extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<Long>> timestamps;

        private TestTimestampWriter(SharedReference<List<Long>> sharedReference) {
            this.timestamps = sharedReference;
        }

        public void write(RowData rowData, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.timestamps, context.timestamp());
            super.write(rowData, context);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TimestampTestSource.class */
    private static class TimestampTestSource extends TableFactoryHarness.ScanSourceBase {
        private final List<Row> rows;

        private TimestampTestSource(List<Row> list) {
            super(false);
            this.rows = list;
        }

        @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.ScanSourceBase
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
            return SourceFunctionProvider.of(new TestSource(this.rows, scanContext.createDataStructureConverter(getFactoryContext().getCatalogTable().getResolvedSchema().toPhysicalRowDataType())), true);
        }
    }

    /** Creates a fresh execution environment with parallelism 4 before each test. */
    @Before
    public void before() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);
        this.env = environment;
    }

    /**
     * Verifies that, for a table with a watermark on {@code ts}, the plan contains the
     * StreamRecordTimestampInserter and that a {@link SinkProvider}-based sink observes the
     * rowtime of every row as the record timestamp.
     *
     * @throws ExecutionException if the INSERT job fails
     * @throws InterruptedException if waiting for the INSERT job is interrupted
     */
    @Test
    public void testStreamRecordTimestampInserterSinkRuntimeProvider()
            throws ExecutionException, InterruptedException {
        final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(this.env);
        final SharedReference<List<Long>> timestamps = this.sharedObjects.add(new ArrayList<>());
        final List<Row> rows =
                Arrays.asList(
                        Row.of(1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")),
                        Row.of(2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")),
                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));

        tableEnv.createTable(
                "T1",
                TableFactoryHarness.newBuilder()
                        .schema(schemaStreamRecordTimestampInserter(true))
                        .source(new TimestampTestSource(rows))
                        .sink(
                                new TableFactoryHarness.SinkBase() {
                                    // Must carry the base method's real name; the decompiler's
                                    // synthetic "mo1061..." name does not override anything.
                                    @Override
                                    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
                                            DynamicTableSink.Context context) {
                                        return SinkProvider.of(
                                                TestSink.newBuilder()
                                                        .setWriter(new TestTimestampWriter(timestamps))
                                                        .setCommittableSerializer(
                                                                TestSink.StringCommittableSerializer.INSTANCE)
                                                        .build());
                                    }
                                })
                        .build());

        assertPlan(tableEnv, sqlStmt, true);
        tableEnv.executeSql(sqlStmt).await();
        // Timestamps are compared position by position against the input rows.
        assertTimestampResults(timestamps, rows);
    }

    /**
     * Verifies that, for a table with a watermark on {@code ts}, the plan contains the
     * StreamRecordTimestampInserter and that a sink attached through a
     * {@link DataStreamSinkProvider} observes the rowtime of every row as the record timestamp.
     *
     * @throws ExecutionException if the INSERT job fails
     * @throws InterruptedException if waiting for the INSERT job is interrupted
     */
    @Test
    public void testStreamRecordTimestampInserterDataStreamSinkProvider()
            throws ExecutionException, InterruptedException {
        final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(this.env);
        final SharedReference<List<Long>> timestamps = this.sharedObjects.add(new ArrayList<>());
        // Rows are listed in ascending timestamp order so they line up with the sorted results.
        final List<Row> rows =
                Arrays.asList(
                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));

        tableEnv.createTable(
                "T1",
                TableFactoryHarness.newBuilder()
                        .schema(schemaStreamRecordTimestampInserter(true))
                        .source(new TimestampTestSource(rows))
                        .sink(
                                new TableFactoryHarness.SinkBase() {
                                    // Must carry the base method's real name; the decompiler's
                                    // synthetic "mo1061..." name does not override anything.
                                    @Override
                                    public DataStreamSinkProvider getSinkRuntimeProvider(
                                            DynamicTableSink.Context context) {
                                        return dataStream ->
                                                dataStream.addSink(
                                                        new SinkFunction<RowData>() {
                                                            @Override
                                                            public void invoke(
                                                                    RowData rowData,
                                                                    SinkFunction.Context sinkContext) {
                                                                CommonExecSinkITCase.addElement(
                                                                        timestamps,
                                                                        sinkContext.timestamp());
                                                            }
                                                        });
                                    }
                                })
                        .build());

        assertPlan(tableEnv, sqlStmt, true);
        tableEnv.executeSql(sqlStmt).await();
        // Sort before comparing: presumably the arrival order at this sink is not guaranteed
        // (the environment runs with parallelism 4) - TODO confirm.
        Collections.sort(timestamps.get());
        assertTimestampResults(timestamps, rows);
    }

    /**
     * Verifies that a sink attached through {@link #buildRecordTestSinkProvider} (a
     * {@link DataStreamSinkProvider} that calls {@code sinkTo}) receives the rows of the table.
     *
     * @throws ExecutionException if the INSERT job fails
     * @throws InterruptedException if waiting for the INSERT job is interrupted
     */
    @Test
    public void testUnifiedSinksAreUsableWithDataStreamSinkProvider()
            throws ExecutionException, InterruptedException {
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(this.env);
        final SharedReference<List<RowData>> fetched = this.sharedObjects.add(new ArrayList<>());
        final List<Row> rows = Arrays.asList(Row.of(1), Row.of(2));

        tableEnv.createTable(
                "T1",
                TableFactoryHarness.newBuilder()
                        .schema(Schema.newBuilder().column("a", DataTypes.INT()).build())
                        .source(new TimestampTestSource(rows))
                        .sink(
                                new TableFactoryHarness.SinkBase() {
                                    // Must carry the base method's real name; the decompiler's
                                    // synthetic "mo1061..." name does not override anything.
                                    @Override
                                    public DataStreamSinkProvider getSinkRuntimeProvider(
                                            DynamicTableSink.Context context) {
                                        return CommonExecSinkITCase.buildRecordTestSinkProvider(fetched);
                                    }
                                })
                        .build());

        tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await();

        // Sort the collected values so the comparison does not depend on arrival order.
        final List<Integer> fetchedValues =
                fetched.get().stream()
                        .map(rowData -> rowData.getInt(0))
                        .sorted()
                        .collect(Collectors.toList());
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(1, fetchedValues.get(0).intValue());
        Assert.assertEquals(2, fetchedValues.get(1).intValue());
    }

    /**
     * Verifies that the plan does NOT contain the StreamRecordTimestampInserter when the table
     * schema declares no watermark. Only the plan is inspected; the statement is not executed.
     */
    @Test
    public void testStreamRecordTimestampInserterNotApplied() {
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(this.env);
        final SharedReference<List<Long>> timestamps = this.sharedObjects.add(new ArrayList<>());
        final List<Row> rows =
                Arrays.asList(
                        Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")),
                        Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")),
                        Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")),
                        Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")));

        tableEnv.createTable(
                "T1",
                TableFactoryHarness.newBuilder()
                        .schema(schemaStreamRecordTimestampInserter(false))
                        .source(new TimestampTestSource(rows))
                        .sink(
                                new TableFactoryHarness.SinkBase() {
                                    // Must carry the base method's real name; the decompiler's
                                    // synthetic "mo1061..." name does not override anything.
                                    @Override
                                    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
                                            DynamicTableSink.Context context) {
                                        return SinkProvider.of(
                                                TestSink.newBuilder()
                                                        .setWriter(new TestTimestampWriter(timestamps))
                                                        .setCommittableSerializer(
                                                                TestSink.StringCommittableSerializer.INSTANCE)
                                                        .build());
                                    }
                                })
                        .build());

        assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
    }

    /**
     * Creates a {@link DataStreamSinkProvider} that attaches a {@link TestSink} to the stream via
     * {@code sinkTo}; the sink's writer is a {@link RecordWriter} that stores every row in the
     * given shared list.
     *
     * @param sharedReference handle to the list that collects the written rows
     * @return provider that builds a new test sink each time it is applied to a stream
     */
    public static DataStreamSinkProvider buildRecordTestSinkProvider(SharedReference<List<RowData>> sharedReference) {
        return dataStream ->
                dataStream.sinkTo(
                        TestSink.newBuilder()
                                .setWriter(new RecordWriter(sharedReference))
                                .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
                                .build());
    }

    /**
     * Appends {@code t} to the list behind {@code sharedReference}, going through
     * {@code applySync} rather than touching the list directly.
     *
     * @param sharedReference handle to the target list
     * @param t element to append
     * @param <T> element type of the list
     */
    public static <T> void addElement(SharedReference<List<T>> sharedReference, T t) {
        // The boolean result of List.add is returned from the lambda and then discarded.
        sharedReference.applySync(elements -> elements.add(t));
    }

    /**
     * Asserts on the JSON execution plan of a statement: depending on {@code z}, the plan must
     * or must not contain the StreamRecordTimestampInserter marker for rowtime field 2.
     *
     * @param streamTableEnvironment environment used to explain the statement
     * @param str SQL statement to explain
     * @param z {@code true} if the marker is expected to be present, {@code false} if it must be
     *     absent
     */
    private static void assertPlan(StreamTableEnvironment streamTableEnvironment, String str, boolean z) {
        final Matcher<String> hasInserter =
                Matchers.containsString("StreamRecordTimestampInserter(rowtime field: 2");
        final Matcher<String> expectation = z ? hasInserter : Matchers.not(hasInserter);
        final String plan = streamTableEnvironment.explainSql(str, ExplainDetail.JSON_EXECUTION_PLAN);
        MatcherAssert.assertThat(plan, expectation);
    }

    /**
     * Builds the three-column test schema ({@code a INT}, {@code b STRING},
     * {@code ts TIMESTAMP_LTZ(3)}), optionally declaring a watermark on {@code ts}.
     *
     * @param z whether to add the watermark definition {@code watermark("ts", "ts")}
     * @return the finished schema
     */
    private static Schema schemaStreamRecordTimestampInserter(boolean z) {
        final Schema.Builder schemaBuilder =
                Schema.newBuilder()
                        .column("a", "INT")
                        .column("b", "STRING")
                        .column("ts", "TIMESTAMP_LTZ(3)");
        if (!z) {
            return schemaBuilder.build();
        }
        // The watermark call's return value is ignored, exactly as before.
        schemaBuilder.watermark("ts", "ts");
        return schemaBuilder.build();
    }

    /**
     * Asserts that the recorded timestamps match the input rows position by position: the list
     * sizes must agree, and each epoch-millisecond value, converted to an {@link Instant}, must
     * equal field 2 of the row at the same index.
     *
     * @param sharedReference handle to the recorded epoch-millisecond timestamps
     * @param list input rows whose field at index 2 holds the expected value
     */
    private static void assertTimestampResults(SharedReference<List<Long>> sharedReference, List<Row> list) {
        final List<Long> recorded = sharedReference.get();
        final int expectedCount = list.size();
        Assert.assertEquals(expectedCount, recorded.size());

        int index = 0;
        while (index < list.size()) {
            final Object expected = list.get(index).getField(2);
            final Instant actual = Instant.ofEpochMilli(recorded.get(index).longValue());
            Assert.assertEquals(expected, actual);
            index++;
        }
    }
}
