/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator;
import org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schedules a compaction or log-compaction plan for a table.
 *
 * <p>{@link #execute()} validates that the table is {@code MERGE_ON_READ}, decides whether a plan
 * should be generated, asks the configured plan generator for it and, when the plan contains
 * operations, saves it as a {@code REQUESTED} instant on the table's active timeline.
 *
 * @param <T> first type parameter forwarded to {@link HoodieTable} and the base executor
 * @param <I> second type parameter forwarded to {@link HoodieTable} and the base executor
 * @param <K> third type parameter forwarded to {@link HoodieTable} and the base executor
 * @param <O> fourth type parameter forwarded to {@link HoodieTable} and the base executor
 */
public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseTableServicePlanActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduleCompactionActionExecutor.class);
    private final WriteOperationType operationType;
    private final Option<Map<String, String>> extraMetadata;
    // Raw type kept on purpose: the generator's type parameters are not visible from this file.
    private final BaseHoodieCompactionPlanGenerator planGenerator;

    /**
     * Creates the executor and its plan generator.
     *
     * @param context       engine context handed to the plan generator and used to set the job status
     * @param config        write config supplying the compaction settings
     * @param table         table to schedule the compaction on
     * @param instantTime   instant time at which the compaction is scheduled
     * @param extraMetadata optional metadata copied onto the plan before it is saved
     * @param operationType must be {@link WriteOperationType#COMPACT} or
     *                      {@link WriteOperationType#LOG_COMPACT}
     * @throws IllegalArgumentException presumably raised by {@code ValidationUtils.checkArgument}
     *                                  for any other operation type (exception type not visible here)
     */
    public ScheduleCompactionActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, Option<Map<String, String>> extraMetadata, WriteOperationType operationType) {
        super(context, config, table, instantTime);
        this.extraMetadata = extraMetadata;
        this.operationType = operationType;
        ValidationUtils.checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT, "Only COMPACT and LOG_COMPACT is supported");
        // Must run after operationType is assigned and validated: the choice of generator depends on it.
        this.planGenerator = this.buildPlanGenerator(context, config, table);
    }

    /**
     * Picks the plan generator for the configured operation type: the class named by
     * {@link HoodieCompactionConfig#COMPACTION_PLAN_GENERATOR} for {@code COMPACT}, otherwise a
     * {@link HoodieLogCompactionPlanGenerator}.
     */
    private BaseHoodieCompactionPlanGenerator buildPlanGenerator(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
        if (this.operationType == WriteOperationType.COMPACT) {
            // The explicit Properties cast pins the overload that the original call resolved to.
            String planGeneratorClass = ConfigUtils.getStringWithAltKeys((Properties)config.getProps(), HoodieCompactionConfig.COMPACTION_PLAN_GENERATOR, true);
            return this.createCompactionPlanGenerator(planGeneratorClass, table, context, config);
        }
        return new HoodieLogCompactionPlanGenerator(table, context, config, this);
    }

    /**
     * Schedules the compaction (or log compaction) at {@code instantTime}.
     *
     * @return the saved plan, or an empty option when scheduling is skipped because of an earlier
     *         pending write, or when the generated plan is {@code null} or has no operations
     */
    @Override
    public Option<HoodieCompactionPlan> execute() {
        ValidationUtils.checkArgument(this.table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + this.table.getMetaClient().getTableType().name());

        Option<HoodieInstant> blockingInflight = this.findInflightWriteBlockingSchedule();
        if (blockingInflight.isPresent()) {
            LOG.warn("Earliest write inflight instant time must be later than compaction time. Earliest :" + blockingInflight.get() + ", Compaction scheduled at " + this.instantTime + ". Hence skipping to schedule compaction");
            return Option.empty();
        }

        HoodieCompactionPlan plan = this.scheduleCompaction();
        if (plan == null || !CollectionUtils.nonEmpty(plan.getOperations())) {
            // Nothing to compact: no instant is written to the timeline.
            return Option.empty();
        }

        this.extraMetadata.ifPresent(plan::setExtraMetadata);
        if (this.operationType == WriteOperationType.COMPACT) {
            HoodieInstant compactionInstant = this.instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, "compaction", this.instantTime);
            this.table.getActiveTimeline().saveToCompactionRequested(compactionInstant, plan);
        } else {
            HoodieInstant logCompactionInstant = this.instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, "logcompaction", this.instantTime);
            this.table.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant, plan);
        }
        return Option.of(plan);
    }

    /**
     * Returns the earliest pending write (excluding compaction and log compaction) whose requested
     * time is not greater than {@code instantTime}, when the inflight-ordering check applies.
     *
     * <p>The check applies only when all of the following hold: the table version is below
     * {@link HoodieTableVersion#EIGHT}, the write concurrency mode does not support multiple
     * writers, the failed-writes clean policy is not lazy, and the engine is
     * {@link EngineType#SPARK}. The timeline is only consulted once those conditions hold.
     *
     * @return the blocking instant, or an empty option when the check does not apply or passes
     */
    private Option<HoodieInstant> findInflightWriteBlockingSchedule() {
        boolean checkApplies = !this.table.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
            && !this.config.getWriteConcurrencyMode().supportsMultiWriter()
            && !this.config.getFailedWritesCleanPolicy().isLazy()
            && this.config.getEngineType() == EngineType.SPARK;
        if (!checkApplies) {
            return Option.empty();
        }
        Option<HoodieInstant> earliestInflight = this.table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction().firstInstant();
        if (earliestInflight.isPresent()
            && !InstantComparison.compareTimestamps(earliestInflight.get().requestedTime(), InstantComparison.GREATER_THAN, this.instantTime)) {
            return earliestInflight;
        }
        return Option.empty();
    }

    /**
     * Generates the plan when {@link #needCompact(CompactionTriggerStrategy)} says one is due.
     *
     * @return the generator's plan (which the {@code @Nullable} annotation allows to be
     *         {@code null}), or a fresh empty plan when no compaction is due
     * @throws HoodieCompactionException when plan generation fails with an {@link IOException}
     */
    @Nullable
    private HoodieCompactionPlan scheduleCompaction() {
        LOG.info("Checking if compaction needs to be run on " + this.config.getBasePath());
        if (!this.needCompact(this.config.getInlineCompactTriggerStrategy())) {
            return new HoodieCompactionPlan();
        }
        LOG.info("Generating compaction plan for merge on read table " + this.config.getBasePath());
        try {
            this.context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
            return this.planGenerator.generateCompactionPlan(this.instantTime);
        }
        catch (IOException e) {
            throw new HoodieCompactionException("Could not schedule compaction " + this.config.getBasePath(), e);
        }
    }

    /**
     * Summarizes {@code CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction} for the
     * active timeline as (instant count, requested time of the returned instant).
     */
    private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
        return toCountAndRequestedTime(CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(this.table.getActiveTimeline()));
    }

    /**
     * Summarizes {@code CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest} for the
     * active timeline as (instant count, requested time of the returned instant).
     */
    private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLastCompactionRequest() {
        return toCountAndRequestedTime(CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(this.table.getActiveTimeline()));
    }

    /**
     * Converts a (timeline, instant) pair into (number of instants in the timeline, requested
     * time of the instant); an empty input yields an empty result.
     */
    private static Option<Pair<Integer, String>> toCountAndRequestedTime(Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo) {
        if (!deltaCommitsInfo.isPresent()) {
            return Option.empty();
        }
        Pair<HoodieTimeline, HoodieInstant> info = deltaCommitsInfo.get();
        return Option.of(Pair.of(info.getLeft().countInstants(), info.getRight().requestedTime()));
    }

    /**
     * Decides whether a plan should be generated now.
     *
     * <p>Returns {@code false} when no delta-commit info is available. For {@code LOG_COMPACT}
     * the answer is then always {@code true}; for {@code COMPACT} the given strategy is evaluated
     * against the configured delta-commit and delta-seconds thresholds.
     *
     * @param compactionTriggerStrategy strategy to evaluate
     * @return {@code true} when the trigger condition is met
     * @throws HoodieCompactionException for a strategy this method does not handle, or when an
     *                                   instant time needed by a time-based strategy cannot be parsed
     */
    private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
        Option<Pair<Integer, String>> latestDeltaCommitInfoOption = this.getLatestDeltaCommitInfo();
        if (!latestDeltaCommitInfoOption.isPresent()) {
            return false;
        }
        if (this.operationType == WriteOperationType.LOG_COMPACT) {
            return true;
        }
        Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
        int inlineCompactDeltaCommitMax = this.config.getInlineCompactDeltaCommitMax();
        int inlineCompactDeltaSecondsMax = this.config.getInlineCompactDeltaSecondsMax();

        final boolean compactable;
        switch (compactionTriggerStrategy) {
            case NUM_COMMITS: {
                compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
                }
                break;
            }
            case NUM_COMMITS_AFTER_LAST_REQUEST: {
                // This strategy counts from the latest compaction request rather than the latest compaction.
                Option<Pair<Integer, String>> sinceLastRequest = this.getLatestDeltaCommitInfoSinceLastCompactionRequest();
                if (!sinceLastRequest.isPresent()) {
                    return false;
                }
                compactable = inlineCompactDeltaCommitMax <= sinceLastRequest.get().getLeft();
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
                }
                break;
            }
            case TIME_ELAPSED: {
                compactable = (long)inlineCompactDeltaSecondsMax <= this.secondsElapsedSince(latestDeltaCommitInfo.getRight());
                if (compactable) {
                    LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
                }
                break;
            }
            case NUM_OR_TIME: {
                // Short-circuit: timestamps are only parsed when the commit threshold is not met.
                compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
                    || (long)inlineCompactDeltaSecondsMax <= this.secondsElapsedSince(latestDeltaCommitInfo.getRight());
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, inlineCompactDeltaSecondsMax));
                }
                break;
            }
            case NUM_AND_TIME: {
                // Short-circuit: timestamps are only parsed when the commit threshold is met.
                compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
                    && (long)inlineCompactDeltaSecondsMax <= this.secondsElapsedSince(latestDeltaCommitInfo.getRight());
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, inlineCompactDeltaSecondsMax));
                }
                break;
            }
            default: {
                // Report the value that was actually switched on.
                throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + compactionTriggerStrategy);
            }
        }
        return compactable;
    }

    /**
     * Returns the whole seconds between {@code earlierInstantTime} and this executor's
     * {@code instantTime} (the latter minus the former).
     */
    private long secondsElapsedSince(String earlierInstantTime) {
        return this.parsedToSeconds(this.instantTime) - this.parsedToSeconds(earlierInstantTime);
    }

    /**
     * Parses an instant time into seconds (the parsed date's millisecond value divided by 1000).
     *
     * @throws HoodieCompactionException when the instant time cannot be parsed
     */
    private long parsedToSeconds(String time) {
        return TimelineUtils.parseDateFromInstantTimeSafely(time).orElseThrow(() -> new HoodieCompactionException("Failed to parse timestamp " + time)).getTime() / 1000L;
    }

    /**
     * Reflectively instantiates {@code planGeneratorClass} through its
     * {@code (HoodieTable, HoodieEngineContext, HoodieWriteConfig, BaseTableServicePlanActionExecutor)}
     * constructor, passing this executor as the last argument.
     */
    private BaseHoodieCompactionPlanGenerator createCompactionPlanGenerator(String planGeneratorClass, HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config) {
        Class<?>[] constructorArgTypes = new Class<?>[]{HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class, BaseTableServicePlanActionExecutor.class};
        Object[] constructorArgs = new Object[]{table, context, config, this};
        return (BaseHoodieCompactionPlanGenerator)ReflectionUtils.loadClass(planGeneratorClass, constructorArgTypes, constructorArgs);
    }
}

