package org.apache.hudi.table;

import java.io.File;
import java.net.URL;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith({FlinkMiniCluster.class})
/* loaded from: input_file:org/apache/hudi/table/ITTestHoodieDataSource.class */
public class ITTestHoodieDataSource {
    private TableEnvironment streamTableEnv;
    private TableEnvironment batchTableEnv;

    @TempDir
    File tempFile;

    /* loaded from: input_file:org/apache/hudi/table/ITTestHoodieDataSource$ExecMode.class */
    private enum ExecMode {
        BATCH,
        STREAM
    }

    @BeforeEach
    void beforeEach() {
        this.streamTableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
        this.streamTableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        Configuration configuration = this.streamTableEnv.getConfig().getConfiguration();
        configuration.setString("execution.checkpointing.interval", "2s");
        configuration.setString("restart-strategy", "fixed-delay");
        configuration.setString("restart-strategy.fixed-delay.attempts", "0");
        this.batchTableEnv = TestTableEnvs.getBatchTableEnv();
        this.batchTableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType hoodieTableType) throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        String firstCompleteInstant = TestUtils.getFirstCompleteInstant(this.tempFile.getAbsolutePath());
        this.streamTableEnv.executeSql("drop table t1");
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_START_COMMIT, firstCompleteInstant).end());
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), TestData.DATA_SET_SOURCE_INSERT);
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), TestData.DATA_SET_SOURCE_INSERT);
        this.streamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1/*+options('read.start-commit'='earliest')*/", 10L), TestData.DATA_SET_SOURCE_INSERT);
    }

    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    void testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) throws Exception {
        this.streamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.CDC_ENABLED, (Object) true).option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, hoodieCDCSupplementalLoggingMode.name()).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1/*+options('read.start-commit'='" + TestUtils.getFirstCompleteInstant(this.tempFile.getAbsolutePath()) + "')*/", 10L), TestData.DATA_SET_SOURCE_INSERT);
        execInsertSql(this.streamTableEnv, TestSQL.UPDATE_INSERT_T1);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), TestData.DATA_SET_SOURCE_CHANGELOG);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1/*+options('read.start-commit'='earliest')*/", 10L), TestData.DATA_SET_SOURCE_MERGED);
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testStreamWriteAndRead(HoodieTableType hoodieTableType) throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testStreamReadAppendData(HoodieTableType hoodieTableType) throws Exception {
        String fileSourceDDL = TestConfigurations.getFileSourceDDL("source");
        String fileSourceDDL2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
        this.streamTableEnv.executeSql(fileSourceDDL);
        this.streamTableEnv.executeSql(fileSourceDDL2);
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        String firstCompleteInstant = TestUtils.getFirstCompleteInstant(this.tempFile.getAbsolutePath());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source2");
        this.streamTableEnv.executeSql(TestConfigurations.sql("t2").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_START_COMMIT, firstCompleteInstant).end());
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t2", 10L), TestData.DATA_SET_SOURCE_MERGED);
    }

    @Test
    void testStreamWriteBatchRead() {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return this.streamTableEnv.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
    }

    @Test
    void testStreamWriteBatchReadOptimized() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.QUERY_TYPE, "read_optimized").option(FlinkOptions.COMPACTION_DELTA_COMMITS, (Object) 1).option(FlinkOptions.COMPACTION_TASKS, (Object) 1).option(FlinkOptions.METADATA_ENABLED, (Object) false).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TimeUnit.SECONDS.sleep(5L);
        List iterableToList = CollectionUtil.iterableToList(() -> {
            return this.streamTableEnv.sqlQuery("select * from t1").execute().collect();
        });
        if (iterableToList.size() < TestData.DATA_SET_SOURCE_INSERT.size()) {
            TestData.assertRowsEquals((List<Row>) iterableToList, TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT);
        } else {
            TestData.assertRowsEquals((List<Row>) iterableToList, TestData.DATA_SET_SOURCE_INSERT);
        }
    }

    @Test
    void testStreamWriteBatchReadOptimizedWithoutCompaction() {
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.QUERY_TYPE, "read_optimized").end());
        execInsertSql(this.streamTableEnv, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')");
        Assertions.assertTrue(CollectionUtil.iterableToList(() -> {
            return this.streamTableEnv.sqlQuery("select * from t1").execute().collect();
        }).isEmpty());
    }

    @Test
    void testStreamWriteReadSkippingCompaction() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, (Object) true).option(FlinkOptions.COMPACTION_DELTA_COMMITS, (Object) 1).option(FlinkOptions.COMPACTION_TASKS, (Object) 1).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        String nthCompleteInstant = TestUtils.getNthCompleteInstant(this.tempFile.getAbsolutePath(), 2, "deltacommit");
        this.streamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, String.format("select * from t1/*+ options('read.start-commit'='%s')*/", nthCompleteInstant), 10L), TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
    }

    @Test
    void testAppendWriteReadSkippingClustering() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "insert").option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, (Object) true).option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, (Object) true).option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, (Object) true).option(FlinkOptions.CLUSTERING_DELTA_COMMITS, (Object) 1).option(FlinkOptions.CLUSTERING_TASKS, (Object) 1).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        String nthCompleteInstant = TestUtils.getNthCompleteInstant(this.tempFile.getAbsolutePath(), 2, "commit");
        this.streamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, String.format("select * from t1/*+ options('read.start-commit'='%s')*/", nthCompleteInstant), 10L), TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
    }

    @Test
    void testAppendWriteWithClusteringBatchRead() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "insert").option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, (Object) true).option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, (Object) true).option(FlinkOptions.CLUSTERING_DELTA_COMMITS, (Object) 2).option(FlinkOptions.CLUSTERING_TASKS, (Object) 1).option(FlinkOptions.CLEAN_RETAIN_COMMITS, (Object) 1).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        this.streamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, String.format("select * from t1/*+ options('read.start-commit'='%s')*/", "earliest"), 10L), (List<RowData>) CollectionUtils.combine(TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT));
    }

    @Test
    void testStreamWriteWithCleaning() {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", "test_source_3.data", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.CLEAN_RETAIN_COMMITS, (Object) 1).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        HashMap hashMap = new HashMap(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()).toMap());
        hashMap.put(FlinkOptions.TABLE_NAME.key(), "t1");
        Assertions.assertTrue(StreamerUtil.createMetaClient(Configuration.fromMap(hashMap)).getActiveTimeline().filterCompletedInstants().getInstantsAsStream().anyMatch(hoodieInstant -> {
            return hoodieInstant.getAction().equals("clean");
        }), "some commits should be cleaned");
    }

    @Test
    void testStreamReadWithDeletes() throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        defaultConf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
        TestData.writeData(TestData.DATA_SET_INSERT, defaultConf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, defaultConf);
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, (Object) 2).option(FlinkOptions.READ_START_COMMIT, TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath())).option(FlinkOptions.CHANGELOG_ENABLED, (Object) true).end());
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select name, sum(age) from t1 group by name", "create table sink(\n  name varchar(20),\n  age_sum int\n) with (\n  'connector' = 'collect')", 10L), "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]", true);
    }

    @MethodSource({"tableTypeAndPartitioningParams"})
    @ParameterizedTest
    void testStreamReadFilterByPartition(HoodieTableType hoodieTableType, boolean z) throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        defaultConf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, z);
        TestData.writeData(TestData.DATA_SET_INSERT, defaultConf);
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, (Object) 2).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1 where `partition`='par1'", 10L), "[+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), +I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]", true);
    }

    @Test
    void testStreamReadMorTableWithCompactionPlan() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source"));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_START_COMMIT, "earliest").option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, (Object) 2).option(FlinkOptions.COMPACTION_ASYNC_ENABLED, (Object) false).option(FlinkOptions.COMPACTION_DELTA_COMMITS, (Object) 1).noPartition().end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], +I[id3, Julian, 53, 1970-01-01T00:00:03, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], +I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], +I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @MethodSource({"executionModeAndPartitioningParams"})
    @ParameterizedTest
    void testWriteAndRead(ExecMode execMode, boolean z) {
        TableEnvironment tableEnvironment = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @MethodSource({"tableTypeAndPartitioningParams"})
    @ParameterizedTest
    void testWriteAndReadWithProctimeSequence(HoodieTableType hoodieTableType, boolean z) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("tss timestamp(3)").field("`partition` varchar(10)").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_SAME_KEY_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
    }

    @MethodSource({"tableTypeAndPartitioningParams"})
    @ParameterizedTest
    void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType hoodieTableType, boolean z) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("ts timestamp(3)").field("`partition` varchar(10)").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).option(FlinkOptions.PRECOMBINE_FIELD, "no_precombine").end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_SAME_KEY_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testBatchModeUpsertWithoutPartition(HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_NAME, hoodieTableType.name()).option("hoodie.parquet.small.file.limit", "0").option("hoodie.parquet.max.file.size", "0").noPartition().end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        execInsertSql(tableEnvironment, TestSQL.UPDATE_INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_MERGED);
    }

    @MethodSource({"tableTypeAndPartitioningParams"})
    @ParameterizedTest
    void testBatchModeUpsert(HoodieTableType hoodieTableType, boolean z) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_NAME, hoodieTableType).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        execInsertSql(tableEnvironment, TestSQL.UPDATE_INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_MERGED);
    }

    @EnumSource(ExecMode.class)
    @ParameterizedTest
    void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
        this.streamTableEnv.executeSql("create table t1(\n  uuid varchar(20),\n  name varchar(10),\n  age int,\n  `partition` varchar(20),\n  ts timestamp(3),\n  PRIMARY KEY(uuid) NOT ENFORCED\n)\nPARTITIONED BY (`partition`)\nwith (\n  'connector' = 'hudi',\n  'path' = '" + this.tempFile.getAbsolutePath() + "',\n  'read.streaming.enabled' = '" + (execMode == ExecMode.STREAM) + "'\n)");
        execInsertSql(this.streamTableEnv, "insert into t1 values\n('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", execMode), "[+I[id1, Danny, 23, par1, 1970-01-01T00:00:01], +I[id2, Stephen, 33, par1, 1970-01-01T00:00:02], +I[id3, Julian, 53, par2, 1970-01-01T00:00:03], +I[id4, Fabian, 31, par2, 1970-01-01T00:00:04], +I[id5, Sophia, 18, par3, 1970-01-01T00:00:05], +I[id6, Emma, 20, par3, 1970-01-01T00:00:06], +I[id7, Bob, 44, par4, 1970-01-01T00:00:07], +I[id8, Han, 56, par4, 1970-01-01T00:00:08]]");
        execInsertSql(this.streamTableEnv, "insert into t1 values\n('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", execMode), "[+I[id1, Danny, 23, par1, 1970-01-01T00:00:01], +I[id2, Stephen, 33, par1, 1970-01-01T00:00:02], +I[id3, Julian, 53, par2, 1970-01-01T00:00:03], +I[id4, Fabian, 31, par2, 1970-01-01T00:00:04], +I[id5, Sophia, 18, par3, 1970-01-01T00:00:05], +I[id6, Emma, 20, par3, 1970-01-01T00:00:06], +I[id7, Bob, 44, par4, 1970-01-01T00:00:07], +I[id8, Han, 56, par4, 1970-01-01T00:00:08]]");
    }

    @EnumSource(ExecMode.class)
    @ParameterizedTest
    void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").field("id int").field("name varchar(10)").field("ts timestamp(6)").pkField("id").noPartition().option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, Boolean.valueOf(execMode == ExecMode.STREAM)).end());
        execInsertSql(this.streamTableEnv, "insert into t1 values\n(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004'),\n(5,'Tom',TIMESTAMP '2721-12-04 15:16:04.500005')");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", execMode), "[+I[1, Danny, 2021-12-01T01:02:01.100001], +I[2, Stephen, 2021-12-02T03:04:02.200002], +I[3, Julian, 2021-12-03T13:14:03.300003], +I[4, Fabian, 2021-12-04T15:16:04.400004], +I[5, Tom, 2721-12-04T15:16:04.500005]]");
        execInsertSql(this.streamTableEnv, "insert into t1 values\n(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004'),\n(5,'Tom',TIMESTAMP '2721-12-04 15:16:04.500005')");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", execMode), "[+I[1, Danny, 2021-12-01T01:02:01.100001], +I[2, Stephen, 2021-12-02T03:04:02.200002], +I[3, Julian, 2021-12-03T13:14:03.300003], +I[4, Fabian, 2021-12-04T15:16:04.400004], +I[5, Tom, 2721-12-04T15:16:04.500005]]");
    }

    @MethodSource({"indexAndTableTypeParams"})
    @ParameterizedTest
    void testInsertOverwrite(String str, HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.INDEX_TYPE, str).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        execInsertSql(tableEnvironment, "insert overwrite t1 partition(`partition`='par1') values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
        execInsertSql(tableEnvironment, "insert overwrite t1 partition(`partition`='par1') values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
        execInsertSql(tableEnvironment, "insert overwrite t1 /*+ OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n");
        List iterableToList = CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        });
        TestData.assertRowsEquals((List<Row>) iterableToList, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
        execInsertSql(tableEnvironment, "insert overwrite t1 /*+ OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n");
        TestData.assertRowsEquals((List<Row>) iterableToList, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
        execInsertSql(tableEnvironment, "insert overwrite t1 values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]");
        execInsertSql(tableEnvironment, "insert overwrite t1 values\n('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testStreamWriteAndReadWithMiniBatches(HoodieTableType hoodieTableType) throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_START_COMMIT, "earliest").option(FlinkOptions.WRITE_BATCH_SIZE, Double.valueOf(1.0E-5d)).noPartition().end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 20L), TestData.DATA_SET_SOURCE_INSERT);
    }

    @MethodSource({"executionModeAndTableTypeParams"})
    @ParameterizedTest
    void testBatchUpsertWithMiniBatches(ExecMode execMode, HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.WRITE_BATCH_SIZE, "0.001").option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')");
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par1'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par1')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
    }

    @MethodSource({"executionModeAndTableTypeParams"})
    @ParameterizedTest
    void testBatchUpsertWithMiniBatchesGlobalIndex(ExecMode execMode, HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.WRITE_BATCH_SIZE, "0.001").option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object) true).end());
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')");
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par3]]");
    }

    @Test
    void testUpdateWithDefaultHoodieRecordPayload() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("id int").field("name string").field("price double").field("ts bigint").pkField("id").noPartition().option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName()).end());
        execInsertSql(tableEnvironment, "insert into t1 values\n(1,'a1',20,20)");
        execInsertSql(tableEnvironment, "insert into t1 values\n(1,'a1',20,1)");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, a1, 20.0, 20]]");
    }

    @MethodSource({"executionModeAndTableTypeParams"})
    @ParameterizedTest
    void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).noPartition().end());
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')");
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]");
    }

    @Test
    void testWriteGlobalIndex() {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", "test_source_4.data", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object) true).option(FlinkOptions.PRE_COMBINE, (Object) true).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return this.streamTableEnv.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]");
    }

    @Test
    void testWriteLocalIndex() {
        this.streamTableEnv.executeSql(TestConfigurations.getFileSourceDDL("source", "test_source_4.data", 4));
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.INDEX_GLOBAL_ENABLED, (Object) false).option(FlinkOptions.PRE_COMBINE, (Object) true).end());
        execInsertSql(this.streamTableEnv, "insert into t1 select * from source");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return this.streamTableEnv.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Stephen, 34, 1970-01-01T00:00:02, par1], +I[id1, Fabian, 32, 1970-01-01T00:00:04, par2], +I[id1, Jane, 19, 1970-01-01T00:00:06, par3], +I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]", 3);
    }

    @Test
    void testStreamReadEmptyTablePath() throws Exception {
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, "true").option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).end());
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), "[]");
        StreamerUtil.initTableIfNotExists(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()));
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from t1", 10L), "[]");
    }

    @Test
    void testBatchReadEmptyTablePath() throws Exception {
        this.batchTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).end());
        Assertions.assertThrows(Exception.class, () -> {
            execSelectSql(this.batchTableEnv, "select * from t1", 10L);
        }, "Exception should throw when querying non-exists table in batch mode");
        StreamerUtil.initTableIfNotExists(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()));
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iteratorToList(this.batchTableEnv.executeSql("select * from t1").collect()), "[]");
    }

    @EnumSource(ExecMode.class)
    @ParameterizedTest
    void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
        this.streamTableEnv.executeSql("CREATE TABLE debezium_source(\n  id INT NOT NULL PRIMARY KEY NOT ENFORCED,\n  ts BIGINT,\n  name STRING,\n  description STRING,\n  weight DOUBLE\n) WITH (\n  'connector' = 'filesystem',\n  'path' = '" + ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("debezium_json.data"))).toString() + "',\n  'format' = 'debezium-json'\n)");
        this.streamTableEnv.executeSql(TestConfigurations.sql("hoodie_sink").field("id INT NOT NULL").field("ts BIGINT").field("name STRING").field("weight DOUBLE").pkField("id").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_AS_STREAMING, Boolean.valueOf(execMode == ExecMode.STREAM)).option(FlinkOptions.PRE_COMBINE, (Object) true).noPartition().end());
        execInsertSql(this.streamTableEnv, "insert into hoodie_sink select id, ts, name, weight from debezium_source");
        TestData.assertRowsEquals(execSelectSql(this.streamTableEnv, "select * from hoodie_sink", execMode), "[+I[101, 1000, scooter, 3.140000104904175], +I[102, 2000, car battery, 8.100000381469727], +I[103, 3000, 12-pack drill bits, 0.800000011920929], +I[104, 4000, hammer, 0.75], +I[105, 5000, hammer, 0.875], +I[106, 10000, hammer, 1.0], +I[107, 11000, rocks, 5.099999904632568], +I[108, 8000, jacket, 0.10000000149011612], +I[109, 9000, spare tire, 22.200000762939453], +I[110, 14000, jacket, 0.5]]");
    }

    @MethodSource({"indexAndPartitioningParams"})
    @ParameterizedTest
    void testBulkInsert(String str, boolean z) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"));
        tableEnvironment.executeSql(TestConfigurations.sql("hoodie_sink").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "bulk_insert").option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, (Object) true).option(FlinkOptions.INDEX_TYPE, str).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        execInsertSql(tableEnvironment, "insert into hoodie_sink select * from csv_source");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from hoodie_sink").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @Test
    void testBulkInsertWithSortByRecordKey() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "bulk_insert").option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, (Object) true).option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, (Object) true).option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY, (Object) true).end());
        execInsertSql(tableEnvironment, "insert into t1 values\n('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Julian, 53, 1970-01-01T00:00:03, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]", 4);
    }

    @Test
    void testBulkInsertNonPartitionedTable() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "bulk_insert").noPartition().end());
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')");
        execInsertSql(tableEnvironment, "insert into t1 values\n('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], +I[id1, Stephen, 33, 1970-01-01T00:00:02, par2], +I[id1, Julian, 53, 1970-01-01T00:00:03, par3], +I[id1, Fabian, 31, 1970-01-01T00:00:04, par4], +I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testAppendWrite(boolean z) {
        TableEnvironment tableEnvironment = this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.getFileSourceDDL("source"));
        tableEnvironment.executeSql(TestConfigurations.sql("hoodie_sink").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, "insert").option(FlinkOptions.INSERT_CLUSTER, Boolean.valueOf(z)).end());
        execInsertSql(tableEnvironment, "insert into hoodie_sink select * from source");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from hoodie_sink").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @MethodSource({"executionModeAndPartitioningParams"})
    @ParameterizedTest
    void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean z) {
        TableEnvironment tableEnvironment = execMode == ExecMode.BATCH ? this.batchTableEnv : this.streamTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).partitionField("ts").end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @Test
    void testMergeOnReadCompactionWithTimestampPartitioning() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ).option(FlinkOptions.COMPACTION_DELTA_COMMITS, (Object) 1).option(FlinkOptions.COMPACTION_TASKS, (Object) 1).partitionField("ts").end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
    }

    @ValueSource(strings = {"yyyyMMdd", "yyyy-MM-dd"})
    @ParameterizedTest
    void testWriteAndReadWithDatePartitioning(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("ts date").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.PARTITION_FORMAT, str).partitionField("ts").end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_DATE_PARTITION_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01], +I[id2, Stephen, 33, 1970-01-01], +I[id3, Julian, 53, 1970-01-01], +I[id4, Fabian, 31, 1970-01-01], +I[id5, Sophia, 18, 1970-01-01], +I[id6, Emma, 20, 1970-01-01], +I[id7, Bob, 44, 1970-01-01], +I[id8, Han, 56, 1970-01-01]]");
    }

    @ValueSource(strings = {"bulk_insert", "upsert"})
    @ParameterizedTest
    void testWriteReadDecimals(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("decimals").field("f0 decimal(3, 2)").field("f1 decimal(10, 2)").field("f2 decimal(20, 2)").field("f3 decimal(38, 18)").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, str).option(FlinkOptions.PRECOMBINE_FIELD, "f1").pkField("f0").noPartition().end());
        execInsertSql(tableEnvironment, "insert into decimals values\n(1.23, 12345678.12, 12345.12, 123456789.12345)");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from decimals").execute().collect();
        }), "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testIncrementalRead(HoodieTableType hoodieTableType) throws Exception {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        TestData.writeData(TestData.dataSetInsert(1, 2), defaultConf);
        TestData.writeData(TestData.dataSetInsert(3, 4), defaultConf);
        TestData.writeData(TestData.dataSetInsert(5, 6), defaultConf);
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_START_COMMIT, TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath())).end());
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.dataSetInsert(5, 6));
    }

    @Test
    void testReadChangelogIncremental() throws Exception {
        TableEnvironment tableEnvironment = this.streamTableEnv;
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        defaultConf.setBoolean(FlinkOptions.CDC_ENABLED, true);
        TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), defaultConf);
        TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), defaultConf);
        TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), defaultConf);
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.READ_START_COMMIT, TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath())).option(FlinkOptions.CDC_ENABLED, (Object) true).end());
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.dataSetUpsert(2, 1));
        for (int i = 0; i < 10; i++) {
            TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), defaultConf);
        }
        String format = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", TestUtils.getFirstCompleteInstant(this.tempFile.getAbsolutePath()));
        List iterableToList = CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery(format).execute().collect();
        });
        TestData.assertRowsEquals((List<Row>) iterableToList.subList(iterableToList.size() - 2, iterableToList.size()), "[-U[1], +U[2]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testIncrementalReadArchivedCommits(HoodieTableType hoodieTableType) throws Exception {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        defaultConf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
        defaultConf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
        defaultConf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 3);
        defaultConf.setString("hoodie.commits.archival.batch", "1");
        for (int i = 0; i < 20; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), defaultConf);
        }
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_START_COMMIT, TestUtils.getNthArchivedInstant(this.tempFile.getAbsolutePath(), 1)).end());
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testReadWithWiderSchema(HoodieTableType hoodieTableType) throws Exception {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        TestData.writeData(TestData.DATA_SET_INSERT, defaultConf);
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("uuid varchar(20)").field("name varchar(10)").field("age int").field("salary double").field("ts timestamp(3)").field("`partition` varchar(10)").pkField("uuid").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[id1, Danny, 23, null, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, null, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 53, null, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, null, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, null, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, null, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, null, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, null, 1970-01-01T00:00:00.008, par4]]");
    }

    @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
    @ParameterizedTest
    void testParquetComplexTypes(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f_int int").field("f_array array<varchar(10)>").field("f_map map<varchar(20), int>").field("f_row row(f_row_f0 int, f_row_f1 varchar(10))").pkField("f_int").noPartition().option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, str).end());
        execInsertSql(tableEnvironment, TestSQL.COMPLEX_TYPE_INSERT_T1);
        TestData.assertRowsEqualsUnordered(CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), Arrays.asList(TestData.row(1, TestData.array("abc1", "def1"), TestData.map("abc1", 1, "def1", 3), TestData.row(1, "abc1")), TestData.row(2, TestData.array("abc2", "def2"), TestData.map("abc2", 1, "def2", 3), TestData.row(2, "abc2")), TestData.row(3, TestData.array("abc3", "def3"), TestData.map("abc3", 1, "def3", 3), TestData.row(3, "abc3"))));
    }

    @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
    @ParameterizedTest
    void testParquetComplexNestedRowTypes(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f_int int").field("f_array array<varchar(10)>").field("int_array array<int>").field("f_map map<varchar(20), int>").field("f_row row(f_nested_array array<varchar(10)>, f_nested_row row(f_row_f0 int, f_row_f1 varchar(10)))").pkField("f_int").noPartition().option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, str).end());
        execInsertSql(tableEnvironment, TestSQL.COMPLEX_NESTED_ROW_TYPE_INSERT_T1);
        TestData.assertRowsEqualsUnordered(CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), Arrays.asList(TestData.row(1, TestData.array("abc1", "def1"), TestData.array(1, 1), TestData.map("abc1", 1, "def1", 3), TestData.row(TestData.array("abc1", "def1"), TestData.row(1, "abc1"))), TestData.row(2, TestData.array("abc2", "def2"), TestData.array(2, 2), TestData.map("abc2", 1, "def2", 3), TestData.row(TestData.array("abc2", "def2"), TestData.row(2, "abc2"))), TestData.row(3, TestData.array("abc3", "def3"), TestData.array(3, 3), TestData.map("abc3", 1, "def3", 3), TestData.row(TestData.array("abc3", "def3"), TestData.row(3, "abc3")))));
    }

    @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
    @ParameterizedTest
    void testParquetNullChildColumnsRowTypes(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f_int int").field("f_row row(f_row_f0 int, f_row_f1 varchar(10))").pkField("f_int").noPartition().option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.OPERATION, str).end());
        execInsertSql(tableEnvironment, TestSQL.NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, +I[null, abc1]], +I[2, +I[2, null]], +I[3, null]]");
    }

    @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
    @ParameterizedTest
    void testBuiltinFunctionWithCatalog(String str) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.catalog("hudi_" + str).catalogPath(this.tempFile.getAbsolutePath()).end());
        tableEnvironment.executeSql("use catalog hudi_" + str);
        tableEnvironment.executeSql("create database hudi");
        tableEnvironment.executeSql("use hudi");
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f_int int").field("f_date DATE").pkField("f_int").partitionField("f_int").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath() + "/hudi/" + str).option(FlinkOptions.OPERATION, str).end());
        execInsertSql(tableEnvironment, "insert into t1 values (1, TO_DATE('2022-02-02')), (2, DATE '2022-02-02')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, 2022-02-02], +I[2, 2022-02-02]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where f_int = 1").execute().collect();
        }), "[+I[1, 2022-02-02]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testWriteAndReadWithDataSkipping(HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.METADATA_ENABLED, (Object) true).option("hoodie.metadata.index.column.stats.enable", (Object) true).option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, (Object) true).option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid > 'id5' and age > 20").execute().collect();
        }), "[+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:05'").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid in ('id6', 'id7', 'id8')").execute().collect();
        }), "[+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], +I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    @Test
    void testMultipleLogBlocksWithDataSkipping() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.METADATA_ENABLED, (Object) true).option("hoodie.metadata.index.column.stats.enable", (Object) true).option("hoodie.metadata.index.column.stats.file.group.count", (Object) 2).option("hoodie.metadata.index.column.stats.column.list", "ts").option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, (Object) true).option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ).option("hoodie.logfile.data.block.max.size", (Object) 1).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_SAME_KEY_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
    }

    @Test
    void testEagerFlushWithDataSkipping() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.METADATA_ENABLED, (Object) true).option("hoodie.metadata.index.column.stats.enable", (Object) true).option("hoodie.metadata.index.column.stats.file.group.count", (Object) 2).option("hoodie.metadata.index.column.stats.column.list", "ts").option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, (Object) true).option(FlinkOptions.WRITE_BATCH_SIZE, Double.valueOf(1.0E-5d)).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_SAME_KEY_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testBucketPruning(HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.INDEX_TYPE, "BUCKET").option(FlinkOptions.TABLE_TYPE, hoodieTableType).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.DATA_SET_SOURCE_INSERT);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid = 'id5' and age < 20").execute().collect();
        }), "[+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid = 'id7' and ts > TIMESTAMP '1970-01-01 00:00:05'").execute().collect();
        }), "[+I[id7, Bob, 44, 1970-01-01T00:00:07, par4]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where name in ('Danny', 'Julian') and uuid='id1'").execute().collect();
        }), "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
    }

    @Test
    void testBuiltinFunctionWithHMSCatalog() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.registerCatalog("hudi_catalog", HoodieCatalogTestUtils.createHiveCatalog("hudi_catalog"));
        tableEnvironment.executeSql("use catalog hudi_catalog");
        tableEnvironment.executeSql("create database hudi");
        tableEnvironment.executeSql("use hudi");
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f_int int").field("f_date DATE").field("f_par string").pkField("f_int").partitionField("f_par").option(FlinkOptions.RECORD_KEY_FIELD, "f_int").option(FlinkOptions.PRECOMBINE_FIELD, "f_date").end());
        execInsertSql(tableEnvironment, "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '2')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, 2022-02-02, 1], +I[2, 2022-02-02, 2]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where f_par = '1'").execute().collect();
        }), "[+I[1, 2022-02-02, 1]]");
    }

    @Test
    void testWriteReadWithComputedColumns() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f0 int").field("f1 varchar(10)").field("f2 bigint").field("f3 as f0 + f2").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.PRECOMBINE_FIELD, "f1").pkField("f0").noPartition().end());
        execInsertSql(tableEnvironment, "insert into t1 values\n(1, 'abc', 2)");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, abc, 2, 3]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select f3 from t1").execute().collect();
        }), "[+I[3]]");
    }

    @Test
    void testWriteReadWithComputedColumnsInTheMiddle() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f0 int").field("f1 int").field("f2 as f0 + f1").field("f3 varchar(10)").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.PRECOMBINE_FIELD, "f1").pkField("f0").noPartition().end());
        execInsertSql(tableEnvironment, "insert into t1(f0, f1, f3) values\n(1, 2, 'abc')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, 2, 3, abc]]");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select f2 from t1").execute().collect();
        }), "[+I[3]]");
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testWriteReadWithLocalTimestamp(HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        tableEnvironment.executeSql(TestConfigurations.sql("t1").field("f0 int").field("f1 varchar(10)").field("f2 TIMESTAMP_LTZ(3)").field("f4 TIMESTAMP_LTZ(6)").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.PRECOMBINE_FIELD, "f1").option(FlinkOptions.TABLE_TYPE, hoodieTableType).pkField("f0").noPartition().end());
        execInsertSql(tableEnvironment, "insert into t1 values\n(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), "[+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], +I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]");
    }

    @MethodSource({"tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams"})
    @ParameterizedTest
    void testReadMetaFields(HoodieTableType hoodieTableType, String str, int i, int i2) throws Exception {
        String absolutePath = this.tempFile.getAbsolutePath();
        this.batchTableEnv.executeSql(TestConfigurations.sql("t1").field("id int").field("name varchar(10)").field("ts timestamp(6)").field("`partition` varchar(10)").pkField("id").partitionField("partition").option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.QUERY_TYPE, str).option(FlinkOptions.COMPACTION_ASYNC_ENABLED, (Object) true).option(FlinkOptions.COMPACTION_DELTA_COMMITS, Integer.valueOf(i2)).option(FlinkOptions.PATH, absolutePath).end());
        String[] strArr = {"insert into t1 values(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001', 'par1')", "insert into t1 values(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002', 'par2')", "insert into t1 values(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003', 'par3')"};
        String[] strArr2 = {"+I[1, Danny, 2021-12-01T01:02:01.100001, par1]", ", +I[2, Stephen, 2021-12-02T03:04:02.200002, par2]", ", +I[3, Julian, 2021-12-03T13:14:03.300003, par3]"};
        String[] strArr3 = {"+I[%s, 1, par1, 1, Danny, 2021-12-01T01:02:01.100001, par1]", ", +I[%s, 2, par2, 2, Stephen, 2021-12-02T03:04:02.200002, par2]", ", +I[%s, 3, par3, 3, Julian, 2021-12-03T13:14:03.300003, par3]"};
        String[] strArr4 = {"+I[1, %s, Danny, 1, 2021-12-01T01:02:01.100001, par1, par1]", ", +I[2, %s, Stephen, 2, 2021-12-02T03:04:02.200002, par2, par2]", ", +I[3, %s, Julian, 3, 2021-12-03T13:14:03.300003, par3, par3]"};
        StringBuilder sb = new StringBuilder();
        StringBuilder sb2 = new StringBuilder();
        StringBuilder sb3 = new StringBuilder();
        sb.append("[");
        sb2.append("[");
        sb3.append("[");
        for (int i3 = 0; i3 < i; i3++) {
            execInsertSql(this.batchTableEnv, strArr[i3]);
            String lastDeltaCompleteInstant = hoodieTableType.equals(HoodieTableType.MERGE_ON_READ) ? TestUtils.getLastDeltaCompleteInstant(absolutePath) : TestUtils.getLastCompleteInstant(absolutePath);
            sb.append(strArr2[i3]);
            sb2.append(String.format(strArr3[i3], lastDeltaCompleteInstant));
            sb3.append(String.format(strArr4[i3], lastDeltaCompleteInstant));
        }
        sb.append("]");
        sb2.append("]");
        sb3.append("]");
        this.batchTableEnv.executeSql("drop table t1");
        this.batchTableEnv.executeSql(TestConfigurations.sql("t1").field("id int").field("name varchar(10)").field("ts timestamp(6)").field("`partition` varchar(10)").pkField("id").partitionField("partition").option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.QUERY_TYPE, str).option(FlinkOptions.COMPACTION_ASYNC_ENABLED, (Object) true).option(FlinkOptions.COMPACTION_DELTA_COMMITS, Integer.valueOf(i2)).option(FlinkOptions.PATH, absolutePath).end());
        TestData.assertRowsEquals(execSelectSql(this.batchTableEnv, "select * from t1", ExecMode.BATCH), sb.toString());
        this.batchTableEnv.executeSql("drop table t1");
        this.batchTableEnv.executeSql(TestConfigurations.sql("t1").field("_hoodie_commit_time string").field("_hoodie_record_key string").field("_hoodie_partition_path string").field("id int").field("name varchar(10)").field("ts timestamp(6)").field("`partition` varchar(10)").pkField("id").partitionField("partition").option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.QUERY_TYPE, str).option(FlinkOptions.COMPACTION_ASYNC_ENABLED, (Object) true).option(FlinkOptions.COMPACTION_DELTA_COMMITS, Integer.valueOf(i2)).option(FlinkOptions.PATH, absolutePath).end());
        TestData.assertRowsEquals(execSelectSql(this.batchTableEnv, "select * from t1", ExecMode.BATCH), sb2.toString());
        this.batchTableEnv.executeSql("drop table t1");
        this.batchTableEnv.executeSql(TestConfigurations.sql("t1").field("id int").field("_hoodie_commit_time string").field("name varchar(10)").field("_hoodie_record_key string").field("ts timestamp(6)").field("_hoodie_partition_path string").field("`partition` varchar(10)").pkField("id").partitionField("partition").option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.QUERY_TYPE, str).option(FlinkOptions.COMPACTION_ASYNC_ENABLED, (Object) true).option(FlinkOptions.COMPACTION_DELTA_COMMITS, Integer.valueOf(i2)).option(FlinkOptions.PATH, absolutePath).end());
        TestData.assertRowsEquals(execSelectSql(this.batchTableEnv, "select * from t1", ExecMode.BATCH), sb3.toString());
    }

    @MethodSource({"tableTypeAndPartitioningParams"})
    @ParameterizedTest
    void testDynamicPartitionPrune(HoodieTableType hoodieTableType, boolean z) throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        defaultConf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, z);
        TestData.writeData(TestData.DATA_SET_INSERT, defaultConf);
        this.streamTableEnv.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.READ_AS_STREAMING, (Object) true).option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, (Object) 2).option(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.valueOf(z)).end());
        TableResult submitSelectSql = submitSelectSql(this.streamTableEnv, "select uuid, name, age, ts, `partition` as part from t1 where `partition` > 'par4'", TestConfigurations.getCollectSinkDDL("sink"));
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, defaultConf);
        TestData.assertRowsEquals(fetchResult(this.streamTableEnv, submitSelectSql, 10L), TestData.DATA_SET_INSERT_SEPARATE_PARTITION);
    }

    @MethodSource({"indexAndTableTypeParams"})
    @ParameterizedTest
    void testUpdateDelete(String str, HoodieTableType hoodieTableType) {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).option(FlinkOptions.TABLE_TYPE, hoodieTableType).option(FlinkOptions.INDEX_TYPE, str).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        execInsertSql(tableEnvironment, "update t1 set age=18 where uuid in('id1', 'id2')");
        List iterableToList = CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        });
        List<RowData> update = TestData.update(TestData.DATA_SET_SOURCE_INSERT, 2, 18, 0, 1);
        TestData.assertRowsEquals((List<Row>) iterableToList, update);
        execInsertSql(tableEnvironment, "update t1 set age=19 where uuid > 'id5'");
        List iterableToList2 = CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        });
        List<RowData> update2 = TestData.update(update, 2, 19, 5, 6, 7);
        TestData.assertRowsEquals((List<Row>) iterableToList2, update2);
        execInsertSql(tableEnvironment, "delete from t1 where uuid = 'id1'");
        List iterableToList3 = CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        });
        List<RowData> delete = TestData.delete(update2, 0);
        TestData.assertRowsEquals((List<Row>) iterableToList3, delete);
        execInsertSql(tableEnvironment, "delete from t1 where uuid <= 'id5'");
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1").execute().collect();
        }), TestData.delete(delete, 0, 1, 2, 3));
    }

    @Test
    void testReadWithParquetPredicatePushDown() {
        TableEnvironment tableEnvironment = this.batchTableEnv;
        tableEnvironment.executeSql(TestConfigurations.sql("t1").option(FlinkOptions.PATH, this.tempFile.getAbsolutePath()).end());
        execInsertSql(tableEnvironment, TestSQL.INSERT_T1);
        TestData.assertRowsEquals((List<Row>) CollectionUtil.iterableToList(() -> {
            return tableEnvironment.sqlQuery("select * from t1 where uuid > 'id2' and age > 30 and ts > '1970-01-01 00:00:04'").execute().collect();
        }), "[+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], +I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
    }

    private static Stream<Arguments> executionModeAndTableTypeParams() {
        return Stream.of(new Object[]{ExecMode.BATCH, HoodieTableType.MERGE_ON_READ}, new Object[]{ExecMode.BATCH, HoodieTableType.COPY_ON_WRITE}, new Object[]{ExecMode.STREAM, HoodieTableType.MERGE_ON_READ}, new Object[]{ExecMode.STREAM, HoodieTableType.COPY_ON_WRITE}).map(Arguments::of);
    }

    private static Stream<Arguments> executionModeAndPartitioningParams() {
        return Stream.of(new Object[]{ExecMode.BATCH, false}, new Object[]{ExecMode.BATCH, true}, new Object[]{ExecMode.STREAM, false}, new Object[]{ExecMode.STREAM, true}).map(Arguments::of);
    }

    private static Stream<Arguments> tableTypeAndPartitioningParams() {
        return Stream.of(new Object[]{HoodieTableType.COPY_ON_WRITE, false}, new Object[]{HoodieTableType.COPY_ON_WRITE, true}, new Object[]{HoodieTableType.MERGE_ON_READ, false}, new Object[]{HoodieTableType.MERGE_ON_READ, true}).map(Arguments::of);
    }

    private static Stream<Arguments> tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
        return Arrays.stream(new Object[]{new Object[]{HoodieTableType.COPY_ON_WRITE, "incremental", 1, 1}, new Object[]{HoodieTableType.COPY_ON_WRITE, "read_optimized", 1, 1}, new Object[]{HoodieTableType.MERGE_ON_READ, "snapshot", 1, 1}, new Object[]{HoodieTableType.MERGE_ON_READ, "snapshot", 1, 3}, new Object[]{HoodieTableType.MERGE_ON_READ, "snapshot", 3, 2}}).map(Arguments::of);
    }

    private static Stream<Arguments> indexAndPartitioningParams() {
        return Stream.of(new Object[]{"FLINK_STATE", false}, new Object[]{"FLINK_STATE", true}, new Object[]{"BUCKET", false}, new Object[]{"BUCKET", true}).map(Arguments::of);
    }

    private static Stream<Arguments> indexAndTableTypeParams() {
        return Stream.of(new Object[]{"FLINK_STATE", HoodieTableType.COPY_ON_WRITE}, new Object[]{"FLINK_STATE", HoodieTableType.MERGE_ON_READ}, new Object[]{"BUCKET", HoodieTableType.COPY_ON_WRITE}, new Object[]{"BUCKET", HoodieTableType.MERGE_ON_READ}).map(Arguments::of);
    }

    private void execInsertSql(TableEnvironment tableEnvironment, String str) {
        try {
            tableEnvironment.executeSql(str).await();
        } catch (InterruptedException | ExecutionException e) {
        }
    }

    private List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, ExecMode execMode) throws TableNotExistException, InterruptedException {
        String[] split = str.split(" ");
        String str2 = split[split.length - 1];
        switch (execMode) {
            case STREAM:
                return execSelectSql(tableEnvironment, str, 10L, str2);
            case BATCH:
                return CollectionUtil.iterableToList(() -> {
                    return tableEnvironment.sqlQuery("select * from " + str2).execute().collect();
                });
            default:
                throw new AssertionError();
        }
    }

    private List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, long j) throws InterruptedException, TableNotExistException {
        return execSelectSql(tableEnvironment, str, j, (String) null);
    }

    private List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, long j, String str2) throws InterruptedException, TableNotExistException {
        String collectSinkDDL;
        if (str2 != null) {
            collectSinkDDL = TestConfigurations.getCollectSinkDDL("sink", ((Catalog) tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog()).get()).getTable(new ObjectPath(tableEnvironment.getCurrentDatabase(), str2)).getSchema());
        } else {
            collectSinkDDL = TestConfigurations.getCollectSinkDDL("sink");
        }
        return execSelectSql(tableEnvironment, str, collectSinkDDL, j);
    }

    private List<Row> execSelectSql(TableEnvironment tableEnvironment, String str, String str2, long j) throws InterruptedException {
        return fetchResult(tableEnvironment, submitSelectSql(tableEnvironment, str, str2), j);
    }

    private TableResult submitSelectSql(TableEnvironment tableEnvironment, String str, String str2) {
        tableEnvironment.executeSql("DROP TABLE IF EXISTS sink");
        tableEnvironment.executeSql(str2);
        return tableEnvironment.executeSql("insert into sink " + str);
    }

    private List<Row> fetchResult(TableEnvironment tableEnvironment, TableResult tableResult, long j) throws InterruptedException {
        TimeUnit.SECONDS.sleep(j);
        tableResult.getJobClient().ifPresent((v0) -> {
            v0.cancel();
        });
        tableEnvironment.executeSql("DROP TABLE IF EXISTS sink");
        return (List) CollectSinkTableFactory.RESULT.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }
}
