/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Objects;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;

/**
 * Static helpers used when pulling data incrementally from a source Hudi table:
 * working out which instant range to read next and validating that rows fall
 * inside that range.
 */
public class IncrSourceHelper {
    /** Begin instant used when no checkpoint is supplied and nothing newer can be derived. */
    private static final String DEFAULT_BEGIN_TIMESTAMP = "000";

    /**
     * Returns the numeric predecessor ({@code timestamp - 1}) of the given timestamp as a string.
     *
     * <p>The result is left-padded with zeros to the width of the input so that a
     * decrement which shortens the number (e.g. {@code "100" -> "099"}) still sorts
     * below the input when the two are compared as strings.
     * NOTE(review): this assumes instant times are compared as strings by
     * {@code HoodieTimeline.compareTimestamps} — confirm against that class.
     *
     * @param timestamp numeric instant time; must parse as a positive {@code long}
     * @return the decremented timestamp, zero-padded to {@code timestamp.length()}
     * @throws NumberFormatException if {@code timestamp} is not a valid {@code long}
     */
    private static String getStrictlyLowerTimestamp(String timestamp) {
        long ts = Long.parseLong(timestamp);
        ValidationUtils.checkArgument(ts > 0L, "Timestamp must be positive");
        String lower = Long.toString(ts - 1L);
        int width = timestamp.length();
        if (lower.length() >= width) {
            return lower;
        }
        StringBuilder padded = new StringBuilder(width);
        for (int i = lower.length(); i < width; i++) {
            padded.append('0');
        }
        return padded.append(lower).toString();
    }

    /**
     * Picks the instant to start from when the caller may not have supplied a checkpoint.
     *
     * @param beginInstant             checkpointed begin instant, if any; returned as-is when present
     * @param missingCheckpointStrategy strategy to apply when {@code beginInstant} is absent
     * @param activeCommitTimeline     timeline whose last instant is consulted for {@code READ_LATEST}
     * @return the supplied begin instant; otherwise, for {@code READ_LATEST}, the timestamp strictly
     *         below the timeline's last instant (or {@link #DEFAULT_BEGIN_TIMESTAMP} when the timeline
     *         has no last instant); otherwise {@link #DEFAULT_BEGIN_TIMESTAMP}
     * @throws IllegalArgumentException if {@code beginInstant} is absent and no strategy is configured
     */
    private static String resolveBeginInstantTime(Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy, HoodieTimeline activeCommitTimeline) {
        return beginInstant.orElseGet(() -> {
            if (missingCheckpointStrategy == null) {
                throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
            }
            if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
                Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
                return lastInstant.map(hoodieInstant -> IncrSourceHelper.getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse(DEFAULT_BEGIN_TIMESTAMP);
            }
            return DEFAULT_BEGIN_TIMESTAMP;
        });
    }

    /**
     * Computes the query type and the (begin, end) instant pair for the next fetch from the
     * source table at {@code srcBasePath}.
     *
     * <p>Only completed commit/compaction instants that precede the first incomplete
     * (inflight or requested) commit are considered. Incomplete {@code replacecommit} instants
     * for which {@code ClusteringUtils.getClusteringPlan} yields a plan are ignored when
     * looking for that first incomplete commit.
     *
     * @param jssc                      Spark context supplying the Hadoop configuration
     * @param srcBasePath               base path of the source table
     * @param numInstantsPerFetch       maximum number of instants after the begin instant to include;
     *                                  must be positive
     * @param beginInstant              checkpointed begin instant, if any
     * @param missingCheckpointStrategy strategy applied when {@code beginInstant} is absent; may be
     *                                  {@code null} only when {@code beginInstant} is present
     * @return a pair of query type and (begin instant, end instant). The incremental query type is
     *         returned when the strategy is {@code READ_LATEST} or the begin instant is not before
     *         the start of the timeline, with the end being the last of up to
     *         {@code numInstantsPerFetch} instants after begin (or begin itself when there are none).
     *         Otherwise the snapshot query type is returned with the timeline's last instant as end.
     * @throws IllegalArgumentException if {@code numInstantsPerFetch} is not positive, or if both the
     *                                  begin instant and the strategy are missing
     */
    public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
        ValidationUtils.checkArgument(numInstantsPerFetch > 0, "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
        HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();

        // First incomplete commit, skipping pending replacecommits that carry a clustering plan.
        Option<HoodieInstant> firstIncompleteCommit = srcMetaClient.getCommitsTimeline().filterInflightsAndRequested().filter(instant -> !"replacecommit".equals(instant.getAction()) || !ClusteringUtils.getClusteringPlan(srcMetaClient, instant).isPresent()).firstInstant();
        HoodieTimeline completedCommitTimeline = srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        // Never read past an incomplete commit: cap the completed timeline just before it.
        HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(commit -> completedCommitTimeline.findInstantsBefore(commit.getTimestamp())).orElse(completedCommitTimeline);

        String beginInstantTime = resolveBeginInstantTime(beginInstant, missingCheckpointStrategy, activeCommitTimeline);

        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
            // Keep only the last of the (at most numInstantsPerFetch) instants following the begin instant.
            Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
            return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
        }

        // NOTE(review): get() presumably cannot fail here because isBeforeTimelineStarts(...) returned
        // true, which should imply a non-empty timeline — confirm against HoodieTimeline.
        Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
        return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), Pair.of(beginInstantTime, lastInstant.get().getTimestamp()));
    }

    /**
     * Validates that a row's instant time lies in the range {@code (sinceInstant, endInstant]}.
     *
     * <p>The failure message (which stringifies {@code row}) is only built when validation fails.
     *
     * @param row          row being validated; only used in the failure message
     * @param instantTime  the row's commit time; must not be {@code null}
     * @param sinceInstant exclusive lower bound
     * @param endInstant   inclusive upper bound
     * @throws NullPointerException if {@code instantTime} is {@code null}
     */
    public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
        Objects.requireNonNull(instantTime, "instantTime must not be null");
        boolean withinRange = HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN, sinceInstant)
            && HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
        if (!withinRange) {
            // Still fail through ValidationUtils so the thrown exception is exactly what callers saw before.
            ValidationUtils.checkArgument(false, "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + " but expected to be between " + sinceInstant + "(excl) - " + endInstant + "(incl)");
        }
    }

    /** Strategies for choosing a begin instant when no checkpoint is available. */
    public enum MissingCheckpointStrategy {
        /** Start just below the last instant of the eligible timeline. */
        READ_LATEST,
        /** Start from the default begin timestamp. */
        READ_UPTO_LATEST_COMMIT;
    }
}

