/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamerCheckpointUtils {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerCheckpointUtils.class);

    public static Option<Checkpoint> resolveCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt, HoodieStreamer.Config streamerConfig, TypedProperties props, HoodieTableMetaClient metaClient) throws IOException {
        Option<Checkpoint> checkpoint = Option.empty();
        StreamerCheckpointUtils.assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(metaClient, streamerConfig, props);
        if (commitsTimelineOpt.isPresent()) {
            checkpoint = StreamerCheckpointUtils.resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), streamerConfig, props);
        }
        checkpoint = StreamerCheckpointUtils.useCkpFromOverrideConfigIfAny(streamerConfig, props, checkpoint);
        return checkpoint;
    }

    @VisibleForTesting
    static void assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(HoodieTableMetaClient metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
        boolean hasCheckpointOverride = !StringUtils.isNullOrEmpty(streamerConfig.checkpoint) || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint);
        boolean isHoodieIncSource = CheckpointUtils.HOODIE_INCREMENTAL_SOURCES.contains(streamerConfig.sourceClassName);
        if (hasCheckpointOverride && isHoodieIncSource) {
            HoodieTableVersion writeTableVersion = HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION));
            HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
            if (config.autoUpgrade() && UpgradeDowngrade.needsUpgradeOrDowngrade(metaClient, config, writeTableVersion)) {
                throw new HoodieUpgradeDowngradeException(String.format("When upgrade/downgrade is happening, please avoid setting --checkpoint option and --ignore-checkpoint for your delta streamers. Detected invalid streamer configuration:\n%s", streamerConfig));
            }
        }
    }

    private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(HoodieStreamer.Config streamerConfig, TypedProperties props, Option<Checkpoint> checkpoint) {
        LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
        if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
            int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
            checkpoint = Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
        }
        return checkpoint;
    }

    @VisibleForTesting
    static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline commitsTimeline, HoodieStreamer.Config streamerConfig, TypedProperties props) throws IOException {
        Option<HoodieInstant> lastCommit;
        HoodieTimeline deltaCommitTimeline;
        Option<Checkpoint> resumeCheckpoint = Option.empty();
        if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name()) && !(deltaCommitTimeline = commitsTimeline.filter(instant -> instant.getAction().equals("deltacommit"))).empty()) {
            commitsTimeline = deltaCommitTimeline;
        }
        if ((lastCommit = commitsTimeline.lastInstant()).isPresent()) {
            Option<HoodieCommitMetadata> commitMetadataOption = StreamerCheckpointUtils.getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline);
            int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
            if (commitMetadataOption.isPresent()) {
                HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
                Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
                LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
                if (StreamerCheckpointUtils.ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
                    resumeCheckpoint = Option.empty();
                } else if (StreamerCheckpointUtils.ckpOverrideCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
                    resumeCheckpoint = Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
                } else if (StreamerCheckpointUtils.shouldUseCkpFromPrevCommit(checkpointFromCommit)) {
                    resumeCheckpoint = Option.of(checkpointFromCommit);
                } else if (InstantComparison.compareTimestamps("00000000000002", InstantComparison.LESSER_THAN, lastCommit.get().requestedTime())) {
                    throw new HoodieStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + commitsTimeline.getInstants());
                }
                if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key"))) {
                    ConfigUtils.removeConfigFromProps(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
                }
            } else if (streamerConfig.checkpoint != null) {
                resumeCheckpoint = Option.of(CheckpointUtils.buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
            }
        }
        return resumeCheckpoint;
    }

    private static boolean shouldUseCkpFromPrevCommit(Checkpoint checkpointFromCommit) {
        return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
    }

    private static boolean ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config streamerConfig, Checkpoint checkpointFromCommit) {
        return streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey()) || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()));
    }

    private static boolean ignoreCkpCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config streamerConfig, Checkpoint checkpointFromCommit) {
        return streamerConfig.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointIgnoreKey()) || !streamerConfig.ignoreCheckpoint.equals(checkpointFromCommit.getCheckpointIgnoreKey()));
    }

    public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        return timeline.getReverseOrderedInstants().map(instant -> {
            try {
                HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata((HoodieInstant)instant);
                if (!(StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.key")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.key.v2")) && StringUtils.isNullOrEmpty(commitMetadata.getMetadata("streamer.checkpoint.reset.key.v2")))) {
                    return Option.of(Pair.of(instant.toString(), commitMetadata));
                }
                return Option.empty();
            }
            catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
    }

    public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
        return StreamerCheckpointUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> (HoodieCommitMetadata)pair.getRight());
    }

    public static Option<String> getLatestInstantWithValidCheckpointInfo(Option<HoodieTimeline> timelineOpt) {
        return timelineOpt.map(timeline -> {
            try {
                return StreamerCheckpointUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(timeline).map(pair -> (String)pair.getLeft());
            }
            catch (IOException e) {
                throw new HoodieIOException("failed to get latest instant with ValidCheckpointInfo", e);
            }
        }).orElse(Option.empty());
    }
}

