/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.view;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineDiffHelper;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * File-system view that, when {@link #sync()} is called, first tries to apply only the
 * instants that differ between the previously seen timeline and the freshly reloaded one,
 * and falls back to clearing and re-initialising the whole view when that is not possible
 * or fails.
 *
 * <p>Incremental syncing is only attempted when it was enabled through the constructor and
 * {@link TimelineDiffHelper} reports that the two timelines can be synced incrementally.
 */
public abstract class IncrementalTimelineSyncFileSystemView
extends AbstractTableFileSystemView {
    private static final Logger LOG = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class);

    /** Whether {@link #maySyncIncrementally()} is allowed to attempt an incremental sync at all. */
    private final boolean incrementalTimelineSyncEnabled;

    /** Timeline most recently passed to {@link #refreshTimeline(HoodieTimeline)}; returned by {@link #getTimeline()}. */
    private HoodieTimeline visibleActiveTimeline;

    /**
     * @param enableIncrementalTimelineSync {@code true} to allow incremental syncing; {@code false}
     *                                      to always rebuild the view from scratch on sync
     */
    protected IncrementalTimelineSyncFileSystemView(boolean enableIncrementalTimelineSync) {
        this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync;
    }

    /**
     * Remembers the given timeline as the currently visible one, then delegates to the parent.
     *
     * @param visibleActiveTimeline timeline the view should now expose
     */
    @Override
    protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
        this.visibleActiveTimeline = visibleActiveTimeline;
        super.refreshTimeline(visibleActiveTimeline);
    }

    /**
     * Brings the view up to date with the reloaded active timeline while holding the write lock.
     */
    @Override
    public void sync() {
        // Acquire the lock BEFORE entering the try block: if lock() itself failed inside the
        // try, the finally clause would call unlock() on a lock this thread does not hold.
        this.writeLock.lock();
        try {
            this.maySyncIncrementally();
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Reloads the active timeline and syncs the view against it.
     *
     * <p>When incremental syncing is enabled and possible, only the timeline differences are
     * applied. In every other case, including any exception raised during the incremental
     * attempt, the view is cleared and re-initialised against the new timeline.
     */
    protected void maySyncIncrementally() {
        HoodieTimeline oldTimeline = this.getTimeline();
        HoodieTimeline newTimeline = this.metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
        try {
            if (this.incrementalTimelineSyncEnabled) {
                TimelineDiffHelper.TimelineDiffResult diffResult =
                    TimelineDiffHelper.getNewInstantsForIncrementalSync(oldTimeline, newTimeline);
                if (diffResult.canSyncIncrementally()) {
                    LOG.info("Doing incremental sync");
                    this.runIncrementalSync(newTimeline, diffResult);
                    LOG.info("Finished incremental sync");
                    this.refreshTimeline(newTimeline);
                    return;
                }
            }
        } catch (Exception e) {
            // Deliberate best-effort: a failed incremental sync is recovered by the full sync below.
            LOG.error("Got exception trying to perform incremental sync. Reverting to complete sync", e);
        }
        this.clear();
        this.init(this.metaClient, newTimeline);
    }

    /**
     * Applies a timeline diff to the view: finished compactions are removed from the pending
     * set first, then every newly seen instant that is completed (or is a compaction) is applied.
     *
     * @param timeline   the new timeline the diff was computed against
     * @param diffResult instants to apply
     * @throws HoodieException wrapping any {@link IOException} raised while applying an instant
     */
    private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffHelper.TimelineDiffResult diffResult) {
        LOG.info("Timeline Diff Result is :" + diffResult);
        try {
            for (HoodieInstant finished : diffResult.getFinishedCompactionInstants()) {
                this.removePendingCompactionInstant(timeline, finished);
            }
            for (HoodieInstant seen : diffResult.getNewlySeenInstants()) {
                if (seen.isCompleted() || seen.getAction().equals("compaction")) {
                    this.applyNewlySeenInstant(timeline, seen);
                }
            }
        } catch (IOException e) {
            throw new HoodieException(e);
        }
    }

    /**
     * Dispatches one newly seen instant to the handler for its action. Instants whose action
     * has no handler are ignored.
     */
    private void applyNewlySeenInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        switch (instant.getAction()) {
            case "commit":
            case "deltacommit":
                this.addCommitInstant(timeline, instant);
                break;
            case "restore":
                this.addRestoreInstant(timeline, instant);
                break;
            case "clean":
                this.addCleanInstant(timeline, instant);
                break;
            case "compaction":
                this.addPendingCompactionInstant(timeline, instant);
                break;
            case "rollback":
                this.addRollbackInstant(timeline, instant);
                break;
            case "replacecommit":
                this.addReplaceInstant(timeline, instant);
                break;
            default:
                // No incremental handling for other actions.
                break;
        }
    }

    /**
     * Reads the compaction plan of the given instant and converts each of its operations to a
     * (compaction instant time, operation) pair.
     */
    private List<Pair<String, CompactionOperation>> loadCompactionOperations(HoodieInstant instant) throws IOException {
        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(this.metaClient, instant.getTimestamp());
        return CompactionUtils.getPendingCompactionOperations(instant, plan)
            .map(op -> Pair.of(
                op.getValue().getKey(),
                CompactionOperation.convertFromAvroRecordInstance(op.getValue().getValue())))
            .collect(Collectors.toList());
    }

    /**
     * Drops the operations of a now-completed compaction from the pending-compaction set.
     *
     * @param timeline unused here; kept for symmetry with the other instant handlers
     * @param instant  the compaction instant that has finished
     */
    private void removePendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Removing completed compaction instant (" + instant + ")");
        this.removePendingCompactionOperations(this.loadCompactionOperations(instant).stream());
    }

    /**
     * Registers the operations of a pending compaction and adds, for every affected file group
     * in a partition that is already available in the store, a new file slice at the
     * compaction instant time.
     */
    private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing pending compaction instant (" + instant + ")");
        List<Pair<String, CompactionOperation>> pendingOps = this.loadCompactionOperations(instant);
        // Record the pending operations first, then patch the partition views.
        this.addPendingCompactionOperations(pendingOps.stream());

        Map<String, List<Pair<String, HoodieFileGroup>>> partitionToFileGroups = pendingOps.stream()
            .map(opPair -> {
                String compactionInstantTime = opPair.getKey();
                HoodieFileGroup fileGroup = new HoodieFileGroup(opPair.getValue().getFileGroupId(), timeline);
                fileGroup.addNewFileSliceAtInstant(compactionInstantTime);
                return Pair.of(compactionInstantTime, fileGroup);
            })
            .collect(Collectors.groupingBy(x -> x.getValue().getPartitionPath()));

        partitionToFileGroups.forEach((partition, instantAndGroups) -> {
            if (this.isPartitionAvailableInStore(partition)) {
                List<HoodieFileGroup> groups = instantAndGroups.stream()
                    .map(Pair::getValue)
                    .collect(Collectors.toList());
                this.applyDeltaFileSlicesToPartitionView(partition, groups, DeltaApplyMode.ADD);
            }
        });
    }

    /**
     * Applies the files written by a completed commit / delta-commit instant.
     */
    private void addCommitInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing committed instant (" + instant + ")");
        HoodieCommitMetadata commitMetadata =
            HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        this.updatePartitionWriteFileGroups(commitMetadata.getPartitionToWriteStats(), timeline, instant);
        LOG.info("Done Syncing committed instant (" + instant + ")");
    }

    /**
     * Adds the files described by the given write stats to every affected partition that is
     * already available in the store; other partitions are skipped with a warning.
     *
     * @param partitionToWriteStats write stats keyed by partition
     * @param timeline              timeline used to build the file groups
     * @param instant               instant being synced (used for logging)
     */
    private void updatePartitionWriteFileGroups(Map<String, List<HoodieWriteStat>> partitionToWriteStats, HoodieTimeline timeline, HoodieInstant instant) {
        partitionToWriteStats.forEach((partition, writeStats) -> {
            if (!this.isPartitionAvailableInStore(partition)) {
                LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
                return;
            }
            LOG.info("Syncing partition (" + partition + ") of instant (" + instant + ")");
            // Build synthetic statuses from the write stats: only length and path are populated,
            // the path being "<base path>/<path from the write stat>".
            FileStatus[] statuses = writeStats.stream()
                .map(stat -> new FileStatus(stat.getFileSizeInBytes(), false, 0, 0L, 0L, 0L, null, null, null,
                    new Path(String.format("%s/%s", this.metaClient.getBasePath(), stat.getPath()))))
                .toArray(FileStatus[]::new);
            List<HoodieFileGroup> fileGroups =
                this.buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
            this.applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.ADD);
        });
        LOG.info("Done Syncing committed instant (" + instant + ")");
    }

    /**
     * Applies a restore instant: removes every successfully deleted file recorded in the
     * restore metadata and, when restore-instant info is present, drops the replaced-file-id
     * bookkeeping of the rolled-back {@code replacecommit} instants.
     */
    private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing restore instant (" + instant + ")");
        HoodieRestoreMetadata metadata =
            TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);

        // Flatten restore -> rollbacks -> partitions -> deleted files into (partition, file) pairs.
        Map<String, List<Pair<String, String>>> partitionFiles = metadata.getHoodieRestoreMetadata().entrySet().stream()
            .flatMap(entry -> entry.getValue().stream()
                .flatMap(rollback -> rollback.getPartitionMetadata().entrySet().stream()
                    .flatMap(partitionEntry -> partitionEntry.getValue().getSuccessDeleteFiles().stream()
                        .map(file -> Pair.of(partitionEntry.getKey(), file)))))
            .collect(Collectors.groupingBy(Pair::getKey));

        partitionFiles.forEach((partition, partitionAndFiles) -> {
            List<String> deletedFiles = partitionAndFiles.stream()
                .map(Pair::getValue)
                .collect(Collectors.toList());
            this.removeFileSlicesForPartition(timeline, instant, partition, deletedFiles);
        });

        if (metadata.getRestoreInstantInfo() != null) {
            Set<String> rolledbackInstants = metadata.getRestoreInstantInfo().stream()
                .filter(instantInfo -> "replacecommit".equals(instantInfo.getAction()))
                .map(instantInfo -> instantInfo.getCommitTime())
                .collect(Collectors.toSet());
            this.removeReplacedFileIdsAtInstants(rolledbackInstants);
        }
        LOG.info("Done Syncing restore instant (" + instant + ")");
    }

    /**
     * Applies a rollback instant by removing, per partition, the files the rollback reports
     * as successfully deleted.
     */
    private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing rollback instant (" + instant + ")");
        HoodieRollbackMetadata metadata =
            TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
        metadata.getPartitionMetadata().forEach((partition, partitionMetadata) ->
            this.removeFileSlicesForPartition(timeline, instant, partition, partitionMetadata.getSuccessDeleteFiles()));
        LOG.info("Done Syncing rollback instant (" + instant + ")");
    }

    /**
     * Applies a replace-commit instant: adds the newly written files, then records every
     * replaced file id as replaced by this instant.
     */
    private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing replace instant (" + instant + ")");
        HoodieReplaceCommitMetadata replaceMetadata =
            HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
        this.updatePartitionWriteFileGroups(replaceMetadata.getPartitionToWriteStats(), timeline, instant);
        replaceMetadata.getPartitionToReplaceFileIds().forEach((partition, replacedIds) -> {
            Map<HoodieFileGroupId, HoodieInstant> replacedFileIds = replacedIds.stream()
                .collect(Collectors.toMap(fileId -> new HoodieFileGroupId(partition, fileId), fileId -> instant));
            LOG.info("For partition (" + partition + ") of instant (" + instant + "), excluding " + replacedFileIds.size() + " file groups");
            this.addReplacedFileGroups(replacedFileIds);
        });
        LOG.info("Done Syncing REPLACE instant (" + instant + ")");
    }

    /**
     * Applies a clean instant by removing, per partition, the files the cleaner reports as
     * successfully deleted. File names from the clean metadata are resolved against the
     * partition path under the table base path before removal.
     */
    private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
        LOG.info("Syncing cleaner instant (" + instant + ")");
        HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(this.metaClient, instant);
        cleanMetadata.getPartitionMetadata().forEach((partitionKey, partitionMetadata) -> {
            String basePath = this.metaClient.getBasePath();
            String partitionPath = partitionMetadata.getPartitionPath();
            List<String> fullPathList = partitionMetadata.getSuccessDeleteFiles().stream()
                .map(fileName -> new Path(FSUtils.getPartitionPath(basePath, partitionPath), fileName).toString())
                .collect(Collectors.toList());
            this.removeFileSlicesForPartition(timeline, instant, partitionKey, fullPathList);
        });
        LOG.info("Done Syncing cleaner instant (" + instant + ")");
    }

    /**
     * Removes the given files from the stored view of a partition, provided that partition is
     * already available in the store; otherwise the request is skipped with a warning.
     *
     * @param timeline  timeline used to build the file groups describing the removed files
     * @param instant   instant being synced (used for logging)
     * @param partition partition whose view is patched
     * @param paths     paths of the files to remove
     */
    private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition, List<String> paths) {
        if (!this.isPartitionAvailableInStore(partition)) {
            LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
            return;
        }
        LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")");
        // Only the path is set on these synthetic statuses.
        FileStatus[] statuses = paths.stream()
            .map(path -> {
                FileStatus status = new FileStatus();
                status.setPath(new Path(path));
                return status;
            })
            .toArray(FileStatus[]::new);
        List<HoodieFileGroup> fileGroups =
            this.buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
        this.applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.REMOVE);
    }

    /**
     * Merges a set of delta file groups into (or removes them from) the stored view of a
     * partition, then rebuilds and stores the partition's file groups.
     *
     * <p>Files are matched by their path with scheme and authority stripped. The timeline used
     * for rebuilding is taken from one of the delta file groups. Nothing happens when
     * {@code deltaFileGroups} is empty.
     *
     * @param partition       partition whose stored view is updated
     * @param deltaFileGroups file groups describing the files to add or remove
     * @param mode            whether the delta files are added to or removed from the view
     * @throws IllegalStateException if {@code mode} is not a known {@link DeltaApplyMode}
     */
    protected void applyDeltaFileSlicesToPartitionView(String partition, List<HoodieFileGroup> deltaFileGroups, DeltaApplyMode mode) {
        if (deltaFileGroups.isEmpty()) {
            LOG.info("No delta file groups for partition :" + partition);
            return;
        }
        List<HoodieFileGroup> storedFileGroups = this.fetchAllStoredFileGroups(partition).collect(Collectors.toList());

        Map<String, HoodieBaseFile> viewDataFiles = indexBaseFilesByPath(storedFileGroups);
        Map<String, HoodieBaseFile> deltaDataFiles = indexBaseFilesByPath(deltaFileGroups);
        Map<String, HoodieLogFile> viewLogFiles = indexLogFilesByPath(storedFileGroups);
        Map<String, HoodieLogFile> deltaLogFiles = indexLogFilesByPath(deltaFileGroups);

        switch (mode) {
            case ADD:
                viewDataFiles.putAll(deltaDataFiles);
                viewLogFiles.putAll(deltaLogFiles);
                break;
            case REMOVE:
                deltaDataFiles.keySet().forEach(viewDataFiles::remove);
                deltaLogFiles.keySet().forEach(viewLogFiles::remove);
                break;
            default:
                throw new IllegalStateException("Unknown diff apply mode=" + mode);
        }

        // deltaFileGroups is non-empty here (guarded above), so the Optional is always present.
        HoodieTimeline timeline = deltaFileGroups.stream().map(fg -> fg.getTimeline()).findAny().get();
        List<HoodieFileGroup> rebuilt =
            this.buildFileGroups(viewDataFiles.values().stream(), viewLogFiles.values().stream(), timeline, true);
        this.storePartitionView(partition, rebuilt);
    }

    /**
     * Collects every base file found in the raw file slices of the given file groups, keyed by
     * its path without scheme and authority.
     */
    private static Map<String, HoodieBaseFile> indexBaseFilesByPath(List<HoodieFileGroup> fileGroups) {
        return fileGroups.stream()
            .flatMap(HoodieFileGroup::getAllRawFileSlices)
            .map(FileSlice::getBaseFile)
            .filter(Option::isPresent)
            .map(Option::get)
            .map(baseFile -> Pair.of(Path.getPathWithoutSchemeAndAuthority(new Path(baseFile.getPath())).toString(), baseFile))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }

    /**
     * Collects every log file found in the raw file slices of the given file groups, keyed by
     * its path without scheme and authority.
     */
    private static Map<String, HoodieLogFile> indexLogFilesByPath(List<HoodieFileGroup> fileGroups) {
        return fileGroups.stream()
            .flatMap(HoodieFileGroup::getAllRawFileSlices)
            .flatMap(FileSlice::getLogFiles)
            .map(logFile -> Pair.of(Path.getPathWithoutSchemeAndAuthority(logFile.getPath()).toString(), logFile))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    }

    /**
     * @return the timeline most recently passed to {@link #refreshTimeline(HoodieTimeline)}
     */
    @Override
    public HoodieTimeline getTimeline() {
        return this.visibleActiveTimeline;
    }

    /** How a set of delta file groups is applied to a stored partition view. */
    enum DeltaApplyMode {
        /** Delta files are added to the view (overwriting entries with the same path). */
        ADD,
        /** Delta files are removed from the view. */
        REMOVE;
    }
}

