/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.ExpressionEvaluators;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

public class TestIncrementalInputSplits
extends HoodieCommonTestHarness {
    /**
     * Runs before every test and delegates to the inherited {@code initPath()}.
     * NOTE(review): presumably this is what populates {@code basePath} used by the tests below - confirm in the harness.
     */
    @BeforeEach
    void init() {
        initPath();
    }

    /**
     * Verifies which instants {@link IncrementalQueryAnalyzer} reports as active for different
     * range types and start/end completion times on a MERGE_ON_READ timeline that holds three
     * completed delta commits, and that a later inflight compaction does not change the result.
     */
    @Test
    void testFilterInstantsWithRange() throws IOException {
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
        conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);

        // Seed the timeline with three completed delta commits "1", "2" and "3".
        HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
        HoodieInstant first = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", "1");
        HoodieInstant second = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", "2");
        HoodieInstant third = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", "3");
        activeTimeline.createCompleteInstant(first);
        activeTimeline.createCompleteInstant(second);
        activeTimeline.createCompleteInstant(third);
        activeTimeline = metaClient.reloadActiveTimeline();

        // requested time -> completion time, used to express the query boundaries below
        Map<String, String> completionTimes = activeTimeline.filterCompletedInstants()
            .getInstantsAsStream()
            .collect(Collectors.toMap(HoodieInstant::requestedTime, HoodieInstant::getCompletionTime));

        // OPEN_CLOSED range starting at the completion time of "1": "1" itself is excluded.
        IncrementalQueryAnalyzer openClosedFromFirst = IncrementalQueryAnalyzer.builder()
            .metaClient(metaClient)
            .rangeType(InstantRange.RangeType.OPEN_CLOSED)
            .startCompletionTime(completionTimes.get("1"))
            .skipClustering(true)
            .build();
        List<HoodieInstant> afterFirst = openClosedFromFirst.analyze().getActiveInstants();
        Assertions.assertEquals(2, afterFirst.size());
        Assertions.assertIterableEquals(Arrays.asList(second, third), afterFirst);

        // No start/end boundaries at all: only the last instant is expected.
        IncrementalQueryAnalyzer unbounded = IncrementalQueryAnalyzer.builder()
            .metaClient(metaClient)
            .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
            .skipClustering(true)
            .build();
        List<HoodieInstant> latestOnly = unbounded.analyze().getActiveInstants();
        Assertions.assertEquals(1, latestOnly.size());
        Assertions.assertIterableEquals(Collections.singletonList(third), latestOnly);

        // CLOSED_CLOSED range spanning the completion times of "1" through "3": all three instants.
        IncrementalQueryAnalyzer closedFirstToThird = IncrementalQueryAnalyzer.builder()
            .metaClient(metaClient)
            .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
            .startCompletionTime(completionTimes.get("1"))
            .endCompletionTime(completionTimes.get("3"))
            .skipClustering(true)
            .build();
        List<HoodieInstant> allThree = closedFirstToThird.analyze().getActiveInstants();
        Assertions.assertEquals(3, allThree.size());
        Assertions.assertIterableEquals(Arrays.asList(first, second, third), allThree);

        // Add an inflight compaction "4": the timeline grows, the analyzed result must not.
        HoodieInstant inflightCompaction = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.INFLIGHT, "compaction", "4");
        activeTimeline.createNewInstant(inflightCompaction);
        activeTimeline = metaClient.reloadActiveTimeline();
        Assertions.assertEquals(4, activeTimeline.getInstants().size());
        List<HoodieInstant> afterCompactionScheduled = closedFirstToThird.analyze().getActiveInstants();
        Assertions.assertEquals(3, afterCompactionScheduled.size());
    }

    /**
     * Exercises {@code IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs} on a MERGE_ON_READ
     * timeline holding two delta commits, one clustering, two replace commits (INSERT_OVERWRITE and
     * INSERT_OVERWRITE_TABLE) and one compaction, switching on one boolean flag per call.
     *
     * <p>As the assertions show, the first flag drops the compaction instant, the second drops the
     * clustering instant and the third drops both insert-overwrite instants.
     */
    @Test
    void testFilterInstantsByConditionForMOR() throws IOException {
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
        HoodieActiveTimeline morTimeline = metaClient.getActiveTimeline();

        // "1" and "2": completed delta commits
        HoodieInstant deltaCommit1 = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", "1");
        morTimeline.createCompleteInstant(deltaCommit1);
        HoodieInstant deltaCommit2 = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "deltacommit", "2");
        morTimeline.createCompleteInstant(deltaCommit2);

        // "3": clustering, driven requested -> inflight -> complete
        HoodieInstant clusteringRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "clustering", "3");
        morTimeline.createNewInstant(clusteringRequested);
        HoodieInstant clusteringInflight =
            morTimeline.transitionClusterRequestedToInflight(clusteringRequested, Option.empty());
        HoodieCommitMetadata replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.CLUSTER, "", "replacecommit");
        morTimeline.transitionClusterInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitInflightInstant(clusteringInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        // "4": replace commit written by INSERT_OVERWRITE
        HoodieInstant overwriteRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", "4");
        morTimeline.createNewInstant(overwriteRequested);
        HoodieInstant overwriteInflight =
            morTimeline.transitionReplaceRequestedToInflight(overwriteRequested, Option.empty());
        replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", "replacecommit");
        morTimeline.transitionReplaceInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getReplaceCommitInflightInstant(overwriteInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        // "5": replace commit written by INSERT_OVERWRITE_TABLE
        HoodieInstant overwriteTableRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", "5");
        morTimeline.createNewInstant(overwriteTableRequested);
        HoodieInstant overwriteTableInflight =
            morTimeline.transitionReplaceRequestedToInflight(overwriteTableRequested, Option.empty());
        replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.INSERT_OVERWRITE_TABLE, "", "replacecommit");
        morTimeline.transitionReplaceInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getReplaceCommitInflightInstant(overwriteTableInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        // "6": compaction, driven requested -> inflight -> complete
        HoodieInstant compaction = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "compaction", "6");
        morTimeline.createNewInstant(compaction);
        compaction = morTimeline.transitionCompactionRequestedToInflight(compaction);
        compaction = morTimeline.transitionCompactionInflightToComplete(false, compaction, new HoodieCommitMetadata());
        morTimeline.createCompleteInstant(compaction);

        morTimeline = morTimeline.reload();

        // all flags off: nothing is filtered
        HoodieTimeline filtered =
            IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, morTimeline, false, false, false);
        Assertions.assertEquals(6, filtered.getInstants().size());

        // second flag on: the clustering instant is gone
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, morTimeline, false, true, false);
        Assertions.assertEquals(5, filtered.getInstants().size());
        Assertions.assertFalse(filtered.containsInstant(clusteringInflight));

        // first flag on: the compaction instant is gone
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, morTimeline, true, false, false);
        Assertions.assertFalse(filtered.containsInstant(compaction));

        // third flag on: both insert-overwrite instants are gone
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, morTimeline, false, false, true);
        Assertions.assertEquals(4, filtered.getInstants().size());
        Assertions.assertFalse(filtered.containsInstant(overwriteInflight));
        Assertions.assertFalse(filtered.containsInstant(overwriteTableInflight));
    }

    /**
     * Exercises {@code IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs} on a COPY_ON_WRITE
     * timeline holding two commits, one clustering and two replace commits (INSERT_OVERWRITE and
     * INSERT_OVERWRITE_TABLE), switching on one boolean flag per call.
     *
     * <p>As the assertions show, the first flag leaves this timeline unchanged, the second drops the
     * clustering instant and the third drops both insert-overwrite instants.
     */
    @Test
    void testFilterInstantsByConditionForCOW() throws IOException {
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
        HoodieActiveTimeline cowTimeline = metaClient.getActiveTimeline();

        // "1" and "2": completed commits
        HoodieInstant plainCommit1 = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "commit", "1");
        cowTimeline.createCompleteInstant(plainCommit1);
        HoodieInstant plainCommit2 = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.COMPLETED, "commit", "2");
        cowTimeline.createCompleteInstant(plainCommit2);

        // "3": clustering, driven requested -> inflight -> complete
        HoodieInstant clusteringRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "clustering", "3");
        cowTimeline.createNewInstant(clusteringRequested);
        HoodieInstant clusteringInflight =
            cowTimeline.transitionClusterRequestedToInflight(clusteringRequested, Option.empty());
        HoodieCommitMetadata replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.CLUSTER, "", "replacecommit");
        cowTimeline.transitionClusterInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getClusteringCommitInflightInstant(clusteringInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        // "4": replace commit written by INSERT_OVERWRITE
        HoodieInstant overwriteRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", "4");
        cowTimeline.createNewInstant(overwriteRequested);
        HoodieInstant overwriteInflight =
            cowTimeline.transitionReplaceRequestedToInflight(overwriteRequested, Option.empty());
        replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", "replacecommit");
        cowTimeline.transitionReplaceInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getReplaceCommitInflightInstant(overwriteInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        // "5": replace commit written by INSERT_OVERWRITE_TABLE
        HoodieInstant overwriteTableRequested = HoodieTestUtils.INSTANT_GENERATOR
            .createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", "5");
        cowTimeline.createNewInstant(overwriteTableRequested);
        HoodieInstant overwriteTableInflight =
            cowTimeline.transitionReplaceRequestedToInflight(overwriteTableRequested, Option.empty());
        replaceMetadata = CommitUtils.buildMetadata(
            new ArrayList<>(), new HashMap<>(), Option.empty(), WriteOperationType.INSERT_OVERWRITE_TABLE, "", "replacecommit");
        cowTimeline.transitionReplaceInflightToComplete(
            true,
            HoodieTestUtils.INSTANT_GENERATOR.getReplaceCommitInflightInstant(overwriteTableInflight.requestedTime()),
            (HoodieReplaceCommitMetadata) replaceMetadata);

        cowTimeline = cowTimeline.reload();

        // all flags off: nothing is filtered
        HoodieTimeline filtered =
            IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, cowTimeline, false, false, false);
        Assertions.assertEquals(5, filtered.getInstants().size());

        // second flag on: the clustering instant is gone
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, cowTimeline, false, true, false);
        Assertions.assertEquals(4, filtered.getInstants().size());
        Assertions.assertFalse(filtered.containsInstant(clusteringInflight));

        // first flag on: the instant count stays the same for this timeline
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, cowTimeline, true, false, false);
        Assertions.assertEquals(5, filtered.getInstants().size());

        // third flag on: both insert-overwrite instants are gone
        filtered = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, cowTimeline, false, false, true);
        Assertions.assertEquals(3, filtered.getInstants().size());
        Assertions.assertFalse(filtered.containsInstant(overwriteInflight));
        Assertions.assertFalse(filtered.containsInstant(overwriteTableInflight));
    }

    /**
     * Writes two data sets to a COPY_ON_WRITE table, reads from the "earliest" start commit and
     * expects the partitions of the produced splits to come back in the order par1..par6.
     */
    @Test
    void testInputSplitsSortedByPartition() throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
        conf.set(FlinkOptions.READ_START_COMMIT, "earliest");

        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);

        IncrementalInputSplits.Result splitResult = IncrementalInputSplits.builder()
            .conf(conf)
            .path(new Path(basePath))
            .rowType(TestConfigurations.ROW_TYPE)
            .build()
            .inputSplits(metaClient, null, false);

        List<String> expectedOrder = Arrays.asList("par1", "par2", "par3", "par4", "par5", "par6");
        Assertions.assertEquals(expectedOrder, getFilteredPartitions(splitResult));
    }

    /**
     * Runs once per evaluator supplied by {@code partitionEvaluators()}: writes the regular insert
     * data set plus the rows of {@code DATA_SET_INSERT_PARTITION_IS_NULL}, builds a partition pruner
     * from the evaluator and checks the partitions of the resulting input splits.
     *
     * @param partitionEvaluator the single evaluator handed to the partition pruner
     * @param expectedPartitions the partitions expected to survive pruning, in split order
     */
    @ParameterizedTest
    @MethodSource(value={"partitionEvaluators"})
    void testInputSplitsWithPartitionPruner(ExpressionEvaluators.Evaluator partitionEvaluator, List<String> expectedPartitions) throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        conf.set(FlinkOptions.READ_AS_STREAMING, true);
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);

        // One combined batch: the regular rows followed by the PARTITION_IS_NULL rows.
        List<RowData> rows = new ArrayList<>(TestData.DATA_SET_INSERT);
        rows.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL);
        TestData.writeData(rows, conf);

        PartitionPruners.PartitionPruner pruner = PartitionPruners.builder()
            .partitionEvaluators(Collections.singletonList(partitionEvaluator))
            .partitionKeys(Collections.singletonList("partition"))
            .partitionTypes(Collections.singletonList(DataTypes.STRING()))
            .defaultParName("__HIVE_DEFAULT_PARTITION__")
            .hivePartition(false)
            .build();

        IncrementalInputSplits.Result splitResult = IncrementalInputSplits.builder()
            .conf(conf)
            .path(new Path(basePath))
            .rowType(TestConfigurations.ROW_TYPE)
            .partitionPruner(pruner)
            .build()
            .inputSplits(metaClient, null, false);

        Assertions.assertEquals(expectedPartitions, getFilteredPartitions(splitResult));
    }

    /**
     * For every {@link HoodieTableType}: enables data skipping together with the partition-stats
     * and column-stats metadata indexes, writes the insert data set and checks that a probe for
     * {@code uuid > 'id5' AND age < 30} leaves only partition {@code par3} in the input splits.
     *
     * @param tableType the table type the test table is initialized with
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testInputSplitsWithPartitionStatsPruner(HoodieTableType tableType) throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        conf.set(FlinkOptions.READ_AS_STREAMING, true);
        conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
        conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
        conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), true);
        // The column stats index is switched on for every table type, so no separate
        // MERGE_ON_READ branch setting the same key again is needed.
        conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), true);
        metaClient = HoodieTestUtils.init(basePath, tableType);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        // uuid > 'id5'
        CallExpression uuidGreaterThanId5 = new CallExpression(
            FunctionIdentifier.of("greaterThan"),
            BuiltInFunctionDefinitions.GREATER_THAN,
            Arrays.asList(
                new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
                new ValueLiteralExpression("id5", DataTypes.STRING().notNull())),
            DataTypes.BOOLEAN());
        // age < 30
        CallExpression ageLessThan30 = new CallExpression(
            FunctionIdentifier.of("lessThan"),
            BuiltInFunctionDefinitions.LESS_THAN,
            Arrays.asList(
                new FieldReferenceExpression("age", DataTypes.INT(), 2, 2),
                new ValueLiteralExpression(30, DataTypes.INT().notNull())),
            DataTypes.BOOLEAN());
        ColumnStatsProbe columnStatsProbe =
            ColumnStatsProbe.newInstance(Arrays.asList(uuidGreaterThanId5, ageLessThan30));

        PartitionPruners.PartitionPruner partitionPruner = PartitionPruners.builder()
            .rowType(TestConfigurations.ROW_TYPE)
            .basePath(basePath)
            .conf(conf)
            .columnStatsProbe(columnStatsProbe)
            .build();
        IncrementalInputSplits iis = IncrementalInputSplits.builder()
            .conf(conf)
            .path(new Path(basePath))
            .rowType(TestConfigurations.ROW_TYPE)
            .partitionPruner(partitionPruner)
            .build();
        IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, false);

        List<String> partitions = getFilteredPartitions(result);
        Assertions.assertEquals(Arrays.asList("par3"), partitions);
    }

    /**
     * Writes four commits to a COPY_ON_WRITE table and reads in streaming mode with
     * {@code READ_COMMITS_LIMIT} set to 1, starting from the completion time of the first commit.
     * The smallest start instant and the largest end instant over all returned splits must sit at
     * the same position of the commits timeline, i.e. the splits cover a single instant.
     */
    @Test
    void testInputSplitsWithSpeedLimit() throws Exception {
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        conf.set(FlinkOptions.READ_AS_STREAMING, true);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
        conf.set(FlinkOptions.READ_COMMITS_LIMIT, 1);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        HoodieTimeline commitsTimeline = metaClient.reloadActiveTimeline()
            .filter(hoodieInstant -> hoodieInstant.getAction().equals("commit"));
        HoodieInstant firstInstant = commitsTimeline.firstInstant().get();

        IncrementalInputSplits iis = IncrementalInputSplits.builder()
            .conf(conf)
            .path(new Path(basePath))
            .rowType(TestConfigurations.ROW_TYPE)
            .partitionPruner(null)
            .build();
        IncrementalInputSplits.Result result = iis.inputSplits(metaClient, firstInstant.getCompletionTime(), false);

        // Stream.min/max need a real three-way comparator; a lambda that only ever returns 1 or 0
        // makes min() pick the largest element and max() pick the first one.
        String minStartCommit = result.getInputSplits().stream()
            .map(split -> split.getInstantRange().get().getStartInstant().get())
            .min(TestIncrementalInputSplits::compareInstantTimes)
            .orElse(null);
        String maxEndCommit = result.getInputSplits().stream()
            .map(split -> split.getInstantRange().get().getEndInstant().get())
            .max(TestIncrementalInputSplits::compareInstantTimes)
            .orElse(null);

        Assertions.assertEquals(0, intervalBetween2Instants(commitsTimeline, minStartCommit, maxEndCommit), "Should read 1 instant");
    }

    /**
     * Three-way comparison of two instant times built on {@link InstantComparison}.
     *
     * @param left  the first instant time
     * @param right the second instant time
     * @return -1 when {@code left} is LESSER_THAN {@code right}, 1 when it is GREATER_THAN, otherwise 0
     */
    private static int compareInstantTimes(String left, String right) {
        if (InstantComparison.compareTimestamps(left, InstantComparison.LESSER_THAN, right)) {
            return -1;
        }
        if (InstantComparison.compareTimestamps(left, InstantComparison.GREATER_THAN, right)) {
            return 1;
        }
        return 0;
    }

    /**
     * Writes the insert data set four times with the INSERT operation, reads from "earliest" in
     * streaming mode and checks the latest commit recorded on each input split: splits whose file
     * id appears among the file slices of partition {@code par1} must carry that slice's base
     * instant time, at least one split must carry the last instant and at least one must not.
     */
    @Test
    void testInputSplitsForSplitLastCommit() throws Exception {
        metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
        Configuration conf = TestConfigurations.getDefaultConf(basePath);
        conf.set(FlinkOptions.READ_AS_STREAMING, true);
        conf.set(FlinkOptions.READ_START_COMMIT, "earliest");
        conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
        conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
        conf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        HoodieTimeline completedCommits = metaClient.reloadActiveTimeline()
            .getCommitsTimeline()
            .filterCompletedInstants();
        List<HoodieInstant> completedInstants = completedCommits.getInstants();
        String lastRequestedTime = completedCommits.lastInstant().map(HoodieInstant::requestedTime).get();

        // Build a file system view from the files referenced by the commit metadata.
        List<HoodieCommitMetadata> commitMetadata = completedInstants.stream()
            .map(instant -> WriteProfiles.getCommitMetadata(tableName, new Path(basePath), instant, completedCommits))
            .collect(Collectors.toList());
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
            metaClient,
            completedCommits,
            WriteProfiles.getFilesFromMetadata(
                new Path(basePath),
                (org.apache.hadoop.conf.Configuration) metaClient.getStorageConf().unwrap(),
                commitMetadata,
                metaClient.getTableType()));
        Map<String, String> baseInstantByFileId = fsView.getAllFileSlices("par1")
            .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));

        IncrementalInputSplits.Result splitResult = IncrementalInputSplits.builder()
            .conf(conf)
            .path(new Path(basePath))
            .rowType(TestConfigurations.ROW_TYPE)
            .partitionPruner(null)
            .build()
            .inputSplits(metaClient, null, false);

        // Splits of known par1 file ids must report the base instant of their file slice.
        splitResult.getInputSplits().stream()
            .filter(split -> baseInstantByFileId.containsKey(split.getFileId()))
            .forEach(split -> Assertions.assertEquals(baseInstantByFileId.get(split.getFileId()), split.getLatestCommit()));

        Assertions.assertTrue(
            splitResult.getInputSplits().stream().anyMatch(split -> split.getLatestCommit().equals(lastRequestedTime)),
            "Some input splits' latest commit time should equal to the last instant");
        Assertions.assertTrue(
            splitResult.getInputSplits().stream().anyMatch(split -> !split.getLatestCommit().equals(lastRequestedTime)),
            "The input split latest commit time does not always equal to last instant");
    }

    /**
     * Argument source for {@code testInputSplitsWithPartitionPruner}: each entry pairs an evaluator
     * bound to the {@code partition} field with the partitions expected to remain after pruning.
     *
     * @return a stream of (evaluator, expected partition list) arguments
     */
    private static Stream<Arguments> partitionEvaluators() {
        FieldReferenceExpression partitionField = new FieldReferenceExpression("partition", DataTypes.STRING(), 0, 0);

        // partition <> 'par3'
        ExpressionEvaluators.LeafEvaluator notPar3 = ExpressionEvaluators.NotEqualTo.getInstance()
            .bindVal(new ValueLiteralExpression("par3"))
            .bindFieldReference(partitionField);
        // partition >= 'par2'
        ExpressionEvaluators.LeafEvaluator atLeastPar2 = ExpressionEvaluators.GreaterThanOrEqual.getInstance()
            .bindVal(new ValueLiteralExpression("par2"))
            .bindFieldReference(partitionField);
        // partition >= 'par2' AND partition <> 'par3'
        ExpressionEvaluators.Evaluator bothConditions = ExpressionEvaluators.And.getInstance()
            .bindEvaluator(new ExpressionEvaluators.Evaluator[]{atLeastPar2, notPar3});

        // partition IN ('par1', 'par4')
        ExpressionEvaluators.In inPar1OrPar4 = ExpressionEvaluators.In.getInstance();
        inPar1OrPar4.bindFieldReference(partitionField);
        inPar1OrPar4.bindVals(new Object[]{"par1", "par4"});

        ExpressionEvaluators.IsNotNull partitionNotNull = ExpressionEvaluators.IsNotNull.getInstance();
        partitionNotNull.bindFieldReference(partitionField);

        ExpressionEvaluators.IsNull partitionNull = ExpressionEvaluators.IsNull.getInstance();
        partitionNull.bindFieldReference(partitionField);

        return Stream.of(
            Arguments.of(notPar3, Arrays.asList("par1", "par2", "par4")),
            Arguments.of(atLeastPar2, Arrays.asList("par2", "par3", "par4")),
            Arguments.of(bothConditions, Arrays.asList("par2", "par4")),
            Arguments.of(inPar1OrPar4, Arrays.asList("par1", "par4")),
            Arguments.of(partitionNotNull, Arrays.asList("par1", "par2", "par3", "par4")),
            Arguments.of(partitionNull, Arrays.asList("__HIVE_DEFAULT_PARTITION__")));
    }

    /**
     * Collects, in split order, the parent directory name of every file path referenced by the
     * input splits: first the base path of a split (when present), then each of its log paths
     * (when present). Duplicates are kept.
     *
     * @param result the input-split result to inspect
     * @return the collected directory names, one entry per referenced file path
     */
    private List<String> getFilteredPartitions(IncrementalInputSplits.Result result) {
        List<String> collected = new ArrayList<>();
        result.getInputSplits().forEach(split -> {
            if (split.getBasePath().isPresent()) {
                collected.add(parentDirectoryName(split.getBasePath().get()));
            }
            if (split.getLogPaths().isPresent()) {
                for (String logPath : split.getLogPaths().get()) {
                    collected.add(parentDirectoryName(logPath));
                }
            }
        });
        return collected;
    }

    /** Returns the second-to-last segment of a "/"-separated path, i.e. the directory holding the file. */
    private static String parentDirectoryName(String filePath) {
        String[] segments = filePath.split("/");
        return segments[segments.length - 2];
    }

    /**
     * Computes how many positions apart two instants are within the given timeline.
     *
     * @param timeline the timeline whose instant order defines the positions
     * @param instant1 requested time of the first instant
     * @param instant2 requested time of the second instant
     * @return the absolute index distance, or -1 when either instant is not found in the timeline
     */
    private Integer intervalBetween2Instants(HoodieTimeline timeline, String instant1, String instant2) {
        int firstPosition = getInstantIdxInTimeline(timeline, instant1);
        int secondPosition = getInstantIdxInTimeline(timeline, instant2);
        if (firstPosition == -1 || secondPosition == -1) {
            return -1;
        }
        return Math.abs(firstPosition - secondPosition);
    }

    /**
     * Finds the position of the first instant in the timeline whose requested time equals the
     * given value.
     *
     * @param timeline the timeline to search
     * @param instant  the requested time to look for
     * @return the zero-based index of the first match, or -1 when there is none
     */
    private Integer getInstantIdxInTimeline(HoodieTimeline timeline, String instant) {
        List<HoodieInstant> candidates = timeline.getInstants();
        for (int position = 0; position < candidates.size(); position++) {
            if (candidates.get(position).requestedTime().equals(instant)) {
                return position;
            }
        }
        return -1;
    }
}

