/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Serializable;

public class IncrementalInputSplits
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
    // Flink configuration the read options (start/end commit, table name, merge type) are read from.
    private final Configuration conf;
    // Base path of the table; used for file index creation, file system lookup and log messages.
    private final Path path;
    // Row type handed to the FileIndex when a full table scan is needed.
    private final RowType rowType;
    // Passed through unchanged to every MergeOnReadInputSplit that is created.
    private final long maxCompactionMemoryInBytes;
    // Optional partition restriction; null means no restriction is applied.
    private final Set<String> requiredPartitions;
    // When true, instants whose action is "commit" are filtered out of the instants to read.
    private final boolean skipCompaction;

    /**
     * Creates the splits generator; instances are obtained through {@link #builder()}.
     *
     * @param conf                       the Flink configuration holding the read options
     * @param path                       the table base path
     * @param rowType                    the row type passed to the file index
     * @param maxCompactionMemoryInBytes the value forwarded to each created input split
     * @param requiredPartitions         the partitions to restrict the read to, or {@code null} for no restriction
     * @param skipCompaction             whether instants with the "commit" action are skipped
     */
    private IncrementalInputSplits(Configuration conf, Path path, RowType rowType, long maxCompactionMemoryInBytes, @Nullable Set<String> requiredPartitions, boolean skipCompaction) {
        this.conf = conf;
        this.path = path;
        this.rowType = rowType;
        this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
        this.requiredPartitions = requiredPartitions;
        this.skipCompaction = skipCompaction;
    }

    /**
     * Returns a new, empty {@link Builder} for {@link IncrementalInputSplits}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds the input splits for the commit range configured through
     * {@code FlinkOptions.READ_START_COMMIT} and {@code FlinkOptions.READ_END_COMMIT}.
     *
     * <p>The files to read are resolved from the commit metadata of the filtered instants
     * when possible. A full table scan through {@link FileIndex} is used instead when the
     * start commit is "earliest", when either configured commit lies before the start of
     * the timeline, or when the commit metadata references files that no longer exist.
     *
     * @param metaClient the table meta client
     * @param hadoopConf the Hadoop configuration used to resolve the file system
     * @return the splits together with the end instant, or {@link Result#EMPTY} when
     *         there is nothing to read
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf) {
        final HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path " + this.path);
            return Result.EMPTY;
        }

        final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
        final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
        final boolean startFromEarliest = "earliest".equalsIgnoreCase(startCommit);
        final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
        final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
        boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;

        final List<HoodieInstant> instants = this.filterInstantsWithRange(commitTimeline, null);

        // Step 1: find out the files to read. Try the commit metadata first ...
        Set<String> readPartitions = null;
        FileStatus[] fileStatuses = null;
        if (!fullTableScan) {
            if (instants.size() == 0) {
                LOG.info("No new instant found for the table under path " + this.path + ", skip reading");
                return Result.EMPTY;
            }
            final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
            final List<HoodieCommitMetadata> metadataList = instants.stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                .collect(Collectors.toList());
            readPartitions = this.getReadPartitions(metadataList);
            if (readPartitions.size() == 0) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            final FileStatus[] files = WriteProfiles.getRawWritePathsOfInstants(this.path, hadoopConf, metadataList, metaClient.getTableType());
            final FileSystem fs = FSUtils.getFs(this.path.toString(), hadoopConf);
            if (Arrays.stream(files).anyMatch(fileStatus -> !StreamerUtil.fileExists(fs, fileStatus.getPath()))) {
                LOG.warn("Found deleted files in metadata, fall back to full table scan.");
                fullTableScan = true;
            } else {
                fileStatuses = files;
            }
        }
        // ... and fall back to listing the partitions through the file index.
        if (fullTableScan) {
            final FileIndex fileIndex = this.getFileIndex();
            readPartitions = new HashSet<String>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.size() == 0) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = fileIndex.getFilesInPartitions();
        }
        if (fileStatuses.length == 0) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }

        // Step 2: generate the instant range.
        // When the configured end commit lies before the timeline start, the configured
        // timestamp is kept; otherwise the last filtered instant bounds the range.
        // NOTE(review): if a full table scan is chosen while the filtered instant list is
        // empty and endOutOfRange is false, instants.get(...) throws — confirm whether that
        // combination is reachable (e.g. skipCompaction filtering out every instant).
        final String rangeEnd = endOutOfRange ? endCommit : instants.get(instants.size() - 1).getTimestamp();
        // No explicit start commit means the range collapses onto the end instant.
        // Assigned unconditionally so the variable is definitely assigned on every path.
        final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
        final InstantRange instantRange;
        if (!fullTableScan) {
            instantRange = InstantRange.builder()
                .startInstant(rangeStart)
                .endInstant(rangeEnd)
                .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                .build();
        } else if (startFromEarliest && endCommit == null) {
            // Reading from earliest without an end commit: no range filtering at all.
            instantRange = null;
        } else {
            instantRange = InstantRange.builder()
                .startInstant(rangeStart)
                .endInstant(rangeEnd)
                .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                .nullableBoundary(true)
                .build();
        }

        // Step 3: decide the end instant the file slices are resolved against.
        final String endInstant = fullTableScan
            ? commitTimeline.lastInstant().get().getTimestamp()
            : instants.get(instants.size() - 1).getTimestamp();
        final List<MergeOnReadInputSplit> inputSplits = this.getInputSplits(metaClient, commitTimeline, fileStatuses, readPartitions, endInstant, instantRange);
        return Result.instance(inputSplits, endInstant);
    }

    /**
     * Builds the input splits for the instants that come after {@code issuedInstant},
     * reloading the active timeline first.
     *
     * <p>The instant range of the returned splits is chosen as follows:
     * <ul>
     *   <li>{@code issuedInstant} is not null: (issuedInstant, latest filtered instant];</li>
     *   <li>a start commit is configured: [start commit, latest filtered instant], or no
     *       range at all when the start commit is "earliest";</li>
     *   <li>otherwise: only the latest filtered instant.</li>
     * </ul>
     * Without a range the files are listed through the {@link FileIndex}; with a range they
     * are resolved from the active (and, if needed, archived) commit metadata.
     *
     * @param metaClient    the table meta client
     * @param hadoopConf    the Hadoop configuration
     * @param issuedInstant the last instant already handed out, may be null
     * @return the splits together with the end instant, or {@link Result#EMPTY} when
     *         there is nothing to read
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf, String issuedInstant) {
        metaClient.reloadActiveTimeline();
        final HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path " + this.path);
            return Result.EMPTY;
        }

        final List<HoodieInstant> instants = this.filterInstantsWithRange(commitTimeline, issuedInstant);
        // The newest filtered instant is the one to issue.
        final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
        if (instantToIssue == null) {
            LOG.info("No new instant found for the table under path " + this.path + ", skip reading");
            return Result.EMPTY;
        }

        final InstantRange instantRange;
        if (issuedInstant != null) {
            // Continue after the last issued instant: (issued instant, latest instant].
            instantRange = InstantRange.builder()
                .startInstant(issuedInstant)
                .endInstant(instantToIssue.getTimestamp())
                .rangeType(InstantRange.RangeType.OPEN_CLOSE)
                .build();
        } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            // Nothing issued yet, but a start commit is configured.
            final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
            if (startCommit.equalsIgnoreCase("earliest")) {
                instantRange = null;
            } else {
                instantRange = InstantRange.builder()
                    .startInstant(startCommit)
                    .endInstant(instantToIssue.getTimestamp())
                    .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                    .build();
            }
        } else {
            // Nothing issued yet and no start commit: read the latest instant only.
            instantRange = InstantRange.builder()
                .startInstant(instantToIssue.getTimestamp())
                .endInstant(instantToIssue.getTimestamp())
                .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
                .build();
        }

        final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
        final Set<String> readPartitions;
        final FileStatus[] fileStatuses;
        if (instantRange == null) {
            // No range: list the partitions and files through the file index.
            final FileIndex fileIndex = this.getFileIndex();
            readPartitions = new HashSet<String>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.size() == 0) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = fileIndex.getFilesInPartitions();
        } else {
            final List<HoodieCommitMetadata> activeMetadataList = instants.stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                .collect(Collectors.toList());
            final List<HoodieCommitMetadata> archivedMetadataList = this.getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
            final List<HoodieCommitMetadata> metadataList;
            if (archivedMetadataList.size() > 0) {
                LOG.warn("\n--------------------------------------------------------------------------------\n---------- caution: the reader has fall behind too much from the writer,\n---------- tweak 'read.tasks' option to add parallelism of read tasks.\n--------------------------------------------------------------------------------");
                // Archived metadata goes first, followed by the active one.
                metadataList = IncrementalInputSplits.mergeList(archivedMetadataList, activeMetadataList);
            } else {
                metadataList = activeMetadataList;
            }
            readPartitions = this.getReadPartitions(metadataList);
            if (readPartitions.size() == 0) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = WriteProfiles.getWritePathsOfInstants(this.path, hadoopConf, metadataList, metaClient.getTableType());
        }
        if (fileStatuses.length == 0) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }

        final String endInstant = instantToIssue.getTimestamp();
        final List<MergeOnReadInputSplit> inputSplits = this.getInputSplits(metaClient, commitTimeline, fileStatuses, readPartitions, endInstant, instantRange);
        return Result.instance(inputSplits, endInstant);
    }

    /**
     * Creates one {@link MergeOnReadInputSplit} per merged file slice (before or on
     * {@code endInstant}) of every partition in {@code readPartitions}.
     *
     * @param metaClient     the table meta client
     * @param commitTimeline the timeline the file system view is built on
     * @param fileStatuses   the files the file system view is built on
     * @param readPartitions the relative partition paths to read
     * @param endInstant     the instant the file slices are resolved against
     * @param instantRange   the instant range attached to each split, may be null
     * @return the splits, numbered consecutively from 0 in creation order
     */
    private List<MergeOnReadInputSplit> getInputSplits(HoodieTableMetaClient metaClient, HoodieTimeline commitTimeline, FileStatus[] fileStatuses, Set<String> readPartitions, String endInstant, InstantRange instantRange) {
        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        // Running split number shared by all partitions.
        final AtomicInteger splitNum = new AtomicInteger(0);
        return readPartitions.stream()
            .flatMap(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant))
            .map(fileSlice -> {
                // Log files are passed on as path strings, ordered by the log file comparator.
                List<String> sortedLogPaths = fileSlice.getLogFiles()
                    .sorted(HoodieLogFile.getLogFileComparator())
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
                Option<List<String>> logPaths = Option.ofNullable(sortedLogPaths);
                // A file slice without a base file yields a null base path.
                String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                return new MergeOnReadInputSplit(splitNum.getAndIncrement(), basePath, logPaths, endInstant, metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
            })
            .collect(Collectors.toList());
    }

    /**
     * Creates a {@link FileIndex} for the table path and restricts it to the required
     * partitions when such a restriction was configured.
     */
    private FileIndex getFileIndex() {
        final org.apache.hadoop.fs.Path tablePath = new org.apache.hadoop.fs.Path(this.path.toUri());
        final FileIndex index = FileIndex.instance(tablePath, this.conf, this.rowType);
        if (this.requiredPartitions == null) {
            return index;
        }
        index.setPartitionPaths(this.requiredPartitions);
        return index;
    }

    /**
     * Returns the partitions written by the given commits, narrowed down to the required
     * partitions when such a restriction was configured.
     *
     * @param metadataList the commit metadata to collect the written partitions from
     * @return the partitions to read
     */
    private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
        final Set<String> writtenPartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
        if (this.requiredPartitions == null) {
            return writtenPartitions;
        }
        return writtenPartitions.stream()
            .filter(partition -> this.requiredPartitions.contains(partition))
            .collect(Collectors.toSet());
    }

    /**
     * Returns the commit metadata of the archived instants when the start of the instant
     * range lies before the start of {@code commitTimeline}.
     *
     * @param metaClient     the table meta client
     * @param instantRange   the instant range to read
     * @param commitTimeline the active commit timeline
     * @param tableName      the table name
     * @return the archived commit metadata, or an empty list when the range start is not
     *         before the timeline start or the archived timeline has no completed commits
     */
    private List<HoodieCommitMetadata> getArchivedMetadata(HoodieTableMetaClient metaClient, InstantRange instantRange, HoodieTimeline commitTimeline, String tableName) {
        final String rangeStart = instantRange.getStartInstant();
        if (!commitTimeline.isBeforeTimelineStarts(rangeStart)) {
            return Collections.emptyList();
        }
        // The archived timeline is only loaded once the range is known to start before the active timeline.
        final HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(rangeStart);
        final HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
        if (archivedCompleteTimeline.empty()) {
            return Collections.emptyList();
        }
        return this.maySkipCompaction(archivedCompleteTimeline.getInstants())
            .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, archivedTimeline))
            .collect(Collectors.toList());
    }

    /**
     * Returns the commits-and-compaction timeline (completed and compaction instants),
     * passed through {@link #filterInstantsByCondition(HoodieTimeline)}.
     */
    private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
        return this.filterInstantsByCondition(
            metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
    }

    /**
     * Returns the completed commits of the archived timeline obtained for
     * {@code startInstant}, passed through {@link #filterInstantsByCondition(HoodieTimeline)}.
     */
    private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
        final HoodieTimeline completedArchivedCommits = metaClient.getArchivedTimeline(startInstant, false)
            .getCommitsTimeline()
            .filterCompletedInstants();
        return this.filterInstantsByCondition(completedArchivedCommits);
    }

    /**
     * Returns the completed instants to read.
     *
     * <ul>
     *   <li>With an {@code issuedInstant}: the instants strictly greater than it
     *       (minus skipped compaction commits).</li>
     *   <li>Without it and with no specific read commits configured: only the latest
     *       completed instant.</li>
     *   <li>Otherwise: the instants within the configured start/end commits, both
     *       inclusive (minus skipped compaction commits).</li>
     * </ul>
     *
     * @param commitTimeline the commit timeline to filter
     * @param issuedInstant  the last instant already handed out, may be null
     * @return the filtered instants
     */
    @VisibleForTesting
    public List<HoodieInstant> filterInstantsWithRange(HoodieTimeline commitTimeline, String issuedInstant) {
        final HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();

        if (issuedInstant != null) {
            return this.maySkipCompaction(completedTimeline.getInstants())
                .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN, issuedInstant))
                .collect(Collectors.toList());
        }

        if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
            // No start/end commit given: read from the latest completed instant only.
            final List<HoodieInstant> completed = completedTimeline.getInstants().collect(Collectors.toList());
            return completed.size() > 1
                ? Collections.singletonList(completed.get(completed.size() - 1))
                : completed;
        }

        Stream<HoodieInstant> candidates = completedTimeline.getInstants();
        if (OptionsResolver.isSpecificStartCommit(this.conf)) {
            final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
            candidates = candidates.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, startCommit));
        }
        if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
            final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
            candidates = candidates.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, endCommit));
        }
        return this.maySkipCompaction(candidates).collect(Collectors.toList());
    }

    /**
     * Drops the instants whose action is "commit" when {@code skipCompaction} is set;
     * otherwise returns the stream unchanged.
     */
    private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
        if (!this.skipCompaction) {
            return instants;
        }
        return instants.filter(instant -> !instant.getAction().equals("commit"));
    }

    /**
     * Filters out the instants whose action is "commit" when {@code skipCompaction} is
     * set; otherwise returns the timeline unchanged.
     *
     * @param timeline the timeline to filter
     * @return the filtered timeline
     */
    @VisibleForTesting
    public HoodieTimeline filterInstantsByCondition(HoodieTimeline timeline) {
        if (!this.skipCompaction) {
            return timeline;
        }
        return timeline.filter(instant -> !instant.getAction().equals("commit"));
    }

    /**
     * Concatenates two lists into a new list: all elements of {@code list1} followed by
     * all elements of {@code list2}. The inputs are left untouched.
     */
    private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
        final List<T> combined = new ArrayList<>(list1.size() + list2.size());
        combined.addAll(list1);
        combined.addAll(list2);
        return combined;
    }

    /**
     * Builder for {@link IncrementalInputSplits}.
     *
     * <p>{@code conf}, {@code path} and {@code rowType} are mandatory; {@link #build()}
     * fails with a {@link NullPointerException} naming the missing one.
     */
    public static class Builder {
        private Configuration conf;
        private Path path;
        private RowType rowType;
        private long maxCompactionMemoryInBytes;
        private Set<String> requiredPartitions;
        private boolean skipCompaction = false;

        /** Sets the Flink configuration (mandatory). */
        public Builder conf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the table base path (mandatory). */
        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        /** Sets the row type (mandatory). */
        public Builder rowType(RowType rowType) {
            this.rowType = rowType;
            return this;
        }

        /** Sets the value forwarded to each created input split. */
        public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
            this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
            return this;
        }

        /** Sets the partitions to restrict the read to; {@code null} means no restriction. */
        public Builder requiredPartitions(@Nullable Set<String> requiredPartitions) {
            this.requiredPartitions = requiredPartitions;
            return this;
        }

        /** Sets whether instants with the "commit" action are skipped; defaults to {@code false}. */
        public Builder skipCompaction(boolean skipCompaction) {
            this.skipCompaction = skipCompaction;
            return this;
        }

        /**
         * Creates the {@link IncrementalInputSplits}.
         *
         * @throws NullPointerException if {@code conf}, {@code path} or {@code rowType} was not set
         */
        public IncrementalInputSplits build() {
            // Name the missing field so a misconfigured builder is easy to diagnose.
            return new IncrementalInputSplits(
                Objects.requireNonNull(this.conf, "conf must not be null"),
                Objects.requireNonNull(this.path, "path must not be null"),
                Objects.requireNonNull(this.rowType, "rowType must not be null"),
                this.maxCompactionMemoryInBytes,
                this.requiredPartitions,
                this.skipCompaction);
        }
    }

    /**
     * The outcome of a split computation: the input splits plus the end instant they
     * were resolved against.
     */
    public static class Result {
        /** Shared result with no splits and an empty end instant. */
        public static final Result EMPTY = Result.instance(Collections.emptyList(), "");

        private final List<MergeOnReadInputSplit> inputSplits;
        private final String endInstant;

        private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            this.inputSplits = inputSplits;
            this.endInstant = endInstant;
        }

        /** Creates a result from the given splits and end instant. */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            return new Result(inputSplits, endInstant);
        }

        /** Returns whether the result holds no input splits. */
        public boolean isEmpty() {
            return this.inputSplits.isEmpty();
        }

        /** Returns the input splits. */
        public List<MergeOnReadInputSplit> getInputSplits() {
            return this.inputSplits;
        }

        /** Returns the end instant. */
        public String getEndInstant() {
            return this.endInstant;
        }
    }
}

