package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.class */
public class DataStreamJavaITCase extends AbstractTestBase {
    private StreamExecutionEnvironment env;

    @Parameterized.Parameter
    public ObjectReuse objectReuse;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.table.planner.runtime.stream.sql.DataStreamJavaITCase$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase$ComplexPojo.class */
    public static class ComplexPojo {
        public int c;
        public String a;
        public ImmutablePojo p;

        static ComplexPojo of(int i, String str, ImmutablePojo immutablePojo) {
            ComplexPojo complexPojo = new ComplexPojo();
            complexPojo.c = i;
            complexPojo.a = str;
            complexPojo.p = immutablePojo;
            return complexPojo;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ComplexPojo complexPojo = (ComplexPojo) obj;
            return this.c == complexPojo.c && Objects.equals(this.a, complexPojo.a) && Objects.equals(this.p, complexPojo.p);
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.c), this.a, this.p);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase$ImmutablePojo.class */
    public static class ImmutablePojo {
        private final Boolean b;
        private final Double d;

        public ImmutablePojo(Double d, Boolean bool) {
            this.d = d;
            this.b = bool;
        }

        public Boolean getB() {
            return this.b;
        }

        public Double getD() {
            return this.d;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ImmutablePojo immutablePojo = (ImmutablePojo) obj;
            return Objects.equals(this.b, immutablePojo.b) && Objects.equals(this.d, immutablePojo.d);
        }

        public int hashCode() {
            return Objects.hash(this.b, this.d);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase$ObjectReuse.class */
    enum ObjectReuse {
        ENABLED,
        DISABLED
    }

    @Parameterized.Parameters(name = "objectReuse = {0}")
    public static ObjectReuse[] objectReuse() {
        return ObjectReuse.values();
    }

    @Before
    public void before() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        this.env.setParallelism(4);
        if (this.objectReuse == ObjectReuse.ENABLED) {
            this.env.getConfig().enableObjectReuse();
        } else if (this.objectReuse == ObjectReuse.DISABLED) {
            this.env.getConfig().disableObjectReuse();
        }
    }

    @Test
    public void testFromDataStreamAtomic() {
        TableResult execute = StreamTableEnvironment.create(this.env).fromDataStream(this.env.fromElements(new Integer[]{1, 2, 3, 4, 5})).execute();
        testSchema(execute, Column.physical("f0", DataTypes.INT().notNull()));
        testResult(execute, Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3}), Row.of(new Object[]{4}), Row.of(new Object[]{5}));
    }

    @Test
    public void testToDataStreamAtomic() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        testResult(create.toDataStream(create.fromValues(new Object[]{1, 2, 3, 4, 5}), Integer.class), 1, 2, 3, 4, 5);
    }

    @Test
    public void testFromDataStreamWithRow() {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        TypeInformation ROW_NAMED = Types.ROW_NAMED(new String[]{"b", "c", "a"}, new TypeInformation[]{Types.INT, Types.ROW(new TypeInformation[]{Types.BOOLEAN, Types.STRING}), Types.MAP(Types.STRING, Types.DOUBLE)});
        Row[] rowArr = {Row.of(new Object[]{12, Row.of(new Object[]{false, "hello"}), Collections.singletonMap("world", Double.valueOf(2.0d))}), Row.of(new Object[]{null, Row.of(new Object[]{false, null}), Collections.singletonMap("world", null)})};
        TableResult execute = create.fromDataStream(this.env.fromCollection(Arrays.asList(rowArr), ROW_NAMED)).execute();
        testSchema(execute, Column.physical("b", DataTypes.INT()), Column.physical("c", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("f0", DataTypes.BOOLEAN()), DataTypes.FIELD("f1", DataTypes.STRING())})), Column.physical("a", DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE())));
        testResult(execute, rowArr);
    }

    @Test
    public void testToDataStreamWithRow() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        Row[] rowArr = {Row.of(new Object[]{12, Row.of(new Object[]{false, "hello"}), Collections.singletonMap("world", Double.valueOf(2.0d))}), Row.of(new Object[]{null, Row.of(new Object[]{false, null}), Collections.singletonMap("world", Double.valueOf(1.0d))})};
        testResult(create.toDataStream(create.fromValues(rowArr)), rowArr);
    }

    @Test
    public void testFromAndToDataStreamWithPojo() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        ComplexPojo[] complexPojoArr = {ComplexPojo.of(42, "hello", new ImmutablePojo(Double.valueOf(42.0d), null)), ComplexPojo.of(42, null, null)};
        Table fromDataStream = create.fromDataStream(this.env.fromElements(complexPojoArr), Schema.newBuilder().column("c", DataTypes.INT()).column("a", DataTypes.STRING()).column("p", DataTypes.of(ImmutablePojo.class)).build());
        testSchema(fromDataStream, Column.physical("c", DataTypes.INT()), Column.physical("a", DataTypes.STRING()), Column.physical("p", DataTypes.STRUCTURED(ImmutablePojo.class, new DataTypes.Field[]{DataTypes.FIELD("d", DataTypes.DOUBLE()), DataTypes.FIELD("b", DataTypes.BOOLEAN())})));
        create.createTemporaryView("t", fromDataStream);
        testResult(create.executeSql("SELECT p, p.d, p.b FROM t"), Row.of(new Object[]{new ImmutablePojo(Double.valueOf(42.0d), null), Double.valueOf(42.0d), null}), Row.of(new Object[]{null, null, null}));
        testResult(create.toDataStream(fromDataStream, ComplexPojo.class), complexPojoArr);
    }

    @Test
    public void testFromAndToDataStreamWithRaw() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC), Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
        DataStreamSource fromCollection = this.env.fromCollection(asList);
        MatcherAssert.assertThat(fromCollection.getType(), Matchers.instanceOf(TupleTypeInfo.class));
        TupleTypeInfo type = fromCollection.getType();
        MatcherAssert.assertThat(type.getFieldTypes()[0], Matchers.instanceOf(EnumTypeInfo.class));
        MatcherAssert.assertThat(type.getFieldTypes()[1], Matchers.instanceOf(GenericTypeInfo.class));
        Table fromDataStream = create.fromDataStream(fromCollection);
        List columnDataTypes = fromDataStream.getResolvedSchema().getColumnDataTypes();
        MatcherAssert.assertThat(((DataType) columnDataTypes.get(0)).getLogicalType(), Matchers.instanceOf(RawType.class));
        MatcherAssert.assertThat(((DataType) columnDataTypes.get(1)).getLogicalType(), Matchers.instanceOf(RawType.class));
        testResult(fromDataStream.execute(), Row.of(new Object[]{DayOfWeek.MONDAY, ZoneOffset.UTC}), Row.of(new Object[]{DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)}));
        testResult(create.toDataStream(fromDataStream, DataTypes.of(fromCollection.getType())), asList.toArray(new Tuple2[0]));
    }

    @Test
    public void testFromAndToDataStreamEventTime() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        Table fromDataStream = create.fromDataStream(getWatermarkedDataStream(), Schema.newBuilder().columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)").watermark("rowtime", "SOURCE_WATERMARK()").build());
        testSchema(fromDataStream, new ResolvedSchema(Arrays.asList(Column.physical("f0", DataTypes.BIGINT().notNull()), Column.physical("f1", DataTypes.INT().notNull()), Column.physical("f2", DataTypes.STRING()), Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), (String) null, false)), Collections.singletonList(WatermarkSpec.of("rowtime", ResolvedExpressionMock.of(DataTypes.TIMESTAMP_LTZ(3), "`SOURCE_WATERMARK`()"))), (UniqueConstraint) null));
        create.createTemporaryView("t", fromDataStream);
        testResult(create.executeSql("SELECT f2, SUM(f1) FROM t GROUP BY f2, TUMBLE(rowtime, INTERVAL '0.005' SECOND)"), Row.of(new Object[]{"a", 47}), Row.of(new Object[]{"c", 1000}), Row.of(new Object[]{"c", 1000}));
        testResult((DataStream) create.toDataStream(fromDataStream).keyBy(row -> {
            return row.getField("f2");
        }).window(TumblingEventTimeWindows.of(Time.milliseconds(5L))).apply((obj, timeWindow, iterable, collector) -> {
            int i = 0;
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) ((Row) it.next()).getFieldAs("f1")).intValue();
            }
            collector.collect(Row.of(new Object[]{obj, Integer.valueOf(i)}));
        }).returns(Types.ROW(new TypeInformation[]{Types.STRING, Types.INT})), (Object[]) new Row[]{Row.of(new Object[]{"a", 47}), Row.of(new Object[]{"c", 1000}), Row.of(new Object[]{"c", 1000})});
    }

    @Test
    public void testFromAndToChangelogStreamEventTime() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        create.createTemporaryView("t", create.fromChangelogStream(getWatermarkedDataStream().map(tuple3 -> {
            return Row.ofKind(RowKind.INSERT, new Object[]{tuple3.f1, tuple3.f2});
        }).returns(Types.ROW(new TypeInformation[]{Types.INT, Types.STRING})), Schema.newBuilder().columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)).columnByExpression("computed", (Expression) Expressions.$("f1").upperCase()).watermark("rowtime", Expressions.sourceWatermark()).build()));
        testResult((DataStream) create.toChangelogStream(create.sqlQuery("SELECT computed, rowtime, f0 FROM t"), Schema.newBuilder().column("f1", DataTypes.STRING()).columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)).columnByExpression("ignored", (Expression) Expressions.$("f1").upperCase()).column("f0", DataTypes.INT()).build()).keyBy(row -> {
            return row.getField("f1");
        }).window(TumblingEventTimeWindows.of(Time.milliseconds(5L))).apply((obj, timeWindow, iterable, collector) -> {
            int i = 0;
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) ((Row) it.next()).getFieldAs("f0")).intValue();
            }
            collector.collect(Row.of(new Object[]{obj, Integer.valueOf(i)}));
        }).returns(Types.ROW(new TypeInformation[]{Types.STRING, Types.INT})), (Object[]) new Row[]{Row.of(new Object[]{"A", 47}), Row.of(new Object[]{"C", 1000}), Row.of(new Object[]{"C", 1000})});
    }

    @Test
    public void testFromAndToChangelogStreamRetract() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(input(RowKind.INSERT, "bob", 0), output(RowKind.INSERT, "bob", 0), input(RowKind.UPDATE_BEFORE, "bob", 0), output(RowKind.DELETE, "bob", 0), input(RowKind.UPDATE_AFTER, "bob", 1), output(RowKind.INSERT, "bob", 1), input(RowKind.INSERT, "alice", 1), output(RowKind.INSERT, "alice", 1), input(RowKind.INSERT, "alice", 1), output(RowKind.UPDATE_BEFORE, "alice", 1), output(RowKind.UPDATE_AFTER, "alice", 2), input(RowKind.UPDATE_BEFORE, "alice", 1), output(RowKind.UPDATE_BEFORE, "alice", 2), output(RowKind.UPDATE_AFTER, "alice", 1), input(RowKind.UPDATE_AFTER, "alice", 2), output(RowKind.UPDATE_BEFORE, "alice", 1), output(RowKind.UPDATE_AFTER, "alice", 3), input(RowKind.UPDATE_BEFORE, "alice", 2), output(RowKind.UPDATE_BEFORE, "alice", 3), output(RowKind.UPDATE_AFTER, "alice", 1), input(RowKind.UPDATE_AFTER, "alice", 100), output(RowKind.UPDATE_BEFORE, "alice", 1), output(RowKind.UPDATE_AFTER, "alice", 101));
        create.createTemporaryView("t", create.fromChangelogStream(this.env.fromElements(getInput(asList))));
        Table sqlQuery = create.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
        testResult(sqlQuery.execute(), getOutput(asList));
        testResult(create.toChangelogStream(sqlQuery), getOutput(asList));
    }

    @Test
    public void testFromChangelogStreamUpsert() {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(input(RowKind.INSERT, "bob", 0), output(RowKind.INSERT, "bob", 0), input(RowKind.UPDATE_AFTER, "bob", 1), output(RowKind.UPDATE_BEFORE, "bob", 0), output(RowKind.UPDATE_AFTER, "bob", 1), input(RowKind.INSERT, "alice", 1), output(RowKind.INSERT, "alice", 1), input(RowKind.INSERT, "alice", 1), input(RowKind.UPDATE_AFTER, "alice", 2), output(RowKind.UPDATE_BEFORE, "alice", 1), output(RowKind.UPDATE_AFTER, "alice", 2), input(RowKind.UPDATE_AFTER, "alice", 100), output(RowKind.UPDATE_BEFORE, "alice", 2), output(RowKind.UPDATE_AFTER, "alice", 100));
        create.createTemporaryView("t", create.fromChangelogStream(this.env.fromElements(getInput(asList)), Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()));
        testResult(create.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0").execute(), getOutput(asList));
    }

    @Test
    public void testFromAndToChangelogStreamUpsert() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(input(RowKind.INSERT, "bob", 0), output(RowKind.INSERT, "bob", 0), input(RowKind.UPDATE_AFTER, "bob", 1), output(RowKind.UPDATE_AFTER, "bob", 1), input(RowKind.INSERT, "alice", 1), output(RowKind.INSERT, "alice", 1), input(RowKind.INSERT, "alice", 1), input(RowKind.UPDATE_AFTER, "alice", 2), output(RowKind.UPDATE_AFTER, "alice", 2), input(RowKind.UPDATE_AFTER, "alice", 100), output(RowKind.UPDATE_AFTER, "alice", 100));
        create.createTemporaryView("t", create.fromChangelogStream(this.env.fromElements(getInput(asList)), Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()));
        testResult(create.toChangelogStream(create.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0"), Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()), getOutput(asList));
    }

    @Test
    public void testToDataStreamCustomEventTime() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        TableConfig config = create.getConfig();
        ZoneId localTimeZone = config.getLocalTimeZone();
        config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
        LocalDateTime parse = LocalDateTime.parse("1970-01-01T00:00:00.000");
        LocalDateTime parse2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
        Table fromDataStream = create.fromDataStream(this.env.fromElements(new Tuple2[]{new Tuple2(parse, "alice"), new Tuple2(parse2, "bob")}), Schema.newBuilder().column("f0", "TIMESTAMP(3)").column("f1", "STRING").watermark("f0", "SOURCE_WATERMARK()").build());
        testSchema(fromDataStream, new ResolvedSchema(Arrays.asList(Column.physical("f0", DataTypes.TIMESTAMP(3)), Column.physical("f1", DataTypes.STRING())), Collections.singletonList(WatermarkSpec.of("f0", ResolvedExpressionMock.of(DataTypes.TIMESTAMP(3), "`SOURCE_WATERMARK`()"))), (UniqueConstraint) null));
        testResult((DataStream) create.toDataStream(fromDataStream).process(new ProcessFunction<Row, Long>() { // from class: org.apache.flink.table.planner.runtime.stream.sql.DataStreamJavaITCase.1
            public void processElement(Row row, ProcessFunction<Row, Long>.Context context, Collector<Long> collector) {
                collector.collect(context.timestamp());
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
                processElement((Row) obj, (ProcessFunction<Row, Long>.Context) context, (Collector<Long>) collector);
            }
        }), (Object[]) new Long[]{Long.valueOf(parse.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli()), Long.valueOf(parse2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli())});
        config.setLocalTimeZone(localTimeZone);
    }

    @Test
    public void testComplexUnifiedPipelineBatch() throws Exception {
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        testResult(getComplexUnifiedPipeline(this.env).execute(), Row.of(new Object[]{"Bob", 1L}), Row.of(new Object[]{"Alice", 1L}));
    }

    @Test
    public void testComplexUnifiedPipelineStreaming() throws Exception {
        testResult(getComplexUnifiedPipeline(this.env).execute(), Row.of(new Object[]{"Bob", 1L}), Row.of(new Object[]{"Bob", 2L}), Row.of(new Object[]{"Bob", 3L}), Row.of(new Object[]{"Alice", 1L}));
    }

    @Test
    public void testAttachAsDataStream() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        create.createTemporaryTable("InputTable1", TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).option("data-id", TestValuesTableFactory.registerData(Arrays.asList(Row.of(new Object[]{1, "a"}), Row.of(new Object[]{2, "b"})))).schema(Schema.newBuilder().column("i", DataTypes.INT()).column("s", DataTypes.STRING()).build()).build());
        create.createTemporaryTable("OutputTable1", TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).schema(Schema.newBuilder().column("i", DataTypes.INT()).column("s", DataTypes.STRING()).build()).build());
        create.createTemporaryView("InputTable2", this.env.fromElements(new Integer[]{1, 2, 3}));
        create.createTemporaryTable("OutputTable2", TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).schema(Schema.newBuilder().column("i", DataTypes.INT()).build()).build());
        create.createStatementSet().addInsert("OutputTable1", create.from("InputTable1")).addInsert("OutputTable2", create.from("InputTable2")).attachAsDataStream();
        testResult((DataStream) this.env.fromElements(new Integer[]{3, 4, 5}), (Object[]) new Integer[]{3, 4, 5});
        MatcherAssert.assertThat(TestValuesTableFactory.getResults("OutputTable1"), Matchers.containsInAnyOrder(new String[]{"+I[1, a]", "+I[2, b]"}));
        MatcherAssert.assertThat(TestValuesTableFactory.getResults("OutputTable2"), Matchers.containsInAnyOrder(new String[]{"+I[1]", "+I[2]", "+I[3]"}));
    }

    @Test
    public void testMultiChangelogStreamUpsert() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        createTableFromElements(create, "T1", ChangelogMode.insertOnly(), Schema.newBuilder().column("pk", "INT NOT NULL").column("x", "STRING NOT NULL").primaryKey(new String[]{"pk"}).build(), Arrays.asList(Types.INT, Types.STRING), Row.ofKind(RowKind.INSERT, new Object[]{1, "1"}), Row.ofKind(RowKind.INSERT, new Object[]{2, "2"}));
        createTableFromElements(create, "T2", ChangelogMode.upsert(), Schema.newBuilder().column("pk", "INT NOT NULL").column("y", "STRING NOT NULL").column("some_value", "DOUBLE NOT NULL").primaryKey(new String[]{"pk"}).build(), Arrays.asList(Types.INT, Types.STRING, Types.DOUBLE), Row.ofKind(RowKind.INSERT, new Object[]{1, "A", Double.valueOf(1.0d)}), Row.ofKind(RowKind.INSERT, new Object[]{2, "B", Double.valueOf(2.0d)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{1, "A", Double.valueOf(1.1d)}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2, "B", Double.valueOf(2.1d)}));
        createTableFromElements(create, "T3", ChangelogMode.insertOnly(), Schema.newBuilder().column("pk1", "STRING NOT NULL").column("pk2", "STRING NOT NULL").column("some_other_value", "DOUBLE NOT NULL").primaryKey(new String[]{"pk1", "pk2"}).build(), Arrays.asList(Types.STRING, Types.STRING, Types.DOUBLE), Row.ofKind(RowKind.INSERT, new Object[]{"1", "A", Double.valueOf(10.0d)}), Row.ofKind(RowKind.INSERT, new Object[]{"1", "B", Double.valueOf(11.0d)}));
        testMaterializedResult(create.toChangelogStream(create.sqlQuery("SELECT\nT1.pk,\nT2.some_value * T3.some_other_value,\nT3.pk1,\nT3.pk2\nFROM T1\nLEFT JOIN T2 on T1.pk = T2.pk\nLEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2"), Schema.newBuilder().column("pk", "INT NOT NULL").column("some_calculated_value", "DOUBLE").column("pk1", "STRING").column("pk2", "STRING").primaryKey(new String[]{"pk"}).build(), ChangelogMode.upsert()), 0, Row.of(new Object[]{2, null, null, null}), Row.of(new Object[]{1, Double.valueOf(11.0d), "1", "A"}));
    }

    private Table getComplexUnifiedPipeline(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStreamSource fromElements = streamExecutionEnvironment.fromElements(new String[]{"Bob", "Alice"});
        StreamTableEnvironment create = StreamTableEnvironment.create(streamExecutionEnvironment);
        create.createTemporaryView("AllowedNamesTable", create.fromDataStream(fromElements).as("allowedName", new String[0]));
        create.createTemporaryView("UpdatesPerName", create.toChangelogStream(create.sqlQuery("SELECT name, COUNT(*) AS c FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) WHERE name IN (SELECT allowedName FROM AllowedNamesTable)GROUP BY name")).keyBy(row -> {
            return (String) row.getFieldAs("name");
        }).process(new KeyedProcessFunction<String, Row, Tuple2<String, Long>>() { // from class: org.apache.flink.table.planner.runtime.stream.sql.DataStreamJavaITCase.2
            ValueState<Long> count;

            public void open(Configuration configuration) {
                this.count = getRuntimeContext().getState(new ValueStateDescriptor("count", Long.class));
            }

            public void processElement(Row row2, KeyedProcessFunction<String, Row, Tuple2<String, Long>>.Context context, Collector<Tuple2<String, Long>> collector) throws IOException {
                Long l = (Long) this.count.value();
                if (l == null) {
                    l = 0L;
                }
                long longValue = l.longValue() + 1;
                this.count.update(Long.valueOf(longValue));
                collector.collect(Tuple2.of(context.getCurrentKey(), Long.valueOf(longValue)));
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
                processElement((Row) obj, (KeyedProcessFunction<String, Row, Tuple2<String, Long>>.Context) context, (Collector<Tuple2<String, Long>>) collector);
            }
        }));
        return create.sqlQuery("SELECT DISTINCT f0, f1 FROM UpdatesPerName");
    }

    private DataStream<Tuple3<Long, Integer, String>> getWatermarkedDataStream() {
        return this.env.fromCollection(Arrays.asList(Tuple3.of(1L, 42, "a"), Tuple3.of(2L, 5, "a"), Tuple3.of(3L, 1000, "c"), Tuple3.of(100L, 1000, "c")), Types.TUPLE(new TypeInformation[]{Types.LONG, Types.INT, Types.STRING})).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(context -> {
            return (tuple3, j) -> {
                return ((Long) tuple3.f0).longValue();
            };
        }));
    }

    private static Either<Row, Row> input(RowKind rowKind, Object... objArr) {
        return Either.Left(Row.ofKind(rowKind, objArr));
    }

    private static Row[] getInput(List<Either<Row, Row>> list) {
        return (Row[]) list.stream().filter((v0) -> {
            return v0.isLeft();
        }).map((v0) -> {
            return v0.left();
        }).toArray(i -> {
            return new Row[i];
        });
    }

    private static Either<Row, Row> output(RowKind rowKind, Object... objArr) {
        return Either.Right(Row.ofKind(rowKind, objArr));
    }

    private static Row[] getOutput(List<Either<Row, Row>> list) {
        return (Row[]) list.stream().filter((v0) -> {
            return v0.isRight();
        }).map((v0) -> {
            return v0.right();
        }).toArray(i -> {
            return new Row[i];
        });
    }

    private void createTableFromElements(StreamTableEnvironment streamTableEnvironment, String str, ChangelogMode changelogMode, Schema schema, List<TypeInformation<?>> list, Row... rowArr) {
        streamTableEnvironment.createTemporaryView(str, streamTableEnvironment.fromChangelogStream(this.env.fromElements(rowArr).returns(Types.ROW_NAMED((String[]) schema.getColumns().stream().map((v0) -> {
            return v0.getName();
        }).toArray(i -> {
            return new String[i];
        }), (TypeInformation[]) list.toArray(new TypeInformation[0]))), schema, changelogMode));
    }

    private static void testSchema(Table table, Column... columnArr) {
        Assert.assertEquals(ResolvedSchema.of(columnArr), table.getResolvedSchema());
    }

    private static void testSchema(Table table, ResolvedSchema resolvedSchema) {
        Assert.assertEquals(resolvedSchema, table.getResolvedSchema());
    }

    private static void testSchema(TableResult tableResult, Column... columnArr) {
        Assert.assertEquals(ResolvedSchema.of(columnArr), tableResult.getResolvedSchema());
    }

    private static void testResult(TableResult tableResult, Row... rowArr) {
        MatcherAssert.assertThat(CollectionUtil.iteratorToList(tableResult.collect()), Matchers.containsInAnyOrder(rowArr));
    }

    @SafeVarargs
    private static <T> void testResult(DataStream<T> dataStream, T... tArr) throws Exception {
        CloseableIterator executeAndCollect = dataStream.executeAndCollect();
        Throwable th = null;
        try {
            try {
                MatcherAssert.assertThat(CollectionUtil.iteratorToList(executeAndCollect), Matchers.containsInAnyOrder(tArr));
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    private static void testMaterializedResult(DataStream<Row> dataStream, int i, Row... rowArr) throws Exception {
        CloseableIterator executeAndCollect = dataStream.executeAndCollect();
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                executeAndCollect.forEachRemaining(row -> {
                    RowKind kind = row.getKind();
                    row.setKind(RowKind.INSERT);
                    switch (AnonymousClass3.$SwitchMap$org$apache$flink$types$RowKind[kind.ordinal()]) {
                        case 1:
                            Object field = row.getField(i);
                            if (!$assertionsDisabled && field == null) {
                                throw new AssertionError();
                            }
                            arrayList.removeIf(row -> {
                                return field.equals(row.getField(i));
                            });
                            break;
                            break;
                        case 2:
                            break;
                        case BatchAbstractTestBase.DEFAULT_PARALLELISM /* 3 */:
                        case 4:
                            arrayList.remove(row);
                            return;
                        default:
                            return;
                    }
                    arrayList.add(row);
                });
                MatcherAssert.assertThat(arrayList, Matchers.containsInAnyOrder(rowArr));
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -889155676:
                if (implMethodName.equals("lambda$testFromAndToChangelogStreamEventTime$24e97dea$1")) {
                    z = 3;
                    break;
                }
                break;
            case -305537222:
                if (implMethodName.equals("lambda$testFromAndToChangelogStreamEventTime$3558be8e$1")) {
                    z = true;
                    break;
                }
                break;
            case 600966284:
                if (implMethodName.equals("lambda$testFromAndToDataStreamEventTime$24e97dea$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1184584738:
                if (implMethodName.equals("lambda$testFromAndToDataStreamEventTime$3558be8e$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1504460242:
                if (implMethodName.equals("lambda$getComplexUnifiedPipeline$7492fce0$1")) {
                    z = 6;
                    break;
                }
                break;
            case 1613485096:
                if (implMethodName.equals("lambda$getWatermarkedDataStream$6c53a2a0$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1623276176:
                if (implMethodName.equals("lambda$testFromAndToChangelogStreamEventTime$d2bf14b8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple3;)Lorg/apache/flink/types/Row;")) {
                    return tuple3 -> {
                        return Row.ofKind(RowKind.INSERT, new Object[]{tuple3.f1, tuple3.f2});
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/Row;)Ljava/lang/Object;")) {
                    return row -> {
                        return row.getField("f1");
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/TimestampAssignerSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/TimestampAssignerSupplier$Context;)Lorg/apache/flink/api/common/eventtime/TimestampAssigner;") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/TimestampAssignerSupplier$Context;)Lorg/apache/flink/api/common/eventtime/TimestampAssigner;")) {
                    return context -> {
                        return (tuple32, j) -> {
                            return ((Long) tuple32.f0).longValue();
                        };
                    };
                }
                break;
            case BatchAbstractTestBase.DEFAULT_PARALLELISM /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/api/functions/windowing/WindowFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/streaming/api/windowing/windows/Window;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/streaming/api/windowing/windows/TimeWindow;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V")) {
                    return (obj, timeWindow, iterable, collector) -> {
                        int i = 0;
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            i += ((Integer) ((Row) it.next()).getFieldAs("f0")).intValue();
                        }
                        collector.collect(Row.of(new Object[]{obj, Integer.valueOf(i)}));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/Row;)Ljava/lang/Object;")) {
                    return row2 -> {
                        return row2.getField("f2");
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/api/functions/windowing/WindowFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/streaming/api/windowing/windows/Window;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/streaming/api/windowing/windows/TimeWindow;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V")) {
                    return (obj2, timeWindow2, iterable2, collector2) -> {
                        int i = 0;
                        Iterator it = iterable2.iterator();
                        while (it.hasNext()) {
                            i += ((Integer) ((Row) it.next()).getFieldAs("f1")).intValue();
                        }
                        collector2.collect(Row.of(new Object[]{obj2, Integer.valueOf(i)}));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/Row;)Ljava/lang/String;")) {
                    return row3 -> {
                        return (String) row3.getFieldAs("name");
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !DataStreamJavaITCase.class.desiredAssertionStatus();
    }
}
