package org.apache.hudi.table.format;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/table/format/TestInputFormat.class */
public class TestInputFormat {
    private HoodieTableSource tableSource;
    private Configuration conf;

    @TempDir
    File tempFile;

    void beforeEach(HoodieTableType hoodieTableType) throws IOException {
        beforeEach(hoodieTableType, Collections.emptyMap());
    }

    void beforeEach(HoodieTableType hoodieTableType, Map<String, String> map) throws IOException {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, hoodieTableType.name());
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        map.forEach((str, str2) -> {
            this.conf.setString(str, str2);
        });
        StreamerUtil.initTableIfNotExists(this.conf);
        this.tableSource = getTableSource(this.conf);
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testRead(HoodieTableType hoodieTableType) throws Exception {
        beforeEach(hoodieTableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.tableSource.reset();
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], +I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], +I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], +I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]"));
    }

    @Test
    void testReadBaseAndLogFiles() throws Exception {
        beforeEach(HoodieTableType.MERGE_ON_READ);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, this.conf);
        this.tableSource.reset();
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], +I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], +I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], +I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], +I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], +I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], +I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], +I[id14, Rachel, 52, 1970-01-01T00:00:00.011, par6], +I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]"));
    }

    @Test
    void testReadBaseAndLogFilesWithDeletes() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]"));
        this.tableSource.reset();
        MergeOnReadInputFormat inputFormat2 = this.tableSource.getInputFormat();
        inputFormat2.isEmitDelete(true);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2)), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean z) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, z);
        TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        List<RowData> readData = readData(inputFormat);
        String str = "[+" + (z ? "I" : "U") + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
        MatcherAssert.assertThat(TestData.rowDataToString(readData), CoreMatchers.is(str));
        this.tableSource.reset();
        MergeOnReadInputFormat inputFormat2 = this.tableSource.getInputFormat();
        inputFormat2.isEmitDelete(true);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2)), CoreMatchers.is(str));
    }

    @Test
    void testReadWithDeletesMOR() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        MergeOnReadInputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        inputFormat.isEmitDelete(true);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], -D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]"));
    }

    @Test
    void testReadWithDeletesCOW() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(CopyOnWriteInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]"));
    }

    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CDC_ENABLED.key(), "true");
        hashMap.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), hoodieCDCSupplementalLoggingMode.name());
        beforeEach(HoodieTableType.COPY_ON_WRITE, hashMap);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(CdcInputFormat.class));
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).skipCompaction(false).build().inputSplits(createMetaClient, (String) null, (String) null, true).getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is("[-U[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +U[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], -U[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], +U[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], -D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], -D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3]]"));
    }

    @EnumSource(HoodieCDCSupplementalLoggingMode.class)
    @ParameterizedTest
    void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CDC_ENABLED.key(), "true");
        hashMap.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), hoodieCDCSupplementalLoggingMode.name());
        hashMap.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
        beforeEach(HoodieTableType.COPY_ON_WRITE, hashMap);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(CdcInputFormat.class));
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).partitionPruner(PartitionPruners.getInstance(new String[]{"par1", "par2", "par3", "par4"})).skipCompaction(false).build().inputSplits(createMetaClient, (String) null, (String) null, true).getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is("[+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], +I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]"));
    }

    @Test
    void testReadSkipCompaction() throws Exception {
        beforeEach(HoodieTableType.MERGE_ON_READ);
        HadoopConfigurations.getHadoopConf(this.conf);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.conf);
        IncrementalInputSplits build = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).partitionPruner(PartitionPruners.getInstance(new String[]{"par1", "par2", "par3", "par4"})).skipCompaction(true).build();
        IncrementalInputSplits.Result inputSplits = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, TestUtils.getNthCompleteInstant(createMetaClient.getBasePath(), 0, "commit"));
        IncrementalInputSplits.Result inputSplits2 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits2.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, this.conf);
        this.tableSource.reset();
        InputFormat inputFormat2 = this.tableSource.getInputFormat(true);
        IncrementalInputSplits.Result inputSplits3 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits3.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2, (InputSplit[]) inputSplits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    @Test
    void testReadSkipClustering() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        this.conf.setString(FlinkOptions.OPERATION, "insert");
        this.conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        this.conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.conf);
        IncrementalInputSplits build = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).partitionPruner(PartitionPruners.getInstance(new String[]{"par1", "par2", "par3", "par4"})).skipClustering(true).build();
        IncrementalInputSplits.Result inputSplits = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));
        this.conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, TestUtils.getNthCompleteInstant(createMetaClient.getBasePath(), 0, "replacecommit"));
        IncrementalInputSplits.Result inputSplits2 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits2.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
        this.conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, this.conf);
        this.tableSource.reset();
        InputFormat inputFormat2 = this.tableSource.getInputFormat(true);
        IncrementalInputSplits.Result inputSplits3 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits3.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2, (InputSplit[]) inputSplits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testReadHollowInstants(HoodieTableType hoodieTableType) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("hoodie.parquet.small.file.limit", "0");
        beforeEach(hoodieTableType, hashMap);
        for (int i = 0; i < 8; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(this.conf));
        List instants = createMetaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat(Integer.valueOf(instants.size()), CoreMatchers.is(4));
        ArrayList arrayList = new ArrayList();
        for (int i2 = 1; i2 <= 2; i2++) {
            arrayList.add(TestUtils.deleteInstantFile(createMetaClient, (HoodieInstant) instants.get(i2)));
        }
        List instants2 = createMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat(Integer.valueOf(instants2.size()), CoreMatchers.is(2));
        String timestamp = ((HoodieInstant) instants2.get(1)).getTimestamp();
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        IncrementalInputSplits build = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).build();
        IncrementalInputSplits.Result inputSplits = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(7, 8));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        IncrementalInputSplits.Result inputSplits2 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits2.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits2.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(1, 2, 7, 8));
        TestUtils.saveInstantAsComplete(createMetaClient, (HoodieInstant) instants.get(1), (HoodieCommitMetadata) arrayList.get(0));
        MatcherAssert.assertThat(inputSplits2.getEndInstant(), CoreMatchers.is(timestamp));
        IncrementalInputSplits.Result inputSplits3 = build.inputSplits(createMetaClient, inputSplits2.getEndInstant(), inputSplits2.getOffset(), false);
        Assertions.assertFalse(inputSplits3.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits3.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(3, 4));
        IncrementalInputSplits.Result inputSplits4 = build.inputSplits(createMetaClient, ((HoodieInstant) instants.get(0)).getTimestamp(), ((HoodieInstant) instants.get(0)).getStateTransitionTime(), false);
        Assertions.assertFalse(inputSplits4.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits4.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(3, 4, 7, 8));
        TestUtils.saveInstantAsComplete(createMetaClient, (HoodieInstant) instants.get(2), (HoodieCommitMetadata) arrayList.get(1));
        MatcherAssert.assertThat(inputSplits3.getEndInstant(), CoreMatchers.is(timestamp));
        IncrementalInputSplits.Result inputSplits5 = build.inputSplits(createMetaClient, inputSplits3.getEndInstant(), inputSplits3.getOffset(), false);
        Assertions.assertFalse(inputSplits5.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits5.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(5, 6));
        MatcherAssert.assertThat(inputSplits5.getEndInstant(), CoreMatchers.is(timestamp));
        Assertions.assertTrue(build.inputSplits(createMetaClient, inputSplits5.getEndInstant(), inputSplits5.getOffset(), false).isEmpty());
        IncrementalInputSplits.Result inputSplits6 = build.inputSplits(createMetaClient, ((HoodieInstant) instants.get(2)).getTimestamp(), ((HoodieInstant) instants.get(3)).getStateTransitionTime(), false);
        Assertions.assertFalse(inputSplits6.isEmpty());
        TestData.assertRowDataEquals(readData(inputFormat, (InputSplit[]) inputSplits6.getInputSplits().toArray(new MergeOnReadInputSplit[0])), TestData.dataSetInsert(3, 4, 7, 8));
    }

    @Test
    void testReadBaseFilesWithStartCommit() throws Exception {
        beforeEach(HoodieTableType.COPY_ON_WRITE);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat(true);
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.conf);
        IncrementalInputSplits build = IncrementalInputSplits.builder().rowType(TestConfigurations.ROW_TYPE).conf(this.conf).path(FilePathUtils.toFlinkPath(createMetaClient.getBasePathV2())).partitionPruner(PartitionPruners.getInstance(new String[]{"par1", "par2", "par3", "par4"})).build();
        IncrementalInputSplits.Result inputSplits = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_INSERT)));
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
        this.conf.setString(FlinkOptions.READ_START_COMMIT, TestUtils.getNthCompleteInstant(createMetaClient.getBasePath(), 1, "commit"));
        IncrementalInputSplits.Result inputSplits2 = build.inputSplits(createMetaClient, (String) null, (String) null, false);
        Assertions.assertFalse(inputSplits2.isEmpty());
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat, (InputSplit[]) inputSplits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]))), CoreMatchers.is(TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT)));
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testReadWithPartitionPrune(HoodieTableType hoodieTableType) throws Exception {
        beforeEach(hoodieTableType);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        this.tableSource.applyFilters(Arrays.asList(new CallExpression(BuiltInFunctionDefinitions.EQUALS, Arrays.asList(new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4), new ValueLiteralExpression("par1", DataTypes.STRING().notNull())), DataTypes.BOOLEAN())));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is("[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]"));
    }

    @Test
    void testReadChangesMergedMOR() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[]"));
        this.tableSource.reset();
        MergeOnReadInputFormat inputFormat2 = this.tableSource.getInputFormat();
        inputFormat2.isEmitDelete(true);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2)), CoreMatchers.is("[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
    }

    @Test
    void testReadChangesUnMergedMOR() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        hashMap.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], -U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], +U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], -U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], +U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], -U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], +U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], -D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testReadIncrementally(HoodieTableType hoodieTableType) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        beforeEach(hoodieTableType, hashMap);
        for (int i = 0; i < 6; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }
        List list = (List) StreamerUtil.createMetaClient(this.tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(this.conf)).getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat(Integer.valueOf(list.size()), CoreMatchers.is(3));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat), TestData.dataSetInsert(3, 4, 5, 6));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat2, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat2), TestData.dataSetInsert(1, 2, 3, 4, 5, 6));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat3, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat3), TestData.dataSetInsert(1, 2, 3, 4));
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat4 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat4, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat4), TestData.dataSetInsert(3, 4));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "000");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat5 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat4, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat5), TestData.dataSetInsert(1, 2, 3, 4));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "001");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "002");
        this.tableSource = getTableSource(this.conf);
        TestData.assertRowDataEquals(readData(this.tableSource.getInputFormat()), (List<RowData>) Collections.emptyList());
    }

    @Test
    void testReadChangelogIncrementally() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        hashMap.put(FlinkOptions.CDC_ENABLED.key(), "true");
        hashMap.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        beforeEach(HoodieTableType.COPY_ON_WRITE, hashMap);
        for (int i = 0; i < 3; i++) {
            TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), this.conf);
        }
        List list = (List) StreamerUtil.createMetaClient(this.tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(this.conf)).getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat(Integer.valueOf(list.size()), CoreMatchers.is(3));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat), TestData.dataSetUpsert(2, 1, 2, 1));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat2, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat2), TestData.dataSetInsert(1, 2));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat3, CoreMatchers.instanceOf(CdcInputFormat.class));
        List<RowData> readData = readData(inputFormat3);
        ArrayList arrayList = new ArrayList(TestData.dataSetInsert(1));
        arrayList.addAll(TestData.dataSetUpsert(1));
        arrayList.addAll(TestData.dataSetInsert(2));
        arrayList.addAll(TestData.dataSetUpsert(2));
        TestData.assertRowDataEquals(readData, arrayList);
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat4 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat4, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat4), TestData.dataSetUpsert(2, 1));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "000");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat5 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat5, CoreMatchers.instanceOf(CdcInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat5), TestData.dataSetInsert(1, 2));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, "001");
        this.conf.setString(FlinkOptions.READ_END_COMMIT, "002");
        this.tableSource = getTableSource(this.conf);
        TestData.assertRowDataEquals(readData(this.tableSource.getInputFormat()), (List<RowData>) Collections.emptyList());
    }

    @Test
    void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is("[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"));
        this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        this.tableSource.reset();
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat)), CoreMatchers.is("[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"));
        TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, this.conf);
        this.tableSource.reset();
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat2, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        MatcherAssert.assertThat(TestData.rowDataToString(readData(inputFormat2)), CoreMatchers.is("[]"));
    }

    @MethodSource({"preCombiningAndChangelogModeParams"})
    @ParameterizedTest
    void testMergeOnReadDisorderDeleteMerging(boolean z, boolean z2) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.PRE_COMBINE.key(), z + "");
        hashMap.put(FlinkOptions.CHANGELOG_ENABLED.key(), z2 + "");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        TestData.writeData(TestData.DATA_SET_DISORDER_INSERT_DELETE, this.conf);
        MatcherAssert.assertThat(TestData.rowDataToString(readData(this.tableSource.getInputFormat())), CoreMatchers.is("[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]"));
    }

    @Test
    void testReadArchivedCommitsIncrementally() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.QUERY_TYPE.key(), "incremental");
        hashMap.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
        hashMap.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
        hashMap.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
        hashMap.put(FlinkOptions.METADATA_ENABLED.key(), "false");
        hashMap.put("hoodie.commits.archival.batch", "1");
        beforeEach(HoodieTableType.COPY_ON_WRITE, hashMap);
        for (int i = 0; i < 20; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }
        HoodieFlinkWriteClient hoodieFlinkWriteClient = new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, FlinkWriteClients.getHoodieClientConfig(this.conf));
        hoodieFlinkWriteClient.clean();
        HoodieTableMetaClient createMetaClient = StreamerUtil.createMetaClient(this.tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(this.conf));
        List list = (List) createMetaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat(Integer.valueOf(list.size()), CoreMatchers.is(4));
        List list2 = (List) createMetaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants().getInstantsAsStream().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat(Integer.valueOf(list2.size()), CoreMatchers.is(6));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list2.get(0));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list2.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat), TestData.dataSetInsert(1, 2, 3, 4));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list2.get(1));
        this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat2 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat2, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat2), TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
        this.conf.removeConfig(FlinkOptions.READ_START_COMMIT);
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list2.get(1));
        this.tableSource = getTableSource(this.conf);
        InputFormat inputFormat3 = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat3, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat3), TestData.dataSetInsert(3, 4));
        this.conf.setString(FlinkOptions.READ_START_COMMIT, (String) list2.get(1));
        this.conf.setString(FlinkOptions.READ_END_COMMIT, (String) list.get(0));
        this.tableSource = getTableSource(this.conf);
        TestData.assertRowDataEquals(readData(this.tableSource.getInputFormat()), (List<RowData>) Collections.emptyList());
        hoodieFlinkWriteClient.close();
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testReadWithWiderSchema(HoodieTableType hoodieTableType) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
        beforeEach(hoodieTableType, hashMap);
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        TestData.assertRowDataEquals(readData(this.tableSource.getInputFormat()), TestData.DATA_SET_INSERT);
    }

    @Test
    void testReadMORWithCompactionPlanScheduled() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
        hashMap.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        beforeEach(HoodieTableType.MERGE_ON_READ, hashMap);
        for (int i = 0; i < 6; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), this.conf);
        }
        InputFormat inputFormat = this.tableSource.getInputFormat();
        MatcherAssert.assertThat(inputFormat, CoreMatchers.instanceOf(MergeOnReadInputFormat.class));
        TestData.assertRowDataEquals(readData(inputFormat), TestData.dataSetInsert(1, 2, 3, 4, 5, 6));
    }

    private static Stream<Arguments> preCombiningAndChangelogModeParams() {
        return Stream.of(new Object[]{true, true}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{false, false}).map(Arguments::of);
    }

    private HoodieTableSource getTableSource(Configuration configuration) {
        return new HoodieTableSource(TestConfigurations.TABLE_SCHEMA, new Path(this.tempFile.getAbsolutePath()), Collections.singletonList("partition"), "default", configuration);
    }

    private static List<RowData> readData(InputFormat inputFormat) throws IOException {
        return readData(inputFormat, inputFormat.createInputSplits(1));
    }

    private static List<RowData> readData(InputFormat inputFormat, InputSplit[] inputSplitArr) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (InputSplit inputSplit : inputSplitArr) {
            inputFormat.open(inputSplit);
            while (!inputFormat.reachedEnd()) {
                arrayList.add(TestConfigurations.SERIALIZER.copy((RowData) inputFormat.nextRecord((Object) null)));
            }
            inputFormat.close();
        }
        return arrayList;
    }
}
