/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestInputFormat {
    /** Table source rebuilt by {@code beforeEach}; the tests obtain their input formats from it. */
    private HoodieTableSource tableSource;
    /** Flink configuration of the table under test; created in {@code beforeEach} and mutated by individual tests. */
    private Configuration conf;
    /** Directory injected through the {@code @TempDir} annotation; its absolute path is handed to {@code TestConfigurations.getDefaultConf}. */
    @TempDir
    File tempFile;

    /**
     * Prepares the configuration and table source for the given table type without
     * any additional option overrides.
     *
     * @param tableType table type written into the configuration
     * @throws IOException propagated from the two-argument overload
     */
    void beforeEach(HoodieTableType tableType) throws IOException {
        final Map<String, String> noOverrides = Collections.emptyMap();
        beforeEach(tableType, noOverrides);
    }

    /**
     * Builds the configuration for a test, initialises the table if it does not exist yet
     * and creates the table source used by the test.
     *
     * <p>Async compaction is switched off unconditionally before the caller's options are
     * applied: the tests in this class enable it themselves where they need it, so the
     * baseline must not depend on whatever the default configuration happens to contain.
     * Entries in {@code options} are applied afterwards and therefore still override the
     * baseline, including the compaction flag.
     *
     * @param tableType table type written into {@code FlinkOptions.TABLE_TYPE}
     * @param options   extra key/value pairs copied into the configuration as strings
     * @throws IOException declared for the table initialisation step
     */
    void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
        // Always start from "async compaction off"; a guard on conf.contains(...) would let a
        // pre-populated default silently leave compaction enabled for every test.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        // Caller overrides go last so they win over the baseline above.
        options.forEach((key, value) -> this.conf.setString(key, value));
        StreamerUtil.initTableIfNotExists(this.conf);
        this.tableSource = this.getTableSource(this.conf);
    }

    /**
     * Reads the table back after an insert batch and again after an update/insert batch,
     * once per table type.
     *
     * @param tableType table type under test
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testRead(HoodieTableType tableType) throws Exception {
        beforeEach(tableType);

        // first commit: the plain insert data set must be read back unchanged
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        InputFormat<RowData, ?> firstFormat = tableSource.getInputFormat();
        List<RowData> firstRows = TestInputFormat.readData(firstFormat);
        String firstActual = TestData.rowDataToString(firstRows);
        String firstExpected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat(firstActual, CoreMatchers.is(firstExpected));

        // second commit: write the update/insert batch, then reset the source before reading again
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        tableSource.reset();
        InputFormat<RowData, ?> secondFormat = tableSource.getInputFormat();
        List<RowData> secondRows = TestInputFormat.readData(secondFormat);
        String secondActual = TestData.rowDataToString(secondRows);
        String secondExpected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
            + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
            + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
            + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
            + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
            + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat(secondActual, CoreMatchers.is(secondExpected));
    }

    /**
     * Reads a MERGE_ON_READ table after a first batch written with async compaction enabled,
     * then again after two further batches written with async compaction disabled.
     */
    @Test
    void testReadBaseAndLogFiles() throws Exception {
        beforeEach(HoodieTableType.MERGE_ON_READ);

        // first batch: async compaction on, delta-commit threshold of 1
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        InputFormat<RowData, ?> initialFormat = tableSource.getInputFormat();
        List<RowData> initialRows = TestInputFormat.readData(initialFormat);
        String initialActual = TestData.rowDataToString(initialRows);
        String initialExpected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
        MatcherAssert.assertThat(initialActual, CoreMatchers.is(initialExpected));

        // later batches: async compaction off; the last one targets separate partitions
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);

        tableSource.reset();
        InputFormat<RowData, ?> refreshedFormat = tableSource.getInputFormat();
        List<RowData> mergedRows = TestInputFormat.readData(refreshedFormat);
        String mergedActual = TestData.rowDataToString(mergedRows);
        String mergedExpected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
            + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
            + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
            + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
            + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
            + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], "
            + "+I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], "
            + "+I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], "
            + "+I[id14, Rachel, 52, 1970-01-01T00:00:00.011, par6], "
            + "+I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]";
        MatcherAssert.assertThat(mergedActual, CoreMatchers.is(mergedExpected));
    }

    /**
     * Reads a changelog-enabled MERGE_ON_READ table holding an insert batch followed by an
     * update/delete batch, first with the input format as returned by the table source and
     * then with {@code isEmitDelete(true)} set on it.
     */
    @Test
    void testReadBaseAndLogFilesWithDeletes() throws Exception {
        Map<String, String> changelogOptions = new HashMap<>();
        changelogOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, changelogOptions);

        // first batch with async compaction on (threshold 1), second batch with it off
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        // pass 1: input format used as returned, delete rows are not expected
        InputFormat<RowData, ?> plainFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat(plainFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> rowsWithoutDeletes = TestInputFormat.readData(plainFormat);
        String actualWithoutDeletes = TestData.rowDataToString(rowsWithoutDeletes);
        String expectedWithoutDeletes = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat(actualWithoutDeletes, CoreMatchers.is(expectedWithoutDeletes));

        // pass 2: fresh input format with delete emission switched on
        tableSource.reset();
        InputFormat<RowData, ?> deleteEmittingFormat = tableSource.getInputFormat();
        ((MergeOnReadInputFormat) deleteEmittingFormat).isEmitDelete(true);
        List<RowData> rowsWithDeletes = TestInputFormat.readData(deleteEmittingFormat);
        String actualWithDeletes = TestData.rowDataToString(rowsWithDeletes);
        String expectedWithDeletes = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
            + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
            + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
            + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat(actualWithDeletes, CoreMatchers.is(expectedWithDeletes));
    }

    /**
     * Writes a single insert followed by the disorder update/delete data set to a
     * changelog-enabled MERGE_ON_READ table and checks that one row for {@code id1} is read,
     * both without and with {@code isEmitDelete(true)}.
     *
     * @param compact whether async compaction stays enabled for the second write; it also
     *                selects the expected row kind ({@code +I} when true, {@code +U} otherwise)
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exception {
        Map<String, String> changelogOptions = new HashMap<>();
        changelogOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, changelogOptions);

        // first write always runs with async compaction on and a threshold of 1
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);

        // second write follows the test parameter
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
        TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);

        InputFormat<RowData, ?> plainFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat(plainFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> plainRows = TestInputFormat.readData(plainFormat);

        final String rowKind;
        if (compact) {
            rowKind = "I";
        } else {
            rowKind = "U";
        }
        final String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
        MatcherAssert.assertThat(TestData.rowDataToString(plainRows), CoreMatchers.is(expected));

        // the same single row is expected when delete emission is switched on
        tableSource.reset();
        InputFormat<RowData, ?> deleteEmittingFormat = tableSource.getInputFormat();
        ((MergeOnReadInputFormat) deleteEmittingFormat).isEmitDelete(true);
        List<RowData> rowsWithDeletes = TestInputFormat.readData(deleteEmittingFormat);
        MatcherAssert.assertThat(TestData.rowDataToString(rowsWithDeletes), CoreMatchers.is(expected));
    }

    /**
     * Writes only the update/delete data set to a changelog-enabled MERGE_ON_READ table and
     * reads it with {@code isEmitDelete(true)}, expecting both insert and delete rows.
     */
    @Test
    void testReadWithDeletesMOR() throws Exception {
        Map<String, String> changelogOptions = new HashMap<>();
        changelogOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, changelogOptions);

        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        InputFormat<RowData, ?> morFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat(morFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        ((MergeOnReadInputFormat) morFormat).isEmitDelete(true);

        List<RowData> rows = TestInputFormat.readData(morFormat);
        String rendered = TestData.rowDataToString(rows);
        String expected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
            + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
            + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Writes only the update/delete data set to a COPY_ON_WRITE table and checks that the
     * read yields just the two surviving insert rows.
     */
    @Test
    void testReadWithDeletesCOW() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        InputFormat<RowData, ?> cowFormat = tableSource.getInputFormat();
        MatcherAssert.assertThat(cowFormat, CoreMatchers.instanceOf(CopyOnWriteInputFormat.class));

        List<RowData> rows = TestInputFormat.readData(cowFormat);
        String rendered = TestData.rowDataToString(rows);
        String expected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Reads a CDC-enabled COPY_ON_WRITE table through {@code CdcInputFormat} after an insert
     * batch and an update/delete batch, expecting update before/after images and delete rows.
     *
     * @param mode supplemental logging mode stored in the table options
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
        Map<String, String> cdcOptions = new HashMap<>();
        cdcOptions.put(FlinkOptions.CDC_ENABLED.key(), "true");
        cdcOptions.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
        beforeEach(HoodieTableType.COPY_ON_WRITE, cdcOptions);

        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        InputFormat<RowData, ?> cdcFormat = tableSource.getInputFormat(true);
        MatcherAssert.assertThat(cdcFormat, CoreMatchers.instanceOf(CdcInputFormat.class));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(conf)
            .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
            .skipCompaction(false)
            .build();

        // no issued instant is passed, so the start point comes from the configuration
        IncrementalInputSplits.Result splitResult = splitGenerator.inputSplits(metaClient, null, true);
        InputSplit[] splitArray = splitResult.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> rows = TestInputFormat.readData(cdcFormat, splitArray);

        String rendered = TestData.rowDataToString(rows);
        String expected = "["
            + "-U[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
            + "+U[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "-U[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
            + "+U[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
            + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Same set-up as the CDC copy-on-write read, but with the read start commit set to
     * {@code "earliest"} and a partition pruner limited to {@code par1}..{@code par4};
     * the expected output contains only insert rows.
     *
     * @param mode supplemental logging mode stored in the table options
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
        Map<String, String> cdcOptions = new HashMap<>();
        cdcOptions.put(FlinkOptions.CDC_ENABLED.key(), "true");
        cdcOptions.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
        cdcOptions.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
        beforeEach(HoodieTableType.COPY_ON_WRITE, cdcOptions);

        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

        InputFormat<RowData, ?> cdcFormat = tableSource.getInputFormat(true);
        MatcherAssert.assertThat(cdcFormat, CoreMatchers.instanceOf(CdcInputFormat.class));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(conf)
            .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
            .partitionPruner(
                PartitionPruners.builder()
                    .candidatePartitions(Arrays.asList("par1", "par2", "par3", "par4"))
                    .build())
            .skipCompaction(false)
            .build();

        IncrementalInputSplits.Result splitResult = splitGenerator.inputSplits(metaClient, null, true);
        InputSplit[] splitArray = splitResult.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> rows = TestInputFormat.readData(cdcFormat, splitArray);

        String rendered = TestData.rowDataToString(rows);
        String expected = "["
            + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
            + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
            + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
            + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
            + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
            + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Generates incremental splits for a MERGE_ON_READ table with {@code skipCompaction(true)}
     * and a partition pruner limited to {@code par1}..{@code par4}, reading three times as
     * further batches are written.
     */
    @Test
    void testReadSkipCompaction() throws Exception {
        beforeEach(HoodieTableType.MERGE_ON_READ);
        // Result is not used below; the call is retained so the method's behaviour is unchanged.
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);

        // batch 1: async compaction on with a delta-commit threshold of 1
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        InputFormat<RowData, ?> format = tableSource.getInputFormat(true);
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(conf)
            .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
            .partitionPruner(
                PartitionPruners.builder()
                    .candidatePartitions(Arrays.asList("par1", "par2", "par3", "par4"))
                    .build())
            .skipCompaction(true)
            .build();

        // read 1: no start commit configured yet
        IncrementalInputSplits.Result firstSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(firstSplits.isEmpty());
        InputSplit[] firstArray = firstSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> firstRows = TestInputFormat.readData(format, firstArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(firstRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));

        // batch 2: async compaction off
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);

        // read 2: start from the complete "commit" instant at index 0
        String startCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, "commit");
        conf.setString(FlinkOptions.READ_START_COMMIT, startCommit);
        IncrementalInputSplits.Result secondSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(secondSplits.isEmpty());
        InputSplit[] secondArray = secondSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> secondRows = TestInputFormat.readData(format, secondArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(secondRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));

        // batch 3 goes to separate partitions; the expected rows stay the same as in read 2
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
        tableSource.reset();
        format = tableSource.getInputFormat(true);
        IncrementalInputSplits.Result thirdSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(thirdSplits.isEmpty());
        InputSplit[] thirdArray = thirdSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> thirdRows = TestInputFormat.readData(format, thirdArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(thirdRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    /**
     * Generates incremental splits for a COPY_ON_WRITE table written with the
     * {@code "insert"} operation, using {@code skipClustering(true)} and a partition pruner
     * limited to {@code par1}..{@code par4}, reading three times as further batches are written.
     */
    @Test
    void testReadSkipClustering() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);

        // batch 1: clustering scheduled and run asynchronously after every delta commit
        conf.setString(FlinkOptions.OPERATION, "insert");
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        InputFormat<RowData, ?> format = tableSource.getInputFormat(true);
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(conf)
            .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
            .partitionPruner(
                PartitionPruners.builder()
                    .candidatePartitions(Arrays.asList("par1", "par2", "par3", "par4"))
                    .build())
            .skipClustering(true)
            .build();

        // read 1: no start commit configured yet
        IncrementalInputSplits.Result firstSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(firstSplits.isEmpty());
        InputSplit[] firstArray = firstSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> firstRows = TestInputFormat.readData(format, firstArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(firstRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));

        // batch 2: async clustering off
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);

        // read 2: start from the complete "replacecommit" instant at index 0
        String startCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, "replacecommit");
        conf.setString(FlinkOptions.READ_START_COMMIT, startCommit);
        IncrementalInputSplits.Result secondSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(secondSplits.isEmpty());
        InputSplit[] secondArray = secondSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> secondRows = TestInputFormat.readData(format, secondArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(secondRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));

        // batch 3: async clustering back on, data goes to separate partitions;
        // the expected rows stay the same as in read 2
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
        tableSource.reset();
        format = tableSource.getInputFormat(true);
        IncrementalInputSplits.Result thirdSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(thirdSplits.isEmpty());
        InputSplit[] thirdArray = thirdSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> thirdRows = TestInputFormat.readData(format, thirdArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(thirdRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    /**
     * Checks incremental split generation while instants in the middle of the timeline are
     * temporarily not complete, once per table type.
     *
     * <p>Four commits (c1..c4) are written, the instant files of c2 and c3 are deleted, and
     * splits are requested repeatedly while c2 and then c3 are saved as complete again.
     * Statement order matters: each query is evaluated against the timeline state produced
     * by the preceding delete/save calls.
     *
     * @param tableType table type under test
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadHollowInstants(HoodieTableType tableType) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        // NOTE(review): presumably disables small-file handling so every write creates new files — confirm
        options.put("hoodie.parquet.small.file.limit", "0");
        this.beforeEach(tableType, options);
        // Write four batches: rows (1,2), (3,4), (5,6), (7,8).
        for (int i = 0; i < 8; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)this.conf);
        List oriInstants = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat((Object)oriInstants.size(), (Matcher)CoreMatchers.is((Object)4));
        // Delete the instant files of the 2nd and 3rd commits, keeping their metadata for later restoration.
        ArrayList<HoodieCommitMetadata> metadataList = new ArrayList<HoodieCommitMetadata>();
        for (int i = 1; i <= 2; ++i) {
            HoodieInstant instant = (HoodieInstant)oriInstants.get(i);
            metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
        }
        // Only two completed instants (c1 and c4) are expected to remain after the reload.
        List instants = metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat((Object)instants.size(), (Matcher)CoreMatchers.is((Object)2));
        String c2 = ((HoodieInstant)oriInstants.get(1)).requestedTime();
        String c3 = ((HoodieInstant)oriInstants.get(2)).requestedTime();
        String c4 = ((HoodieInstant)oriInstants.get(3)).requestedTime();
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat((Object)inputFormat, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath((StoragePath)metaClient.getBasePath())).build();
        // Query 1: no start commit configured; only the rows of the last batch (7, 8) are expected.
        IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, null, false);
        Assertions.assertFalse((boolean)splits1.isEmpty());
        List<RowData> result1 = TestInputFormat.readData(inputFormat, (InputSplit[])splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result1, TestData.dataSetInsert(7, 8));
        // Query 2: read from "earliest"; the rows of c1 and c4 are expected, c2/c3 are still missing.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, null, false);
        Assertions.assertFalse((boolean)splits2.isEmpty());
        List<RowData> result2 = TestInputFormat.readData(inputFormat, (InputSplit[])splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2, 7, 8));
        // Restore c2 as complete, then continue from the offset of query 2: only c2's rows (3, 4) are expected.
        TestUtils.saveInstantAsComplete(metaClient, (HoodieInstant)oriInstants.get(1), (HoodieCommitMetadata)metadataList.get(0));
        MatcherAssert.assertThat((Object)splits2.getEndInstant(), (Matcher)CoreMatchers.is((Object)c4));
        IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, splits2.getOffset(), false);
        Assertions.assertFalse((boolean)splits3.isEmpty());
        List<RowData> result3 = TestInputFormat.readData(inputFormat, (InputSplit[])splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result3, TestData.dataSetInsert(3, 4));
        // Continue from c1's completion time instead: the rows of c2 and c4 are expected.
        IncrementalInputSplits.Result splits4 = incrementalInputSplits.inputSplits(metaClient, ((HoodieInstant)oriInstants.get(0)).getCompletionTime(), false);
        Assertions.assertFalse((boolean)splits4.isEmpty());
        List<RowData> result4 = TestInputFormat.readData(inputFormat, (InputSplit[])splits4.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result4, TestData.dataSetInsert(3, 4, 7, 8));
        // Restore c3 as complete, then continue from the offset of query 3: only c3's rows (5, 6) are expected.
        TestUtils.saveInstantAsComplete(metaClient, (HoodieInstant)oriInstants.get(2), (HoodieCommitMetadata)metadataList.get(1));
        MatcherAssert.assertThat((Object)splits3.getEndInstant(), (Matcher)CoreMatchers.is((Object)c2));
        IncrementalInputSplits.Result splits5 = incrementalInputSplits.inputSplits(metaClient, splits3.getOffset(), false);
        Assertions.assertFalse((boolean)splits5.isEmpty());
        List<RowData> result5 = TestInputFormat.readData(inputFormat, (InputSplit[])splits5.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result5, TestData.dataSetInsert(5, 6));
        MatcherAssert.assertThat((Object)splits5.getEndInstant(), (Matcher)CoreMatchers.is((Object)c3));
        // Nothing new after the offset of query 5, so the next result must be empty.
        IncrementalInputSplits.Result splits6 = incrementalInputSplits.inputSplits(metaClient, splits5.getOffset(), false);
        Assertions.assertTrue((boolean)splits6.isEmpty());
        // Continue from c4's completion time: the rows of the restored c2 and c3 are expected.
        IncrementalInputSplits.Result splits7 = incrementalInputSplits.inputSplits(metaClient, ((HoodieInstant)oriInstants.get(3)).getCompletionTime(), false);
        Assertions.assertFalse((boolean)splits7.isEmpty());
        List<RowData> result7 = TestInputFormat.readData(inputFormat, (InputSplit[])splits7.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
        TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 5, 6));
    }

    /**
     * Generates incremental splits for a COPY_ON_WRITE table with a partition pruner limited
     * to {@code par1}..{@code par4}: first with no start commit configured, then with the
     * start commit set to the complete {@code "commit"} instant at index 1.
     */
    @Test
    void testReadBaseFilesWithStartCommit() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        InputFormat<RowData, ?> format = tableSource.getInputFormat(true);
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(conf)
            .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
            .partitionPruner(
                PartitionPruners.builder()
                    .candidatePartitions(Arrays.asList("par1", "par2", "par3", "par4"))
                    .build())
            .build();

        // read 1: no start commit configured
        IncrementalInputSplits.Result firstSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(firstSplits.isEmpty());
        InputSplit[] firstArray = firstSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> firstRows = TestInputFormat.readData(format, firstArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(firstRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));

        // read 2: after a second batch, start from the complete "commit" instant at index 1
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        String startCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, "commit");
        conf.setString(FlinkOptions.READ_START_COMMIT, startCommit);
        IncrementalInputSplits.Result secondSplits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(secondSplits.isEmpty());
        InputSplit[] secondArray = secondSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> secondRows = TestInputFormat.readData(format, secondArray);
        MatcherAssert.assertThat(
            TestData.rowDataToString(secondRows),
            CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    /**
     * Applies the filter {@code partition = 'par1'} to the table source and verifies that only
     * the two {@code par1} rows are returned. Runs once per {@link HoodieTableType} constant.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
        this.beforeEach(tableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        // Build the expression: partition = 'par1'
        FieldReferenceExpression partitionField = new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4);
        ValueLiteralExpression par1Literal = new ValueLiteralExpression((Object)"par1", (DataType)DataTypes.STRING().notNull());
        CallExpression partitionEqualsPar1 = new CallExpression(
            (FunctionDefinition)BuiltInFunctionDefinitions.EQUALS,
            Arrays.asList(partitionField, par1Literal),
            DataTypes.BOOLEAN());
        this.tableSource.applyFilters(Arrays.asList(partitionEqualsPar1));

        InputFormat format = this.tableSource.getInputFormat();
        String rendered = TestData.rowDataToString(TestInputFormat.readData(format));
        String expected = "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Writes an insert/update/delete sequence to a MERGE_ON_READ table with the changelog option
     * enabled and reads it back twice: first with the input format as obtained from the table
     * source (expected to yield no rows), then after calling {@code isEmitDelete(true)} on a
     * fresh format (expected to yield the single {@code -D} row).
     */
    @Test
    void testReadChangesMergedMOR() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);

        InputFormat format = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        // Read without touching the emit-delete switch.
        String withoutDeletes = TestData.rowDataToString(TestInputFormat.readData(format));
        String expectedWithoutDeletes = "[]";
        MatcherAssert.assertThat(withoutDeletes, CoreMatchers.is(expectedWithoutDeletes));

        // Rebuild the format and turn on delete emission.
        this.tableSource.reset();
        format = this.tableSource.getInputFormat();
        ((MergeOnReadInputFormat)format).isEmitDelete(true);
        String withDeletes = TestData.rowDataToString(TestInputFormat.readData(format));
        String expectedWithDeletes = "[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
        MatcherAssert.assertThat(withDeletes, CoreMatchers.is(expectedWithDeletes));
    }

    /**
     * Writes an insert/update/delete sequence to a MERGE_ON_READ table with both the changelog
     * and read-as-streaming options enabled, and verifies that every intermediate change record
     * ({@code +I}, {@code -U}/{@code +U} pairs and the final {@code -D}) is returned.
     */
    @Test
    void testReadChangesUnMergedMOR() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);

        InputFormat format = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        String rendered = TestData.rowDataToString(TestInputFormat.readData(format));
        String expected = "[+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], -U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], +U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], -U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], +U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], -U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], +U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], -D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
        MatcherAssert.assertThat(rendered, CoreMatchers.is(expected));
    }

    /**
     * Exercises incremental queries over three commits (rows 1-2, 3-4 and 5-6) with different
     * combinations of {@code READ_START_COMMIT} and {@code READ_END_COMMIT}. Runs once per
     * {@link HoodieTableType} constant.
     *
     * <p>The configuration is mutated in place between scenarios, so their order matters.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadIncrementally(HoodieTableType tableType) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        this.beforeEach(tableType, options);

        // Three commits, two rows each: (1,2), (3,4), (5,6).
        for (int i = 0; i < 6; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }

        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)this.conf)), (String)this.tempFile.getAbsolutePath());
        List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
        MatcherAssert.assertThat((Object)commits.size(), (Matcher)CoreMatchers.is((Object)3));

        // Scenario 1: only a start commit (the second one).
        this.conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat1 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat1, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual1 = TestInputFormat.readData(inputFormat1);
        List<RowData> expected1 = TestData.dataSetInsert(3, 4, 5, 6);
        TestData.assertRowDataEquals(actual1, expected1);

        // Scenario 2: start commit "earliest".
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat2, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual2 = TestInputFormat.readData(inputFormat2);
        List<RowData> expected2 = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
        TestData.assertRowDataEquals(actual2, expected2);

        // Scenario 3: both start (first commit) and end (second commit).
        this.conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat3, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual3 = TestInputFormat.readData(inputFormat3);
        List<RowData> expected3 = TestData.dataSetInsert(1, 2, 3, 4);
        TestData.assertRowDataEquals(actual3, expected3);

        // Scenario 4: only an end commit (the second one).
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat4 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat((Object)inputFormat4, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual4 = TestInputFormat.readData(inputFormat4);
        List<RowData> expected4 = TestData.dataSetInsert(3, 4);
        TestData.assertRowDataEquals(actual4, expected4);

        // Scenario 5: start commit "000" combined with the second commit as end.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "000");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat5 = this.tableSource.getInputFormat();
        // Check the format built for THIS scenario (previously re-checked inputFormat4).
        MatcherAssert.assertThat((Object)inputFormat5, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> actual5 = TestInputFormat.readData(inputFormat5);
        List<RowData> expected5 = TestData.dataSetInsert(1, 2, 3, 4);
        TestData.assertRowDataEquals(actual5, expected5);

        // Scenario 6: start "001" and end "002" -> expect no rows.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "001");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "002");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat inputFormat6 = this.tableSource.getInputFormat();
        List<RowData> actual6 = TestInputFormat.readData(inputFormat6);
        TestData.assertRowDataEquals(actual6, Collections.emptyList());
    }

    /**
     * Writes the same two records three times (as batches) to a COPY_ON_WRITE table with CDC
     * enabled, then runs the shared incremental changelog scenarios against the three resulting
     * completed commits.
     */
    @Test
    void testReadChangelogIncrementally() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        options.put(FlinkOptions.CDC_ENABLED.key(), "true");
        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.COPY_ON_WRITE, options);

        for (int round = 0; round < 3; ++round) {
            TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), this.conf);
        }

        StorageConfiguration storageConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)this.conf));
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, (String)this.tempFile.getAbsolutePath());
        List<String> completionTimes = metaClient.getCommitsTimeline()
            .filterCompletedInstants()
            .getInstantsAsStream()
            .map(HoodieInstant::getCompletionTime)
            .collect(Collectors.toList());
        MatcherAssert.assertThat(completionTimes.size(), CoreMatchers.is(3));

        this.testReadChangelogInternal(completionTimes);
    }

    /**
     * MERGE_ON_READ variant of the incremental changelog test: CDC is enabled and
     * {@code READ_CDC_FROM_CHANGELOG} is set to {@code "false"}. The same two records are
     * written in three batches before the shared scenarios run.
     */
    @Test
    void testReadChangelogIncrementallyForMor() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        options.put(FlinkOptions.CDC_ENABLED.key(), "true");
        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        options.put(FlinkOptions.READ_CDC_FROM_CHANGELOG.key(), "false");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        for (int round = 0; round < 3; ++round) {
            TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), this.conf);
        }

        StorageConfiguration storageConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)this.conf));
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, (String)this.tempFile.getAbsolutePath());
        List<String> completionTimes = metaClient.getCommitsTimeline()
            .filterCompletedInstants()
            .getInstantsAsStream()
            .map(HoodieInstant::getCompletionTime)
            .collect(Collectors.toList());
        MatcherAssert.assertThat(completionTimes.size(), CoreMatchers.is(3));

        this.testReadChangelogInternal(completionTimes);
    }

    /**
     * MERGE_ON_READ variant of the incremental changelog test with async compaction enabled and
     * a compaction delta-commit setting of {@code "1"}. Only completed instants whose action is
     * {@code "commit"} are handed to the shared scenarios; exactly three are expected.
     */
    @Test
    void testReadChangelogIncrementallyForMorWithCompaction() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        options.put(FlinkOptions.CDC_ENABLED.key(), "true");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        for (int round = 0; round < 3; ++round) {
            TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), this.conf);
        }

        StorageConfiguration storageConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)this.conf));
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, (String)this.tempFile.getAbsolutePath());
        // Keep only instants whose action is "commit".
        List<String> completionTimes = metaClient.getCommitsTimeline()
            .filterCompletedInstants()
            .filter(instant -> instant.getAction().equals("commit"))
            .getInstantsAsStream()
            .map(HoodieInstant::getCompletionTime)
            .collect(Collectors.toList());
        MatcherAssert.assertThat(completionTimes.size(), CoreMatchers.is(3));

        this.testReadChangelogInternal(completionTimes);
    }

    /**
     * Shared scenarios for the incremental changelog tests. For each start/end commit
     * combination the table source is rebuilt from the mutated configuration, the resulting
     * format is read, and the rows are compared with the expected change records.
     *
     * <p>The configuration is mutated in place between scenarios, so their order matters.
     *
     * @param commits completion times of the completed commits; indexes 0 and 1 are used
     * @throws IOException if reading from an input format fails
     */
    private void testReadChangelogInternal(List<String> commits) throws IOException {
        String firstCommit = commits.get(0);
        String secondCommit = commits.get(1);

        // Scenario 1: only a start commit (the second one).
        this.conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
        this.tableSource = this.getTableSource(this.conf);
        InputFormat fromSecond = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(fromSecond, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(TestInputFormat.readData(fromSecond), TestData.dataSetUpsert(2, 1, 2, 1));

        // Scenario 2: start commit "earliest".
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat fromEarliest = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(fromEarliest, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(TestInputFormat.readData(fromEarliest), TestData.dataSetInsert(1, 2));

        // Scenario 3: start at the first commit, end at the second.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, firstCommit);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, secondCommit);
        this.tableSource = this.getTableSource(this.conf);
        InputFormat firstToSecond = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(firstToSecond, CoreMatchers.instanceOf(CdcInputFormat.class));
        List<RowData> firstToSecondRows = TestInputFormat.readData(firstToSecond);
        List<RowData> insertThenUpsert = new ArrayList<>(TestData.dataSetInsert(1));
        insertThenUpsert.addAll(TestData.dataSetUpsert(1));
        insertThenUpsert.addAll(TestData.dataSetInsert(2));
        insertThenUpsert.addAll(TestData.dataSetUpsert(2));
        TestData.assertRowDataEquals(firstToSecondRows, insertThenUpsert);

        // Scenario 4: only an end commit (the second one).
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, secondCommit);
        this.tableSource = this.getTableSource(this.conf);
        InputFormat untilSecond = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(untilSecond, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(TestInputFormat.readData(untilSecond), TestData.dataSetUpsert(2, 1));

        // Scenario 5: start "000", end at the second commit -> same rows as scenario 3.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "000");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, secondCommit);
        this.tableSource = this.getTableSource(this.conf);
        InputFormat fromZeroToSecond = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(fromZeroToSecond, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(TestInputFormat.readData(fromZeroToSecond), insertThenUpsert);

        // Scenario 6: start "001" and end "002" -> expect no rows.
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "001");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "002");
        this.tableSource = this.getTableSource(this.conf);
        InputFormat outOfRange = this.tableSource.getInputFormat();
        TestData.assertRowDataEquals(TestInputFormat.readData(outOfRange), Collections.emptyList());
    }

    /**
     * MERGE_ON_READ table using {@link EventTimeAvroPayload}. A disorder-insert data set is
     * written with async compaction enabled, then a single insert and a single delete are
     * written with it disabled. After each write the table is read back and compared with the
     * expected rendering.
     */
    @Test
    void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        String expectedLatestRow = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";

        // Step 1: write with async compaction on and a delta-commit setting of 1.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, this.conf);
        InputFormat format = this.tableSource.getInputFormat();
        String afterFirstWrite = TestData.rowDataToString(TestInputFormat.readData(format));
        MatcherAssert.assertThat(afterFirstWrite, CoreMatchers.is(expectedLatestRow));

        // Step 2: async compaction off, write a single insert; the result must be unchanged.
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        this.tableSource.reset();
        format = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        String afterSingleInsert = TestData.rowDataToString(TestInputFormat.readData(format));
        MatcherAssert.assertThat(afterSingleInsert, CoreMatchers.is(expectedLatestRow));

        // Step 3: write a single delete; nothing should be left.
        TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, this.conf);
        this.tableSource.reset();
        format = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        String afterSingleDelete = TestData.rowDataToString(TestInputFormat.readData(format));
        MatcherAssert.assertThat(afterSingleDelete, CoreMatchers.is("[]"));
    }

    /**
     * Writes a disorder insert/delete data set to a MERGE_ON_READ table and checks that the
     * single expected row is read back, for each pre-combine / changelog-mode combination
     * supplied by {@code preCombiningAndChangelogModeParams}.
     *
     * @param preCombine value stored under the {@code PRE_COMBINE} option
     * @param changelogMode value stored under the {@code CHANGELOG_ENABLED} option
     */
    @ParameterizedTest
    @MethodSource(value={"preCombiningAndChangelogModeParams"})
    void testMergeOnReadDisorderDeleteMerging(boolean preCombine, boolean changelogMode) throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PRE_COMBINE.key(), String.valueOf(preCombine));
        options.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(changelogMode));
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        TestData.writeData(TestData.DATA_SET_DISORDER_INSERT_DELETE, this.conf);

        InputFormat format = this.tableSource.getInputFormat();
        List<RowData> rows = TestInputFormat.readData(format);
        String rendered = TestData.rowDataToString(rows);
        MatcherAssert.assertThat(rendered, CoreMatchers.is("[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"));
    }

    /**
     * Exercises incremental queries whose start and/or end commit lies in the archived timeline.
     *
     * <p>Ten commits of two rows each (rows 1-20) are written to a COPY_ON_WRITE table, then a
     * clean is triggered through a write client. The test expects 4 completed commits on the
     * active timeline and 6 on the archived timeline before running the scenarios.
     *
     * <p>The write client is closed in a {@code finally} block so a failing assertion does not
     * leak it.
     */
    @Test
    void testReadArchivedCommitsIncrementally() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
        options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
        options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
        options.put(FlinkOptions.METADATA_ENABLED.key(), "false");
        options.put("hoodie.commits.archival.batch", "1");
        this.beforeEach(HoodieTableType.COPY_ON_WRITE, options);

        // Ten commits, two rows each: (1,2), (3,4), ..., (19,20).
        for (int i = 0; i < 20; i += 2) {
            List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
            TestData.writeData(dataset, this.conf);
        }

        HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient((HoodieEngineContext)HoodieFlinkEngineContext.DEFAULT, FlinkWriteClients.getHoodieClientConfig((Configuration)this.conf));
        try {
            writeClient.clean();

            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf((Configuration)this.conf)), (String)this.tempFile.getAbsolutePath());
            List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
            MatcherAssert.assertThat((Object)commits.size(), (Matcher)CoreMatchers.is((Object)4));
            List<String> archivedCommits = metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
            MatcherAssert.assertThat((Object)archivedCommits.size(), (Matcher)CoreMatchers.is((Object)6));

            // Scenario 1: start and end are both archived commits.
            this.conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(0));
            this.conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
            this.tableSource = this.getTableSource(this.conf);
            InputFormat inputFormat1 = this.tableSource.getInputFormat();
            MatcherAssert.assertThat((Object)inputFormat1, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
            List<RowData> actual1 = TestInputFormat.readData(inputFormat1);
            List<RowData> expected1 = TestData.dataSetInsert(1, 2, 3, 4);
            TestData.assertRowDataEquals(actual1, expected1);

            // Scenario 2: only an (archived) start commit.
            this.conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
            this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
            this.tableSource = this.getTableSource(this.conf);
            InputFormat inputFormat2 = this.tableSource.getInputFormat();
            MatcherAssert.assertThat((Object)inputFormat2, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
            List<RowData> actual2 = TestInputFormat.readData(inputFormat2);
            List<RowData> expected2 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
            TestData.assertRowDataEquals(actual2, expected2);

            // Scenario 3: only an (archived) end commit.
            this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
            this.conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
            this.tableSource = this.getTableSource(this.conf);
            InputFormat inputFormat3 = this.tableSource.getInputFormat();
            MatcherAssert.assertThat((Object)inputFormat3, (Matcher)CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
            List<RowData> actual3 = TestInputFormat.readData(inputFormat3);
            List<RowData> expected3 = TestData.dataSetInsert(3, 4);
            TestData.assertRowDataEquals(actual3, expected3);

            // Scenario 4: archived start, active end -> the test expects no rows.
            this.conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
            this.conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(0));
            this.tableSource = this.getTableSource(this.conf);
            InputFormat inputFormat4 = this.tableSource.getInputFormat();
            List<RowData> actual4 = TestInputFormat.readData(inputFormat4);
            TestData.assertRowDataEquals(actual4, Collections.emptyList());
        } finally {
            // Release the client even when an assertion above fails.
            writeClient.close();
        }
    }

    /**
     * Configures the source Avro schema from {@code TestConfigurations.ROW_TYPE_WIDER}, writes
     * the standard insert data set and checks it is read back unchanged. Runs once per
     * {@link HoodieTableType} constant.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
        String widerAvroSchema = AvroSchemaConverter.convertToSchema((LogicalType)TestConfigurations.ROW_TYPE_WIDER).toString();
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), widerAvroSchema);
        this.beforeEach(tableType, options);

        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);

        InputFormat format = this.tableSource.getInputFormat();
        TestData.assertRowDataEquals(TestInputFormat.readData(format), TestData.DATA_SET_INSERT);
    }

    /**
     * Writes three two-row batches to a MERGE_ON_READ table configured with a compaction
     * delta-commit setting of {@code "1"} and async compaction disabled, then checks that all
     * six rows are read back.
     */
    @Test
    void testReadMORWithCompactionPlanScheduled() throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        // Batches: (1,2), (3,4), (5,6).
        for (int firstId = 1; firstId < 6; firstId += 2) {
            TestData.writeData(TestData.dataSetInsert(firstId, firstId + 1), this.conf);
        }

        InputFormat format = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        List<RowData> rows = TestInputFormat.readData(format);
        TestData.assertRowDataEquals(rows, TestData.dataSetInsert(1, 2, 3, 4, 5, 6));
    }

    /**
     * Incremental read after the first delta commit's completion time has been amended.
     *
     * <p>Three two-row batches are written to a MERGE_ON_READ table. The first completed instant
     * (asserted to be a {@code deltacommit}) is passed to
     * {@code TestUtils.amendCompletionTimeToLatest}, and the returned completion time is used
     * as {@code READ_END_COMMIT}. The read is expected to return rows 1 and 2.
     * NOTE(review): the helper presumably moves that instant's completion time past the other
     * commits; confirm against {@code TestUtils}.
     *
     * @param skipCompaction forwarded to the {@link IncrementalInputSplits} builder
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testIncReadWithNonBlockingConcurrencyControl(boolean skipCompaction) throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        this.beforeEach(HoodieTableType.MERGE_ON_READ, options);

        // Batches: (1,2), (3,4), (5,6).
        for (int firstId = 1; firstId < 6; firstId += 2) {
            TestData.writeData(TestData.dataSetInsert(firstId, firstId + 1), this.conf);
        }

        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient((Configuration)this.conf);
        Option firstCompleted = metaClient.getActiveTimeline().filterCompletedInstants().firstInstant();
        Assertions.assertTrue(firstCompleted.isPresent());
        HoodieInstant firstInstant = (HoodieInstant)firstCompleted.get();
        MatcherAssert.assertThat(firstInstant.getAction(), CoreMatchers.is("deltacommit"));

        // Locate the instant's file under the timeline path and amend its completion time.
        String instantFileName = HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(firstInstant);
        Path instantFile = Paths.get(metaClient.getTimelinePath().toString(), instantFileName);
        String amendedCompletionTime = TestUtils.amendCompletionTimeToLatest(metaClient, instantFile, firstInstant.requestedTime());

        InputFormat format = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(format, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));

        IncrementalInputSplits splitGenerator = IncrementalInputSplits.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .conf(this.conf)
            .path(FilePathUtils.toFlinkPath((StoragePath)metaClient.getBasePath()))
            .skipCompaction(skipCompaction)
            .build();

        this.conf.setString(FlinkOptions.READ_END_COMMIT, amendedCompletionTime);
        IncrementalInputSplits.Result splits = splitGenerator.inputSplits(metaClient, null, false);
        Assertions.assertFalse(splits.isEmpty());

        InputSplit[] splitArray = (InputSplit[])splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
        List<RowData> rows = TestInputFormat.readData(format, splitArray);
        TestData.assertRowDataEquals(rows, TestData.dataSetInsert(1, 2));
    }

    /**
     * Supplies every (preCombine, changelogMode) boolean pair, in the order
     * (true, true), (true, false), (false, true), (false, false).
     *
     * @return a stream of four argument sets
     */
    private static Stream<Arguments> preCombiningAndChangelogModeParams() {
        return Stream.of(
            Arguments.of(true, true),
            Arguments.of(true, false),
            Arguments.of(false, true),
            Arguments.of(false, false));
    }

    /**
     * Builds a new {@link HoodieTableSource} over the temporary table directory, using the test
     * table schema, {@code "partition"} as the only partition key and {@code "default"} as the
     * fourth constructor argument.
     *
     * @param conf configuration passed to the table source
     * @return a newly constructed table source
     */
    private HoodieTableSource getTableSource(Configuration conf) {
        StoragePath tablePath = new StoragePath(this.tempFile.getAbsolutePath());
        List<String> partitionKeys = Collections.singletonList("partition");
        return new HoodieTableSource(
            SerializableSchema.create((ResolvedSchema)TestConfigurations.TABLE_SCHEMA),
            tablePath,
            partitionKeys,
            "default",
            conf);
    }

    /**
     * Asks {@code inputFormat} to create its input splits (requesting a minimum of 1) and reads
     * every record from them.
     *
     * @param inputFormat the format to read from
     * @return copies of all records read, in read order
     * @throws IOException if creating the splits or reading fails
     */
    private static List<RowData> readData(InputFormat inputFormat) throws IOException {
        return TestInputFormat.readData(inputFormat, inputFormat.createInputSplits(1));
    }

    /**
     * Opens each split in turn on {@code inputFormat}, drains it, and returns copies of all
     * records in read order.
     *
     * <p>Each record is copied with {@code TestConfigurations.SERIALIZER} before it is stored,
     * and {@code null} is passed as the reuse argument of {@code nextRecord}. The format is
     * closed after every split, including when draining that split throws, so a failing read
     * does not leave the format open.
     *
     * @param inputFormat the format to read from
     * @param inputSplits the splits to read, processed in array order
     * @return copies of all records read
     * @throws IOException if opening, reading or closing the format fails
     */
    private static List<RowData> readData(InputFormat inputFormat, InputSplit[] inputSplits) throws IOException {
        List<RowData> result = new ArrayList<>();
        for (InputSplit inputSplit : inputSplits) {
            inputFormat.open(inputSplit);
            try {
                while (!inputFormat.reachedEnd()) {
                    result.add(TestConfigurations.SERIALIZER.copy((RowData)inputFormat.nextRecord(null)));
                }
            } finally {
                // Always release the split's resources, even when the drain loop fails.
                inputFormat.close();
            }
        }
        return result;
    }
}

