/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.timeline.versioning.v1;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.ArchivalUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v1.ActiveTimelineV1;
import org.apache.hudi.common.table.timeline.versioning.v1.ArchivedTimelineV1;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantFileNameGeneratorV1;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O>
implements HoodieTimelineArchiver<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(TimelineArchiverV1.class);
    private final StoragePath archiveFilePath;
    private final HoodieWriteConfig config;
    private HoodieLogFormat.Writer writer;
    private final int maxInstantsToKeep;
    private final int minInstantsToKeep;
    private final HoodieTable<T, I, K, O> table;
    private final HoodieTableMetaClient metaClient;
    private final TransactionManager txnManager;

    /**
     * Creates an archiver bound to the given table.
     *
     * <p>The archive log path is resolved from the meta client's archive path, a
     * {@link TransactionManager} is created over the table's storage, and the minimum and
     * maximum number of instants to keep are read from
     * {@code ArchivalUtils.getMinAndMaxInstantsToKeep}.
     *
     * @param config write configuration consulted throughout archival
     * @param table  table whose timeline is archived
     */
    public TimelineArchiverV1(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
        this.config = config;
        this.table = table;
        this.metaClient = table.getMetaClient();
        this.archiveFilePath = ArchivedTimelineV1.getArchiveLogPath(this.metaClient.getArchivePath());
        this.txnManager = new TransactionManager(config, table.getMetaClient().getStorage());

        // Left element is the lower retention bound, right element the upper one.
        Pair<Integer, Integer> retentionBounds = ArchivalUtils.getMinAndMaxInstantsToKeep(table, this.metaClient);
        this.minInstantsToKeep = retentionBounds.getLeft();
        this.maxInstantsToKeep = retentionBounds.getRight();
    }

    /**
     * Returns the log writer to use for archive blocks.
     *
     * <p>If this archiver already holds a writer it is returned as-is; otherwise a new writer
     * is built under {@code archivePath}. This method does not assign the {@code writer}
     * field itself; callers store the result.
     *
     * @param archivePath parent path under which the archive log file lives
     * @return the existing writer, or a freshly built one
     * @throws HoodieException if building the writer fails with an {@link IOException}
     */
    private HoodieLogFormat.Writer openWriter(StoragePath archivePath) {
        if (this.writer != null) {
            return this.writer;
        }
        try {
            return HoodieLogFormat.newWriterBuilder()
                .onParentPath(archivePath)
                .withInstantTime("")
                .withFileId(this.archiveFilePath.getName())
                .withFileExtension(".archive")
                .withStorage(this.metaClient.getStorage())
                .build();
        }
        catch (IOException e) {
            throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
        }
    }

    /**
     * Closes the current log writer, if there is one.
     *
     * <p>The {@code writer} field is left pointing at the closed writer.
     *
     * @throws HoodieException if closing the writer fails with an {@link IOException}
     */
    private void close() {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.close();
        }
        catch (IOException e) {
            throw new HoodieException("Unable to close HoodieLogFormat writer", e);
        }
    }

    /**
     * Archives eligible instants and then deletes them from the active timeline.
     *
     * <p>When {@code acquireLock} is {@code true} the whole operation runs between
     * {@code txnManager.beginTransaction} and {@code txnManager.endTransaction}. The log
     * writer is always closed on the way out.
     *
     * @param context     engine context used for archival and deletion
     * @param acquireLock whether to wrap the operation in a transaction
     * @return the number of instants selected for archival (0 when there was nothing to do)
     * @throws IOException if selecting or deleting instants fails
     */
    @Override
    public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
        try {
            if (acquireLock) {
                this.txnManager.beginTransaction(Option.empty(), Option.empty());
            }
            List<HoodieInstant> instantsToArchive = this.getInstantsToArchive();
            if (instantsToArchive.isEmpty()) {
                LOG.info("No Instants to archive for table {}", this.config.getBasePath());
            } else {
                this.writer = this.openWriter(this.archiveFilePath.getParent());
                LOG.info("Archiving instants {} for table {}", instantsToArchive, this.config.getBasePath());
                this.archive(context, instantsToArchive);
                LOG.info("Deleting archived instants {} for table {}", instantsToArchive, this.config.getBasePath());
                this.deleteArchivedInstants(instantsToArchive, context);
            }
            return instantsToArchive.size();
        }
        finally {
            // Nested finally: a failure while closing the writer must not prevent the
            // transaction from being ended, otherwise the lock would stay held.
            try {
                this.close();
            }
            finally {
                if (acquireLock) {
                    this.txnManager.endTransaction(Option.empty());
                }
            }
        }
    }

    /**
     * Writes the given records to the archive log under {@code archivePath} and closes the writer.
     *
     * <p>The records are written as a single block using the {@link HoodieArchivedMetaEntry}
     * schema. {@code archiveRecords} is cleared by the write when it is non-empty.
     *
     * @param archiveRecords records to append to the archive log
     * @param archivePath    parent path of the archive log
     * @throws HoodieCommitException if opening the writer or writing the block fails
     */
    public void flushArchiveEntries(List<IndexedRecord> archiveRecords, StoragePath archivePath) throws HoodieCommitException {
        try {
            this.writer = this.openWriter(archivePath);
            this.writeToFile(HoodieArchivedMetaEntry.getClassSchema(), archiveRecords);
        }
        catch (Exception e) {
            throw new HoodieCommitException("Failed to archive commits", e);
        }
        finally {
            this.close();
        }
    }

    /**
     * Selects completed {@code clean} and {@code rollback} instants eligible for archival.
     *
     * <p>Instants are grouped by action. For each action that has more than
     * {@code maxInstantsToKeep} completed instants, everything except the last
     * {@code minInstantsToKeep} entries of that group is selected; other groups contribute
     * nothing.
     *
     * @return stream of clean/rollback instants to archive
     */
    private Stream<HoodieInstant> getCleanInstantsToArchive() {
        HoodieTimeline cleanAndRollbackTimeline = this.table.getActiveTimeline()
            .getTimelineOfActions(CollectionUtils.createSet("clean", "rollback"))
            .filterCompletedInstants();
        Map<String, List<HoodieInstant>> instantsByAction = cleanAndRollbackTimeline.getInstantsAsStream()
            .collect(Collectors.groupingBy(HoodieInstant::getAction));
        return instantsByAction.values().stream().flatMap(actionInstants -> {
            if (actionInstants.size() > this.maxInstantsToKeep) {
                return actionInstants.subList(0, actionInstants.size() - this.minInstantsToKeep).stream();
            }
            // Typed empty stream instead of a raw ArrayList keeps the pipeline type-safe.
            return Stream.<HoodieInstant>empty();
        });
    }

    /**
     * Selects completed commit instants eligible for archival.
     *
     * <p>Nothing is selected unless the completed commits timeline holds more than
     * {@code maxInstantsToKeep} instants. Otherwise a completed commit {@code s} stays a
     * candidate only if it passes all of these filters, applied in order:
     * <ol>
     *   <li>Savepoints: when {@code config.shouldArchiveBeyondSavepoint()} is true, the requested
     *       time of {@code s} must not be one of the table's savepoint timestamps; otherwise,
     *       if a completed savepoint exists, the first savepoint's requested time must not be
     *       {@code LESSER_THAN_OR_EQUALS} that of {@code s}.</li>
     *   <li>Pending writes: if the write timeline has a non-completed instant, the
     *       "oldest commit to retain" derived from it must be {@code GREATER_THAN} {@code s}.</li>
     *   <li>Compaction: for MERGE_ON_READ tables with a NUM_COMMITS or NUM_AND_TIME inline
     *       compaction trigger, {@code s} must be {@code LESSER_THAN} the earliest instant
     *       to retain for compaction, when one is reported.</li>
     *   <li>Clustering: {@code s} must be {@code LESSER_THAN} the earliest instant to retain
     *       for clustering, when one is reported.</li>
     * </ol>
     * The surviving stream is limited to {@code countInstants() - minInstantsToKeep} entries.
     *
     * @return stream of commit instants to archive, possibly empty
     * @throws IOException declared for callers; propagated from the timeline utilities
     */
    private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
        HoodieTimeline commitTimeline = this.table.getCompletedCommitsTimeline();
        Option<HoodieInstant> oldestPendingInstant = this.table.getActiveTimeline().getWriteTimeline()
            .filter(instant -> !instant.isCompleted())
            .firstInstant();

        Option<HoodieInstant> oldestCommitToRetain;
        if (oldestPendingInstant.isPresent()) {
            // Latest completed commit whose requested time precedes the oldest pending instant.
            Option<HoodieInstant> completedCommitBeforeOldestPendingInstant = Option.fromJavaOptional(
                commitTimeline.getReverseOrderedInstants()
                    .filter(instant -> InstantComparison.compareTimestamps(
                        instant.requestedTime(), InstantComparison.LESSER_THAN, oldestPendingInstant.get().requestedTime()))
                    .findFirst());
            if (!completedCommitBeforeOldestPendingInstant.isPresent()
                || InstantComparison.compareTimestamps(
                    oldestPendingInstant.get().requestedTime(),
                    InstantComparison.LESSER_THAN,
                    completedCommitBeforeOldestPendingInstant.get().requestedTime())) {
                oldestCommitToRetain = oldestPendingInstant;
            } else {
                oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
            }
        } else {
            oldestCommitToRetain = Option.empty();
        }

        Option<HoodieInstant> firstSavepoint = this.table.getCompletedSavepointTimeline().firstInstant();
        Set<String> savepointTimestamps = this.table.getSavepointTimestamps();

        if (commitTimeline.empty() || commitTimeline.countInstants() <= this.maxInstantsToKeep) {
            return Stream.empty();
        }

        // The trigger strategy is only consulted for MERGE_ON_READ tables (short-circuit preserved).
        Option<HoodieInstant> oldestInstantToRetainForCompaction =
            this.metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
                && (this.config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS
                    || this.config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME)
                ? CompactionUtils.getEarliestInstantToRetainForCompaction(
                    this.table.getActiveTimeline(), this.config.getInlineCompactDeltaCommitMax())
                : Option.empty();
        Option<HoodieInstant> oldestInstantToRetainForClustering = ClusteringUtils.getEarliestInstantToRetainForClustering(
            this.table.getActiveTimeline(), this.table.getMetaClient(), this.config.getCleanerPolicy());

        Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
            .filter(s -> {
                if (this.config.shouldArchiveBeyondSavepoint()) {
                    return !savepointTimestamps.contains(s.requestedTime());
                }
                return !firstSavepoint.isPresent()
                    || !InstantComparison.compareTimestamps(
                        firstSavepoint.get().requestedTime(), InstantComparison.LESSER_THAN_OR_EQUALS, s.requestedTime());
            })
            .filter(s -> oldestCommitToRetain
                .map(instant -> InstantComparison.compareTimestamps(
                    instant.requestedTime(), InstantComparison.GREATER_THAN, s.requestedTime()))
                .orElse(true))
            .filter(s -> oldestInstantToRetainForCompaction
                .map(instantToRetain -> InstantComparison.compareTimestamps(
                    s.requestedTime(), InstantComparison.LESSER_THAN, instantToRetain.requestedTime()))
                .orElse(true))
            .filter(s -> oldestInstantToRetainForClustering
                .map(instantToRetain -> InstantComparison.compareTimestamps(
                    s.requestedTime(), InstantComparison.LESSER_THAN, instantToRetain.requestedTime()))
                .orElse(true));
        return instantToArchiveStream.limit(commitTimeline.countInstants() - this.minInstantsToKeep);
    }

    /**
     * Computes the full list of instant files to archive.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Returns an empty list when the metaserver is enabled.</li>
     *   <li>Collects the clean/rollback and commit candidates.</li>
     *   <li>If the metadata table is enabled and available, keeps only candidates whose
     *       requested time is {@code LESSER_THAN} the metadata table's latest compaction time;
     *       with no compaction yet, nothing is archived.</li>
     *   <li>If this table is itself a metadata table, keeps only candidates
     *       {@code LESSER_THAN} the earliest instant reported by
     *       {@code TimelineUtils.getEarliestInstantForMetadataArchival} for the data table,
     *       when one is present.</li>
     *   <li>Expands each surviving candidate to every instant on the raw active timeline that
     *       shares its (requested time, comparable action) key.</li>
     * </ol>
     *
     * @return instants to archive; empty when there is nothing to do
     * @throws IOException propagated from candidate selection
     * @throws HoodieException if consulting the metadata table fails
     */
    private List<HoodieInstant> getInstantsToArchive() throws IOException {
        if (this.config.isMetaserverEnabled()) {
            return Collections.emptyList();
        }
        List<HoodieInstant> candidates = Stream.concat(this.getCleanInstantsToArchive(), this.getCommitInstantsToArchive())
            .collect(Collectors.toList());
        if (candidates.isEmpty()) {
            return Collections.emptyList();
        }

        Stream<HoodieInstant> instants = candidates.stream();
        if (this.config.isMetadataTableEnabled() && this.table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
            try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
                this.table.getContext(), this.table.getStorage(), this.config.getMetadataConfig(), this.config.getBasePath())) {
                Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
                if (!latestCompactionTime.isPresent()) {
                    LOG.info("Not archiving as there is no compaction yet on the metadata table");
                    instants = Stream.empty();
                } else {
                    LOG.info("Limiting archiving of instants to latest compaction on metadata table at {}", latestCompactionTime.get());
                    instants = instants.filter(instant -> InstantComparison.compareTimestamps(
                        instant.requestedTime(), InstantComparison.LESSER_THAN, latestCompactionTime.get()));
                }
            }
            catch (Exception e) {
                throw new HoodieException("Error limiting instant archival based on metadata table", e);
            }
        }

        if (this.table.isMetadataTable()) {
            // Meta client for the data table this metadata table belongs to.
            HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
                .setBasePath(HoodieTableMetadata.getDatasetBasePath(this.config.getBasePath()))
                .setConf(this.metaClient.getStorageConf())
                .build();
            Option<HoodieInstant> qualifiedEarliestInstant = TimelineUtils.getEarliestInstantForMetadataArchival(
                dataMetaClient.getActiveTimeline(), this.config.shouldArchiveBeyondSavepoint());
            if (qualifiedEarliestInstant.isPresent()) {
                instants = instants.filter(instant -> InstantComparison.compareTimestamps(
                    instant.requestedTime(), InstantComparison.LESSER_THAN, qualifiedEarliestInstant.get().requestedTime()));
            }
        }

        List<HoodieInstant> instantsToArchive = instants.collect(Collectors.toList());
        if (instantsToArchive.isEmpty()) {
            return Collections.emptyList();
        }

        // Group the raw active timeline by (requested time, comparable action) so each selected
        // instant pulls in every instant file sharing its key.
        ActiveTimelineV1 rawActiveTimeline = new ActiveTimelineV1(this.metaClient, false);
        Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
            .collect(Collectors.groupingBy(
                i -> Pair.of(i.requestedTime(), InstantComparatorV1.getComparableAction(i.getAction()))));
        return instantsToArchive.stream()
            .flatMap(hoodieInstant -> groupByTsAction
                .getOrDefault(
                    Pair.of(hoodieInstant.requestedTime(), InstantComparatorV1.getComparableAction(hoodieInstant.getAction())),
                    Collections.emptyList())
                .stream())
            .collect(Collectors.toList());
    }

    /**
     * Deletes the instant files of the given archived instants from the active timeline.
     *
     * <p>Non-completed instants are deleted first through {@code context.foreach}, with
     * parallelism capped by {@code config.getArchiveDeleteParallelism()}. Completed instants
     * are then deleted one by one on the calling thread, in list order.
     *
     * @param archivedInstants instants whose files should be removed
     * @param context          engine context used for the parallel delete and job status
     * @return always {@code true}
     * @throws IOException declared for callers
     */
    private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
        LOG.info("Deleting instants " + archivedInstants);

        // Split by completion state, preserving the incoming order within each group.
        List<HoodieInstant> notCompleted = archivedInstants.stream()
            .filter(candidate -> !candidate.isCompleted())
            .collect(Collectors.toCollection(ArrayList::new));
        List<HoodieInstant> completed = archivedInstants.stream()
            .filter(candidate -> candidate.isCompleted())
            .collect(Collectors.toCollection(ArrayList::new));

        context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + this.config.getTableName());
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();

        if (!notCompleted.isEmpty()) {
            int parallelism = Math.min(notCompleted.size(), this.config.getArchiveDeleteParallelism());
            context.foreach(notCompleted, pending -> activeTimeline.deleteInstantFileIfExists(pending), parallelism);
        }
        for (HoodieInstant done : completed) {
            activeTimeline.deleteInstantFileIfExists(done);
        }
        return true;
    }

    /**
     * Converts the given instants to archive records and appends them to the archive log.
     *
     * <p>For every instant, leftover marker directories are removed and the instant is
     * converted to an Avro record. Records are flushed whenever the buffer reaches
     * {@code config.getCommitArchivalBatchSize()}, and once more after the loop. A failure
     * on a single instant is logged; it is rethrown only when
     * {@code config.isFailOnTimelineArchivingEnabled()} is true, otherwise the loop moves on.
     *
     * @param context  engine context used for marker cleanup
     * @param instants instants to archive
     * @throws HoodieCommitException if archival fails
     */
    public void archive(HoodieEngineContext context, List<HoodieInstant> instants) throws HoodieCommitException {
        try {
            Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
            LOG.info("Wrapper schema " + wrapperSchema.toString());
            List<IndexedRecord> buffered = new ArrayList<>();
            for (HoodieInstant current : instants) {
                try {
                    this.deleteAnyLeftOverMarkers(context, current);
                    buffered.add(this.convertToAvroRecord(current));
                    if (buffered.size() >= this.config.getCommitArchivalBatchSize()) {
                        this.writeToFile(wrapperSchema, buffered);
                    }
                }
                catch (Exception e) {
                    InstantFileNameGeneratorV1 fileNameFactory = new InstantFileNameGeneratorV1();
                    LOG.error("Failed to archive commits, .commit file: " + fileNameFactory.getFileName(current), e);
                    if (this.config.isFailOnTimelineArchivingEnabled()) {
                        throw e;
                    }
                }
            }
            // Flush whatever is left below the batch threshold.
            this.writeToFile(wrapperSchema, buffered);
        }
        catch (Exception e) {
            throw new HoodieCommitException("Failed to archive commits", e);
        }
    }

    /**
     * Removes the marker directory for the given instant, logging when one was deleted.
     *
     * @param context engine context passed to the marker deletion
     * @param instant instant whose requested time identifies the marker directory
     */
    private void deleteAnyLeftOverMarkers(HoodieEngineContext context, HoodieInstant instant) {
        boolean removed = WriteMarkersFactory
            .get(this.config.getMarkersType(), this.table, instant.requestedTime())
            .deleteMarkerDir(context, this.config.getMarkersDeleteParallelism());
        if (removed) {
            LOG.info("Cleaned up left over marker directory for instant :" + instant);
        }
    }

    /**
     * Appends the given records to the archive log as one Avro data block, then clears the list.
     *
     * <p>Does nothing when {@code records} is empty. The block header carries the wrapper
     * schema, and the table's record key field property is passed as the block's key field.
     *
     * @param wrapperSchema schema stored in the block header
     * @param records       records to write; emptied after a successful append
     * @throws Exception if appending the block fails
     */
    private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
        if (records.isEmpty()) {
            return;
        }
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
        String recordKeyField = this.table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
        List<HoodieRecord> wrapped = records.stream()
            .map(HoodieAvroIndexedRecord::new)
            .collect(Collectors.toList());
        this.writer.appendBlock(new HoodieAvroDataBlock(wrapped, blockHeader, recordKeyField));
        records.clear();
    }

    /**
     * Wraps the given instant's metadata into an archive record via
     * {@code MetadataConversionUtils.createMetaWrapper}.
     *
     * @param hoodieInstant instant to convert
     * @return the Avro record representing the instant
     * @throws IOException if the conversion fails
     */
    private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant) throws IOException {
        IndexedRecord archivedEntry = MetadataConversionUtils.createMetaWrapper(hoodieInstant, this.metaClient);
        return archivedEntry;
    }
}

