/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generates the {@link MergeOnReadInputSplit}s for incremental reads of a Hudi table, both for
 * bounded (batch) reads and for continuous (streaming) reads that resume from an issued offset.
 *
 * <p>Instances are created through {@link #builder()}.
 */
public class IncrementalInputSplits implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);

    private final Configuration conf;
    private final Path path;
    private final RowType rowType;
    private final long maxCompactionMemoryInBytes;
    /** Optional pruner applied to the partitions to read; {@code null} means no pruning. */
    @Nullable
    private final PartitionPruners.PartitionPruner partitionPruner;
    private final boolean skipCompaction;
    private final boolean skipClustering;
    private final boolean skipInsertOverwrite;

    private IncrementalInputSplits(Configuration conf, Path path, RowType rowType, long maxCompactionMemoryInBytes, @Nullable PartitionPruners.PartitionPruner partitionPruner, boolean skipCompaction, boolean skipClustering, boolean skipInsertOverwrite) {
        this.conf = conf;
        this.path = path;
        this.rowType = rowType;
        this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
        this.partitionPruner = partitionPruner;
        this.skipCompaction = skipCompaction;
        this.skipClustering = skipClustering;
        this.skipInsertOverwrite = skipInsertOverwrite;
    }

    /** Returns a new {@link Builder}. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Returns the input splits for a bounded incremental read.
     *
     * <p>The instants are selected between {@link FlinkOptions#READ_START_COMMIT} and
     * {@link FlinkOptions#READ_END_COMMIT} using a closed-closed range. When the query consumes from
     * the earliest instant, or archived instants are involved, the partitions and files are scanned
     * directly through the {@link FileIndex}. Otherwise they are derived from the commit metadata of
     * the active instants. If that file listing comes back {@code null} (reported as deleted files in
     * the metadata), the read falls back to the direct scan as well.
     *
     * @param metaClient the table meta client
     * @param cdcEnabled whether to produce CDC splits; only honored when no full table scan is needed
     * @return the splits and end instant; the result carries no offset. Returns {@link Result#EMPTY}
     *     when nothing is found to read.
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, boolean cdcEnabled) {
        IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
                .metaClient(metaClient)
                .startCompletionTime(this.conf.getString(FlinkOptions.READ_START_COMMIT))
                .endCompletionTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
                .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
                .skipCompaction(this.skipCompaction)
                .skipClustering(this.skipClustering)
                .skipInsertOverwrite(this.skipInsertOverwrite)
                .readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
                .build();
        IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();
        if (queryContext.isEmpty()) {
            LOG.info("No new instant found for the table under path {}, skip reading", this.path);
            return Result.EMPTY;
        }
        HoodieTimeline commitTimeline = queryContext.getActiveTimeline();
        boolean startFromEarliest = queryContext.isConsumingFromEarliest();
        boolean hasArchivedInstants = !queryContext.getArchivedInstants().isEmpty();
        boolean fullTableScan = startFromEarliest || hasArchivedInstants;
        InstantRange instantRange = queryContext.getInstantRange().orElse(null);
        String endInstant = queryContext.getLastInstant();

        Set<String> readPartitions = null;
        // Stays null until the files are resolved; a null value triggers the direct (full) scan below.
        List<StoragePathInfo> fileInfoList = null;
        if (!fullTableScan) {
            if (cdcEnabled) {
                return Result.instance(getCdcInputSplits(metaClient, instantRange), endInstant);
            }
            List<HoodieCommitMetadata> metadataList = getCommitMetadataList(queryContext.getActiveInstants(), commitTimeline);
            readPartitions = getReadPartitions(metadataList);
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileInfoList = WriteProfiles.getFilesFromMetadata(this.path, getHadoopConf(metaClient), metadataList, metaClient.getTableType(), false);
            if (fileInfoList == null) {
                LOG.warn("Found deleted files in metadata, fall back to full table scan.");
            }
        }
        if (fileInfoList == null) {
            FileIndex fileIndex = getFileIndex();
            readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileInfoList = fileIndex.getFilesInPartitions();
        }
        if (fileInfoList.isEmpty()) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }
        List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline, fileInfoList, readPartitions, endInstant, queryContext.getMaxCompletionTime(), instantRange, false);
        return Result.instance(inputSplits, endInstant);
    }

    /**
     * Returns the input splits for one round of a continuous (streaming) incremental read.
     *
     * <p>The active timeline is reloaded first. When {@code issuedOffset} is non-null, the instants
     * are selected after it (open-closed range). Otherwise they are selected from
     * {@link FlinkOptions#READ_START_COMMIT} inclusively (closed-closed). The number of commits is
     * limited by {@link OptionsResolver#getReadCommitsLimit}. When the analyzer yields no instant
     * range, all partitions and files are scanned directly.
     *
     * @param metaClient the table meta client
     * @param issuedOffset completion time to resume after (exclusive), or {@code null} to start from
     *     the configured start commit
     * @param cdcEnabled whether to produce CDC splits for the incremental (instant-range) path
     * @return the splits, the end instant and, as offset, the max completion time of the selected
     *     instants; or {@link Result#EMPTY} when nothing is found to read
     */
    public Result inputSplits(HoodieTableMetaClient metaClient, @Nullable String issuedOffset, boolean cdcEnabled) {
        metaClient.reloadActiveTimeline();
        boolean resuming = issuedOffset != null;
        IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
                .metaClient(metaClient)
                .startCompletionTime(resuming ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT))
                .endCompletionTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
                .rangeType(resuming ? InstantRange.RangeType.OPEN_CLOSED : InstantRange.RangeType.CLOSED_CLOSED)
                .skipCompaction(this.skipCompaction)
                .skipClustering(this.skipClustering)
                .skipInsertOverwrite(this.skipInsertOverwrite)
                .readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
                .limit(OptionsResolver.getReadCommitsLimit(this.conf))
                .build();
        IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();
        if (queryContext.isEmpty()) {
            LOG.info("No new instant found for the table under path {}, skip reading", this.path);
            return Result.EMPTY;
        }
        HoodieTimeline commitTimeline = queryContext.getActiveTimeline();
        String endInstant = queryContext.getLastInstant();
        Option<InstantRange> instantRange = queryContext.getInstantRange();
        String offsetToIssue = queryContext.getMaxCompletionTime();
        if (instantRange.isEmpty()) {
            // No instant range to filter on: scan the partitions and files directly.
            FileIndex fileIndex = getFileIndex();
            Set<String> readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading under path: {}", this.path);
                return Result.EMPTY;
            }
            List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
            if (pathInfoList.isEmpty()) {
                LOG.warn("No files found for reading under path: {}", this.path);
                return Result.EMPTY;
            }
            List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline, pathInfoList, readPartitions, endInstant, offsetToIssue, null, false);
            return Result.instance(inputSplits, endInstant, offsetToIssue);
        }
        List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient, getHadoopConf(metaClient), commitTimeline, queryContext, instantRange.get(), endInstant, cdcEnabled);
        return Result.instance(inputSplits, endInstant, offsetToIssue);
    }

    /**
     * Builds the splits for the instant-range (incremental) path of a streaming read.
     *
     * <p>With CDC enabled, this delegates to {@link #getCdcInputSplits}. Otherwise the commit metadata
     * of the archived and active instants is merged (archived first). The read partitions and files
     * are then derived from that metadata. Base files are skipped when {@code skipCompaction} is set.
     *
     * @return the splits, or an empty list when no partitions or files are found
     */
    private List<MergeOnReadInputSplit> getIncInputSplits(HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf, HoodieTimeline commitTimeline, IncrementalQueryAnalyzer.QueryContext queryContext, InstantRange instantRange, String endInstant, boolean cdcEnabled) {
        if (cdcEnabled) {
            return getCdcInputSplits(metaClient, instantRange);
        }
        List<HoodieCommitMetadata> activeMetadataList = getCommitMetadataList(queryContext.getActiveInstants(), commitTimeline);
        List<HoodieCommitMetadata> archivedMetadataList = getCommitMetadataList(queryContext.getArchivedInstants(), queryContext.getArchivedTimeline());
        if (!archivedMetadataList.isEmpty()) {
            LOG.warn("\n--------------------------------------------------------------------------------\n---------- caution: the reader has fall behind too much from the writer,\n---------- tweak 'read.tasks' option to add parallelism of read tasks.\n--------------------------------------------------------------------------------");
        }
        List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList, activeMetadataList);
        Set<String> readPartitions = getReadPartitions(metadataList);
        if (readPartitions.isEmpty()) {
            LOG.warn("No partitions found for reading under path: {}", this.path);
            return Collections.emptyList();
        }
        List<StoragePathInfo> pathInfoList = WriteProfiles.getFilesFromMetadata(this.path, hadoopConf, metadataList, metaClient.getTableType());
        if (pathInfoList.isEmpty()) {
            LOG.warn("No files found for reading under path: {}", this.path);
            return Collections.emptyList();
        }
        // skipCompaction doubles as skipBaseFiles: only log files are merged into the slices then.
        return getInputSplits(metaClient, commitTimeline, pathInfoList, readPartitions, endInstant, queryContext.getMaxCompletionTime(), instantRange, this.skipCompaction);
    }

    /**
     * Builds one merge-on-read split per file slice found in {@code readPartitions}.
     *
     * <p>The slices are looked up as of {@code maxCompletionTime} from a file system view built over
     * {@code pathInfoList}. Log files ending with {@code .cdc} are excluded. The split's latest commit
     * is the minimum of the slice's latest instant and {@code endInstant}. Splits are numbered in
     * discovery order and returned sorted by latest commit.
     */
    private List<MergeOnReadInputSplit> getInputSplits(HoodieTableMetaClient metaClient, HoodieTimeline commitTimeline, List<StoragePathInfo> pathInfoList, Set<String> readPartitions, String endInstant, String maxCompletionTime, InstantRange instantRange, boolean skipBaseFiles) {
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList);
        AtomicInteger cnt = new AtomicInteger(0);
        String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        String tablePath = metaClient.getBasePath().toString();
        return readPartitions.stream()
                .flatMap(relPartitionPath -> getFileSlices(fsView, relPartitionPath, maxCompletionTime, skipBaseFiles))
                .map(fileSlice -> {
                    Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                            .sorted(HoodieLogFile.getLogFileComparator())
                            .map(logFile -> logFile.getPath().toString())
                            .filter(logPath -> !logPath.endsWith(".cdc"))
                            .collect(Collectors.toList()));
                    String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                    String latestCommit = InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
                    return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, tablePath, this.maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
                })
                .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit))
                .collect(Collectors.toList());
    }

    /**
     * Builds one {@link CdcInputSplit} per file group from the CDC file splits extracted for
     * {@code instantRange}. The CDC file splits of each group are passed in sorted order.
     *
     * @return the CDC splits, or an empty list when no change logs are found
     */
    private List<MergeOnReadInputSplit> getCdcInputSplits(HoodieTableMetaClient metaClient, InstantRange instantRange) {
        HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange, OptionsResolver.readCDCFromChangelog(this.conf));
        Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
        if (fileSplits.isEmpty()) {
            LOG.warn("No change logs found for reading in path: {}", this.path);
            return Collections.emptyList();
        }
        AtomicInteger cnt = new AtomicInteger(0);
        String tablePath = metaClient.getBasePath().toString();
        return fileSplits.entrySet().stream()
                .map(entry -> new CdcInputSplit(cnt.getAndAdd(1), tablePath, this.maxCompactionMemoryInBytes,
                        entry.getKey().getFileId(),
                        entry.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
                .collect(Collectors.toList());
    }

    /**
     * Returns the file slices of a partition as of {@code endInstant}.
     * When {@code skipBaseFiles} is set, it uses the view's "all logs merged" lookup
     * (presumably slices without base files — confirm against {@link HoodieTableFileSystemView}).
     */
    private static Stream<FileSlice> getFileSlices(HoodieTableFileSystemView fsView, String relPartitionPath, String endInstant, boolean skipBaseFiles) {
        return skipBaseFiles
                ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
                : fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
    }

    /** Creates a {@link FileIndex} over the table path, using this reader's config, row type and pruner. */
    private FileIndex getFileIndex() {
        return FileIndex.builder()
                .path(new StoragePath(this.path.toUri()))
                .conf(this.conf)
                .rowType(this.rowType)
                .partitionPruner(this.partitionPruner)
                .build();
    }

    /** Unwraps the Hadoop configuration behind the meta client's storage configuration. */
    private static org.apache.hadoop.conf.Configuration getHadoopConf(HoodieTableMetaClient metaClient) {
        return (org.apache.hadoop.conf.Configuration) metaClient.getStorageConf().unwrap();
    }

    /** Loads the commit metadata of each instant from {@code timeline}, preserving the instants' order. */
    private List<HoodieCommitMetadata> getCommitMetadataList(List<HoodieInstant> instants, HoodieTimeline timeline) {
        String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
        return instants.stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, timeline))
                .collect(Collectors.toList());
    }

    /**
     * Returns the partitions written by the given commits, filtered by the partition pruner when one
     * is configured. The pruning ratio is logged.
     */
    private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
        Set<String> partitions = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList);
        if (this.partitionPruner == null) {
            return partitions;
        }
        Set<String> selectedPartitions = this.partitionPruner.filter(partitions);
        int total = partitions.size();
        int selected = selectedPartitions.size();
        double percentPruned = total == 0 ? 0.0 : (1.0 - (double) selected / total) * 100.0;
        LOG.info("Selected {} partitions out of {}, pruned {}% partitions.", selected, total, percentPruned);
        return selectedPartitions;
    }

    /**
     * Concatenates two lists, {@code list1} first. When either list is empty the other one is
     * returned as-is (no copy), so callers must not mutate the result.
     */
    private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
        if (list1.isEmpty()) {
            return list2;
        }
        if (list2.isEmpty()) {
            return list1;
        }
        List<T> merged = new ArrayList<>(list1.size() + list2.size());
        merged.addAll(list1);
        merged.addAll(list2);
        return merged;
    }

    /** Builder for {@link IncrementalInputSplits}; {@code conf}, {@code path} and {@code rowType} are required. */
    public static class Builder {
        private Configuration conf;
        private Path path;
        private RowType rowType;
        private long maxCompactionMemoryInBytes;
        private PartitionPruners.PartitionPruner partitionPruner;
        private boolean skipCompaction = false;
        private boolean skipClustering = false;
        private boolean skipInsertOverwrite = false;

        public Builder conf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        public Builder rowType(RowType rowType) {
            this.rowType = rowType;
            return this;
        }

        public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
            this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
            return this;
        }

        public Builder partitionPruner(@Nullable PartitionPruners.PartitionPruner partitionPruner) {
            this.partitionPruner = partitionPruner;
            return this;
        }

        public Builder skipCompaction(boolean skipCompaction) {
            this.skipCompaction = skipCompaction;
            return this;
        }

        public Builder skipClustering(boolean skipClustering) {
            this.skipClustering = skipClustering;
            return this;
        }

        public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
            this.skipInsertOverwrite = skipInsertOverwrite;
            return this;
        }

        /**
         * Builds the instance.
         *
         * @throws NullPointerException if {@code conf}, {@code path} or {@code rowType} was not set
         */
        public IncrementalInputSplits build() {
            return new IncrementalInputSplits(
                    Objects.requireNonNull(this.conf, "conf"),
                    Objects.requireNonNull(this.path, "path"),
                    Objects.requireNonNull(this.rowType, "rowType"),
                    this.maxCompactionMemoryInBytes,
                    this.partitionPruner,
                    this.skipCompaction,
                    this.skipClustering,
                    this.skipInsertOverwrite);
        }
    }

    /** The input splits of one read, plus the end instant and (for streaming reads) the offset to issue. */
    public static final class Result {
        /** Result with no splits, an empty end instant and no offset. */
        public static final Result EMPTY = Result.instance(Collections.emptyList(), "");

        private final List<MergeOnReadInputSplit> inputSplits;
        private final String endInstant;
        @Nullable
        private final String offset;

        private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant, @Nullable String offset) {
            this.inputSplits = inputSplits;
            this.endInstant = endInstant;
            this.offset = offset;
        }

        /** Creates a result without an offset. */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
            return new Result(inputSplits, endInstant, null);
        }

        /** Creates a result carrying the given offset. */
        public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant, String offset) {
            return new Result(inputSplits, endInstant, offset);
        }

        /** Returns whether there is no split to read. */
        public boolean isEmpty() {
            return this.inputSplits.isEmpty();
        }

        public List<MergeOnReadInputSplit> getInputSplits() {
            return this.inputSplits;
        }

        public String getEndInstant() {
            return this.endInstant;
        }

        /** Returns the offset to issue, or {@code null} when the result was created without one. */
        @Nullable
        public String getOffset() {
            return this.offset;
        }
    }
}

