/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class KafkaOffsetGen {
    private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
    private final HashMap<String, Object> kafkaParams;
    private final TypedProperties props;
    protected final String topicName;

    public KafkaOffsetGen(TypedProperties props) {
        this.props = props;
        this.kafkaParams = new HashMap();
        for (Object prop : props.keySet()) {
            this.kafkaParams.put(prop.toString(), props.get(prop.toString()));
        }
        DataSourceUtils.checkRequiredProperties(props, Collections.singletonList("hoodie.deltastreamer.source.kafka.topic"));
        this.topicName = props.getString("hoodie.deltastreamer.source.kafka.topic");
    }

    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
        long numEvents;
        Map<TopicPartition, Long> toOffsets;
        Map<TopicPartition, Long> fromOffsets;
        try (KafkaConsumer consumer2 = new KafkaConsumer(this.kafkaParams);){
            if (!this.checkTopicExists(consumer2)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List<PartitionInfo> partitionInfoList = consumer2.partitionsFor(this.topicName);
            Set<TopicPartition> topicPartitions = partitionInfoList.stream().map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
            if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
                fromOffsets = this.checkupValidOffsets(consumer2, lastCheckpointStr, topicPartitions);
                metrics.updateDeltaStreamerKafkaDelayCountMetrics(this.delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer2));
            } else {
                KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(this.props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
                switch (autoResetValue) {
                    case EARLIEST: {
                        fromOffsets = consumer2.beginningOffsets(topicPartitions);
                        break;
                    }
                    case LATEST: {
                        fromOffsets = consumer2.endOffsets(topicPartitions);
                        break;
                    }
                    default: {
                        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
                    }
                }
            }
            toOffsets = consumer2.endOffsets(topicPartitions);
        }
        long maxEventsToReadFromKafka = this.props.getLong("hoodie.deltastreamer.kafka.source.maxEvents", Config.maxEventsFromKafkaSource);
        if (sourceLimit == Long.MAX_VALUE) {
            numEvents = maxEventsToReadFromKafka;
            LOG.info((Object)("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka));
        } else {
            numEvents = sourceLimit;
        }
        if (numEvents < (long)toOffsets.size()) {
            throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
        }
        return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
    }

    private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer2, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        HashMap<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        Map<TopicPartition, Long> earliestOffsets = consumer2.beginningOffsets(topicPartitions);
        boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream().anyMatch(offset -> (Long)offset.getValue() < (Long)earliestOffsets.get(offset.getKey()));
        return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
    }

    private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer2) {
        Long delayCount = 0L;
        HashMap<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        Map<TopicPartition, Long> lastOffsets = consumer2.endOffsets(topicPartitions);
        for (Map.Entry<TopicPartition, Long> entry : lastOffsets.entrySet()) {
            Long offect = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
            delayCount = delayCount + (entry.getValue() - offect > 0L ? entry.getValue() - offect : 0L);
        }
        return delayCount;
    }

    public boolean checkTopicExists(KafkaConsumer consumer2) {
        Map<String, List<PartitionInfo>> result = consumer2.listTopics();
        return result.containsKey(this.topicName);
    }

    public String getTopicName() {
        return this.topicName;
    }

    public HashMap<String, Object> getKafkaParams() {
        return this.kafkaParams;
    }

    public static class Config {
        private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
        private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
        private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
        public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000L;
        public static long maxEventsFromKafkaSource = 5000000L;
    }

    static enum KafkaResetOffsetStrategies {
        LATEST,
        EARLIEST;

    }

    public static class CheckpointUtils {
        public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
            HashMap<TopicPartition, Long> offsetMap = new HashMap<TopicPartition, Long>();
            String[] splits = checkpointStr.split(",");
            String topic = splits[0];
            for (int i = 1; i < splits.length; ++i) {
                String[] subSplits = splits[i].split(":");
                offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
            }
            return offsetMap;
        }

        public static String offsetsToStr(OffsetRange[] ranges) {
            StringBuilder sb = new StringBuilder();
            sb.append(ranges[0].topic() + ",");
            sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset())).collect(Collectors.joining(",")));
            return sb.toString();
        }

        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, Map<TopicPartition, Long> toOffsetMap, long numEvents) {
            Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
            OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
            toOffsetMap.keySet().stream().map(tp -> {
                long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
                return OffsetRange.create(tp, fromOffset, fromOffset);
            }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
            long allocedEvents = 0L;
            HashSet<Integer> exhaustedPartitions = new HashSet<Integer>();
            while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
                long remainingEvents = numEvents - allocedEvents;
                long eventsPerPartition = (long)Math.ceil(1.0 * (double)remainingEvents / (double)(toOffsetMap.size() - exhaustedPartitions.size()));
                for (int i = 0; i < ranges.length; ++i) {
                    OffsetRange range = ranges[i];
                    if (exhaustedPartitions.contains(range.partition())) continue;
                    long toOffsetMax = toOffsetMap.get(range.topicPartition());
                    long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
                    if (toOffset == toOffsetMax) {
                        exhaustedPartitions.add(range.partition());
                    }
                    if ((allocedEvents += toOffset - range.untilOffset()) > numEvents) {
                        long offsetsToAdd = Math.min(eventsPerPartition, numEvents - allocedEvents);
                        toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
                    }
                    ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
                }
            }
            return ranges;
        }

        public static long totalNewMessages(OffsetRange[] ranges) {
            return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
        }
    }
}

