/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class CleanPlanner<T extends HoodieRecordPayload, I, K, O>
implements Serializable {
    private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
    public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
    public static final Integer CLEAN_PLAN_VERSION_2;
    public static final Integer LATEST_CLEAN_PLAN_VERSION;
    private final SyncableFileSystemView fileSystemView;
    private final HoodieTimeline commitTimeline;
    private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
    private HoodieTable<T, I, K, O> hoodieTable;
    private HoodieWriteConfig config;
    private transient HoodieEngineContext context;

    public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
        this.context = context;
        this.hoodieTable = hoodieTable;
        this.fileSystemView = hoodieTable.getHoodieView();
        this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
        this.config = config;
        this.fgIdToPendingCompactionOperations = ((SyncableFileSystemView)hoodieTable.getSliceView()).getPendingCompactionOperations().map(entry -> Pair.of(new HoodieFileGroupId(((CompactionOperation)entry.getValue()).getPartitionPath(), ((CompactionOperation)entry.getValue()).getFileId()), entry.getValue())).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }

    public Stream<String> getSavepointedDataFiles(String savepointTime) {
        HoodieSavepointMetadata metadata;
        if (!this.hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
            throw new HoodieSavepointException("Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
        }
        HoodieInstant instant = new HoodieInstant(false, "savepoint", savepointTime);
        try {
            metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(this.hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
        }
        catch (IOException e) {
            throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
        }
        return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
    }

    public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
        switch (this.config.getCleanerPolicy()) {
            case KEEP_LATEST_COMMITS: 
            case KEEP_LATEST_BY_HOURS: {
                return this.getPartitionPathsForCleanByCommits(earliestRetainedInstant);
            }
            case KEEP_LATEST_FILE_VERSIONS: {
                return this.getPartitionPathsForFullCleaning();
            }
        }
        throw new IllegalStateException("Unknown Cleaner Policy");
    }

    private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
        Option<HoodieInstant> lastClean;
        if (!instantToRetain.isPresent()) {
            LOG.info((Object)"No earliest commit to retain. No need to scan partitions !!");
            return Collections.emptyList();
        }
        if (this.config.incrementalCleanerModeEnabled() && (lastClean = this.hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant()).isPresent()) {
            if (this.hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
                this.hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
            } else {
                HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(this.hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
                if (cleanMetadata.getEarliestCommitToRetain() != null && cleanMetadata.getEarliestCommitToRetain().length() > 0) {
                    return this.getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
                }
            }
        }
        return this.getPartitionPathsForFullCleaning();
    }

    private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, Option<HoodieInstant> newInstantToRetain) {
        LOG.info((Object)("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + ". New Instant to retain : " + newInstantToRetain));
        return this.hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, ((HoodieInstant)newInstantToRetain.get()).getTimestamp())).flatMap(instant -> {
            try {
                if ("replacecommit".equals(instant.getAction())) {
                    HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(this.hoodieTable.getActiveTimeline().getInstantDetails((HoodieInstant)instant).get(), HoodieReplaceCommitMetadata.class);
                    return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
                }
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(this.hoodieTable.getActiveTimeline().getInstantDetails((HoodieInstant)instant).get(), HoodieCommitMetadata.class);
                return commitMetadata.getPartitionToWriteStats().keySet().stream();
            }
            catch (IOException e) {
                throw new HoodieIOException(e.getMessage(), e);
            }
        }).distinct().collect(Collectors.toList());
    }

    private List<String> getPartitionPathsForFullCleaning() {
        try {
            FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(this.context, this.context.getHadoopConf(), this.config.getBasePath(), this.config.shouldAssumeDatePartitioning());
            return fsBackedTableMetadata.getAllPartitionPaths();
        }
        catch (IOException e) {
            return Collections.emptyList();
        }
    }

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
        LOG.info((Object)("Cleaning " + partitionPath + ", retaining latest " + this.config.getCleanerFileVersionsRetained() + " file versions. "));
        ArrayList<CleanFileInfo> deletePaths = new ArrayList<CleanFileInfo>();
        List<String> savepointedFiles = this.hoodieTable.getSavepointTimestamps().stream().flatMap(this::getSavepointedDataFiles).collect(Collectors.toList());
        deletePaths.addAll(this.getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
        boolean toDeletePartition = false;
        List fileGroups = this.fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
        for (HoodieFileGroup fileGroup : fileGroups) {
            int keepVersions = this.config.getCleanerFileVersionsRetained();
            Iterator fileSliceIterator = fileGroup.getAllFileSlices().filter(fs -> !this.isFileSliceNeededForPendingCompaction((FileSlice)fs)).iterator();
            if (this.isFileGroupInPendingCompaction(fileGroup)) {
                --keepVersions;
            }
            while (fileSliceIterator.hasNext() && keepVersions > 0) {
                fileSliceIterator.next();
                --keepVersions;
            }
            while (fileSliceIterator.hasNext()) {
                FileSlice nextSlice = (FileSlice)fileSliceIterator.next();
                Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
                if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) continue;
                deletePaths.addAll(this.getCleanFileInfoForSlice(nextSlice));
            }
        }
        if (fileGroups.isEmpty()) {
            toDeletePartition = true;
        }
        return Pair.of(toDeletePartition, deletePaths);
    }

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
        return this.getFilesToCleanKeepingLatestCommits(partitionPath, this.config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
    }

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
        LOG.info((Object)("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "));
        ArrayList<CleanFileInfo> deletePaths = new ArrayList<CleanFileInfo>();
        List<String> savepointedFiles = this.hoodieTable.getSavepointTimestamps().stream().flatMap(this::getSavepointedDataFiles).collect(Collectors.toList());
        boolean toDeletePartition = false;
        if (this.commitTimeline.countInstants() > commitsRetained) {
            Option<HoodieInstant> earliestCommitToRetainOption = this.getEarliestCommitToRetain();
            HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
            deletePaths.addAll(this.getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
            List fileGroups = this.fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
            for (HoodieFileGroup fileGroup : fileGroups) {
                List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
                if (fileSliceList.isEmpty()) continue;
                String lastVersion = ((FileSlice)fileSliceList.get(0)).getBaseInstantTime();
                String lastVersionBeforeEarliestCommitToRetain = this.getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
                for (FileSlice aSlice : fileSliceList) {
                    Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
                    String fileCommitTime = aSlice.getBaseInstantTime();
                    if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName()) || (policy != HoodieCleaningPolicy.KEEP_LATEST_COMMITS ? policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS && fileCommitTime.equals(lastVersion) : fileCommitTime.equals(lastVersion) || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) continue;
                    if (this.isFileSliceNeededForPendingCompaction(aSlice) || !HoodieTimeline.compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) continue;
                    aFile.ifPresent(hoodieDataFile -> {
                        deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
                        if (hoodieDataFile.getBootstrapBaseFile().isPresent() && this.config.shouldCleanBootstrapBaseFile().booleanValue()) {
                            deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
                        }
                    });
                    if (this.hoodieTable.getMetaClient().getTableType() != HoodieTableType.MERGE_ON_READ) continue;
                    deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)).collect(Collectors.toList()));
                }
            }
            if (fileGroups.isEmpty()) {
                toDeletePartition = true;
            }
        }
        return Pair.of(toDeletePartition, deletePaths);
    }

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
        return this.getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
    }

    private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
        Stream<HoodieFileGroup> replacedGroups = earliestCommitToRetain.isPresent() ? this.fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath) : this.fileSystemView.getAllReplacedFileGroups(partitionPath);
        return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices).filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName())).flatMap(slice -> this.getCleanFileInfoForSlice((FileSlice)slice).stream()).collect(Collectors.toList());
    }

    private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
        for (FileSlice file : fileSliceList) {
            String fileCommitTime = file.getBaseInstantTime();
            if (!HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) continue;
            return fileCommitTime;
        }
        return null;
    }

    private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
        ArrayList<CleanFileInfo> cleanPaths = new ArrayList<CleanFileInfo>();
        if (nextSlice.getBaseFile().isPresent()) {
            HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
            cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
            if (dataFile.getBootstrapBaseFile().isPresent() && this.config.shouldCleanBootstrapBaseFile().booleanValue()) {
                cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
            }
        }
        if (this.hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
            cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)).collect(Collectors.toList()));
        }
        return cleanPaths;
    }

    public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
        Pair<Boolean, List<CleanFileInfo>> deletePaths;
        HoodieCleaningPolicy policy = this.config.getCleanerPolicy();
        if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
            deletePaths = this.getFilesToCleanKeepingLatestCommits(partitionPath);
        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
            deletePaths = this.getFilesToCleanKeepingLatestVersions(partitionPath);
        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
            deletePaths = this.getFilesToCleanKeepingLatestHours(partitionPath);
        } else {
            throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
        }
        LOG.info((Object)(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath));
        if (deletePaths.getKey().booleanValue()) {
            LOG.info((Object)("Partition " + partitionPath + " to be deleted"));
        }
        return deletePaths;
    }

    public Option<HoodieInstant> getEarliestCommitToRetain() {
        Option<HoodieInstant> earliestCommitToRetain = Option.empty();
        int commitsRetained = this.config.getCleanerCommitsRetained();
        int hoursRetained = this.config.getCleanerHoursRetained();
        if (this.config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS && this.commitTimeline.countInstants() > commitsRetained) {
            Option<HoodieInstant> earliestPendingCommits = this.hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline().filter(s -> !s.isCompleted()).firstInstant();
            earliestCommitToRetain = earliestPendingCommits.isPresent() ? this.commitTimeline.nthInstant(this.commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
                if (nthInstant.compareTo((HoodieInstant)earliestPendingCommits.get()) <= 0) {
                    return Option.of(nthInstant);
                }
                return this.commitTimeline.findInstantsBefore(((HoodieInstant)earliestPendingCommits.get()).getTimestamp()).lastInstant();
            }).orElse(Option.empty()) : this.commitTimeline.nthInstant(this.commitTimeline.countInstants() - commitsRetained);
        } else if (this.config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
            Instant instant = Instant.now();
            ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
            String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
            earliestCommitToRetain = Option.fromJavaOptional(this.commitTimeline.getInstants().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
        }
        return earliestCommitToRetain;
    }

    public String getLastCompletedCommitTimestamp() {
        if (this.commitTimeline.lastInstant().isPresent()) {
            return this.commitTimeline.lastInstant().get().getTimestamp();
        }
        return "";
    }

    private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
        CompactionOperation op = this.fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
        if (null != op) {
            return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), HoodieTimeline.GREATER_THAN_OR_EQUALS, op.getBaseInstantTime());
        }
        return false;
    }

    private boolean isFileGroupInPendingCompaction(HoodieFileGroup fg) {
        return this.fgIdToPendingCompactionOperations.containsKey(fg.getFileGroupId());
    }

    static {
        LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
    }
}

