/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOffsetGen {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class);
    // Metric name under which getNextOffsetRanges reports the delay count to the ingestion metrics.
    private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount";
    // Orders offset ranges by their partition number.
    private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparing(OffsetRange::partition);
    // Checkpoint-type value for which getNextOffsetRanges treats the last checkpoint as a timestamp.
    public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
    // Loose shape check for a "<topic>,<partition>:<offset>..." checkpoint string; used by checkTopicCheckpoint.
    private final Pattern pattern = Pattern.compile(".*,.*:.*");
    // Properties handed to every KafkaConsumer this class creates (see excludeHoodieConfigs).
    private final Map<String, Object> kafkaParams;
    // Full set of properties this generator was constructed with.
    private final TypedProperties props;
    // Topic read from KafkaSourceConfig.KAFKA_TOPIC_NAME.
    protected final String topicName;
    // Strategy used to pick starting offsets when the last checkpoint cannot be used.
    private KafkaSourceConfig.KafkaResetOffsetStrategies autoResetValue;
    // Value of KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE; compared against KAFKA_CHECKPOINT_TYPE_TIMESTAMP.
    private final String kafkaCheckpointType;
    /**
     * Creates an offset generator from the given properties.
     *
     * <p>Reads the topic name (required), the checkpoint type and the auto-offset-reset
     * strategy. The reset strategy must equal the lower-cased name of one of the
     * {@code KafkaResetOffsetStrategies} constants.
     *
     * @param props properties holding both Hudi and Kafka consumer settings
     * @throws HoodieStreamerException if the auto-offset-reset value matches no known strategy
     */
    public KafkaOffsetGen(TypedProperties props) {
        this.props = props;
        this.kafkaParams = this.excludeHoodieConfigs(props);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(KafkaSourceConfig.KAFKA_TOPIC_NAME));
        this.topicName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_TOPIC_NAME);
        this.kafkaCheckpointType = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE, true);

        // Locale.ROOT keeps the lower-casing independent of the JVM default locale; under a
        // Turkish locale "EARLIEST".toLowerCase() would not equal "earliest".
        String defaultResetValue = KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase(Locale.ROOT);
        String kafkaAutoResetOffsetsStr = props.getString(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetValue);

        KafkaSourceConfig.KafkaResetOffsetStrategies resolved = null;
        for (KafkaSourceConfig.KafkaResetOffsetStrategies candidate : KafkaSourceConfig.KafkaResetOffsetStrategies.values()) {
            if (candidate.name().toLowerCase(Locale.ROOT).equals(kafkaAutoResetOffsetsStr)) {
                resolved = candidate;
                break;
            }
        }
        if (resolved == null) {
            throw new HoodieStreamerException(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key() + " config set to unknown value " + kafkaAutoResetOffsetsStr);
        }
        this.autoResetValue = resolved;

        // For GROUP the consumer is configured with the default reset value instead of the
        // configured one; the group-based start offsets are resolved in getGroupOffsets.
        if (resolved == KafkaSourceConfig.KafkaResetOffsetStrategies.GROUP) {
            this.kafkaParams.put(KafkaSourceConfig.KAFKA_AUTO_OFFSET_RESET.key(), defaultResetValue);
        }
    }

    /**
     * Computes the offset ranges to read in the next batch.
     *
     * <p>Start offsets come from the last checkpoint when it is present, non-empty and has the
     * expected shape; otherwise they are chosen by the configured auto-reset strategy. When the
     * checkpoint type is "timestamp" and the checkpoint looks like a timestamp, it is first
     * translated into per-partition offsets. End offsets are the current end offsets of the topic.
     *
     * @param lastCheckpointStr last checkpoint, either an offset string or a timestamp
     * @param sourceLimit       maximum number of events to read; {@code Long.MAX_VALUE} means
     *                          "use the configured default"
     * @param metrics           sink for the delay-count metric
     * @return offset ranges as computed by {@link CheckpointUtils#computeOffsetRanges}
     * @throws HoodieException if the topic does not exist or the event limit is smaller than
     *                         the number of partitions
     */
    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
        Map<TopicPartition, Long> startOffsets;
        Map<TopicPartition, Long> endOffsets;
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(this.kafkaParams)) {
            if (!checkTopicExists(consumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List<PartitionInfo> partitionInfos = fetchPartitionInfos(consumer, this.topicName);
            Set<TopicPartition> partitions = partitionInfos.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());

            // A timestamp checkpoint is rewritten into the regular "<topic>,<partition>:<offset>" form.
            Option<String> checkpoint = lastCheckpointStr;
            boolean isTimestampCheckpoint = KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(this.kafkaCheckpointType)
                && isValidTimestampCheckpointType(checkpoint);
            if (isTimestampCheckpoint) {
                checkpoint = getOffsetsByTimestamp(consumer, partitionInfos, partitions, this.topicName, Long.parseLong(checkpoint.get()));
            }

            boolean hasUsableCheckpoint = checkpoint.isPresent()
                && !checkpoint.get().isEmpty()
                && checkTopicCheckpoint(checkpoint);
            if (hasUsableCheckpoint) {
                startOffsets = fetchValidOffsets(consumer, checkpoint, partitions);
                Long delay = delayOffsetCalculation(checkpoint, partitions, consumer);
                metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, delay);
            } else {
                switch (this.autoResetValue) {
                    case EARLIEST:
                        startOffsets = consumer.beginningOffsets(partitions);
                        break;
                    case LATEST:
                        startOffsets = consumer.endOffsets(partitions);
                        break;
                    case GROUP:
                        startOffsets = getGroupOffsets(consumer, partitions);
                        break;
                    default:
                        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
                }
            }
            endOffsets = consumer.endOffsets(partitions);
        }

        long defaultMaxEvents = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
        long numEvents = sourceLimit;
        if (sourceLimit == Long.MAX_VALUE) {
            numEvents = defaultMaxEvents;
            LOG.info("SourceLimit not configured, set numEvents to default value : " + defaultMaxEvents);
        }
        if (numEvents < (long) endOffsets.size()) {
            throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
        }

        long minPartitions = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
        LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
        return CheckpointUtils.computeOffsetRanges(startOffsets, endOffsets, numEvents, minPartitions);
    }

    /**
     * Fetches partition metadata for a topic, retrying while the consumer returns {@code null}.
     *
     * <p>After each {@code null} result the thread sleeps 10 seconds; retries stop once the
     * configured fetch timeout (compared against {@code System.currentTimeMillis()}) has elapsed.
     * If the thread is interrupted while sleeping, the interrupt flag is restored and retrying
     * stops immediately.
     *
     * @param consumer  consumer used to query the cluster
     * @param topicName topic whose partitions are requested
     * @return the partition metadata, never {@code null}
     * @throws HoodieStreamerException if no metadata was obtained
     */
    private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
        long timeout = ConfigUtils.getLongWithAltKeys(this.props, KafkaSourceConfig.KAFKA_FETCH_PARTITION_TIME_OUT);
        long deadline = System.currentTimeMillis() + timeout;
        List<PartitionInfo> partitionInfos;
        do {
            partitionInfos = consumer.partitionsFor(topicName);
            if (partitionInfos == null) {
                try {
                    TimeUnit.SECONDS.sleep(10L);
                } catch (InterruptedException e) {
                    LOG.error("Sleep failed while fetching partitions", e);
                    // Preserve the interrupt for callers and stop waiting rather than swallowing it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } while (partitionInfos == null && System.currentTimeMillis() <= deadline);
        if (partitionInfos == null) {
            throw new HoodieStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
        }
        return partitionInfos;
    }

    /**
     * Returns the offsets to resume from, guarding against checkpoints that point before the
     * earliest offset still available.
     *
     * <p>If any checkpointed offset is smaller than the earliest offset of its partition, the
     * earliest offsets of all requested partitions are returned instead (or an exception is
     * thrown when fail-on-data-loss is enabled).
     *
     * @param consumer          consumer used to look up the earliest offsets
     * @param lastCheckpointStr checkpoint string to parse
     * @param topicPartitions   partitions of the topic
     * @return the checkpoint offsets, or the earliest offsets when the checkpoint is out of bounds
     * @throws HoodieStreamerException if data loss is detected and fail-on-data-loss is enabled
     */
    private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> fromCheckpoint = CheckpointUtils.strToOffsets(lastCheckpointStr.get());

        boolean outOfBounds = false;
        for (Map.Entry<TopicPartition, Long> checkpointed : fromCheckpoint.entrySet()) {
            if (checkpointed.getValue() < earliest.get(checkpointed.getKey())) {
                outOfBounds = true;
                break;
            }
        }
        if (!outOfBounds) {
            return fromCheckpoint;
        }

        if (ConfigUtils.getBooleanWithAltKeys(this.props, KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
            throw new HoodieStreamerException("Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.");
        }
        LOG.warn("Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want Hudi Streamer to fail on such cases, set \"" + KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
        return earliest;
    }

    // Optionally signed integer; compiled once instead of on every call.
    private static final Pattern TIMESTAMP_CHECKPOINT_PATTERN = Pattern.compile("[-+]?[0-9]+");

    /**
     * Tells whether the checkpoint looks like a timestamp: an integer string of exactly
     * 10 or 13 characters.
     *
     * <p>Strings with a fractional part are rejected because the caller converts the
     * checkpoint with {@code Long.parseLong}, which cannot parse them.
     *
     * @param lastCheckpointStr checkpoint to inspect
     * @return {@code true} if the checkpoint is present and matches the timestamp shape
     */
    private Boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) {
        if (!lastCheckpointStr.isPresent()) {
            return false;
        }
        String candidate = lastCheckpointStr.get();
        // NOTE(review): a leading sign is still accepted as before; a negative value is
        // presumably rejected later by the offsets-for-times lookup — confirm.
        boolean hasTimestampLength = candidate.length() == 13 || candidate.length() == 10;
        return hasTimestampLength && TIMESTAMP_CHECKPOINT_PATTERN.matcher(candidate).matches();
    }

    /**
     * Sums, over all partitions, how far the checkpoint is behind the current end offset.
     *
     * <p>Partitions missing from the checkpoint count from offset 0; partitions whose
     * checkpoint is at or beyond the end offset contribute nothing.
     *
     * @param lastCheckpointStr checkpoint string to parse
     * @param topicPartitions   partitions whose end offsets are queried
     * @param consumer          consumer used to query end offsets
     * @return total number of offsets between checkpoint and end of topic
     */
    private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
        Map<TopicPartition, Long> checkpointed = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
        long totalLag = 0L;
        for (Map.Entry<TopicPartition, Long> end : latest.entrySet()) {
            long behind = end.getValue() - checkpointed.getOrDefault(end.getKey(), 0L);
            totalLag += Math.max(behind, 0L);
        }
        return totalLag;
    }

    /**
     * Translates a timestamp into a checkpoint string of the form
     * {@code <topic>,<partition>:<offset>,...}.
     *
     * <p>For each partition the offset returned by the consumer's offsets-for-times lookup is
     * used; when that lookup yields no entry for a partition, the partition's earliest offset
     * is used instead.
     *
     * @param consumer          consumer used for the lookups
     * @param partitionInfoList partitions to look up by timestamp
     * @param topicPartitions   partitions whose earliest offsets are fetched
     * @param topicName         topic name written at the head of the checkpoint
     * @param timestamp         timestamp to search for
     * @return the checkpoint string wrapped in an {@code Option}
     */
    private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions, String topicName, Long timestamp) {
        Map<TopicPartition, Long> timestampPerPartition = partitionInfoList.stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> offsetsAtTimestamp = consumer.offsetsForTimes(timestampPerPartition);

        StringBuilder checkpoint = new StringBuilder().append(topicName).append(',');
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> lookup : offsetsAtTimestamp.entrySet()) {
            TopicPartition partition = lookup.getKey();
            OffsetAndTimestamp hit = lookup.getValue();
            String offsetText;
            if (hit == null) {
                // No offset for this timestamp: fall back to the partition's earliest offset.
                offsetText = String.valueOf((Object) earliest.get(partition));
            } else {
                offsetText = String.valueOf(hit.offset());
            }
            checkpoint.append(partition.partition()).append(':').append(offsetText).append(',');
        }
        // Drop the trailing comma.
        checkpoint.setLength(checkpoint.length() - 1);
        return Option.of(checkpoint.toString());
    }

    /**
     * Checks whether the configured topic appears in the consumer's topic listing.
     *
     * @param consumer consumer used to list topics
     * @return {@code true} if the listing contains the configured topic name
     */
    public boolean checkTopicExists(KafkaConsumer consumer) {
        return consumer.listTopics().containsKey(this.topicName);
    }

    /**
     * Checks that the checkpoint string has the expected overall shape
     * (something, a comma, something, a colon, something).
     *
     * @param lastCheckpointStr checkpoint to test; must be present
     * @return {@code true} if the whole string matches the checkpoint pattern
     */
    private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
        return this.pattern.matcher(lastCheckpointStr.get()).matches();
    }

    /**
     * @return the Kafka topic name this generator was configured with
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * @return the consumer properties used by this generator (the internal map, not a copy)
     */
    public Map<String, Object> getKafkaParams() {
        return kafkaParams;
    }

    /**
     * Builds the Kafka consumer properties from the full property set.
     *
     * <p>Keys starting with {@code hoodie.} are dropped, except those under the two
     * value-deserializer prefixes, which are kept.
     *
     * @param props all configured properties
     * @return a new mutable map with the retained entries
     */
    private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
        Map<String, Object> retained = new HashMap<>();
        for (Object rawKey : props.keySet()) {
            String key = rawKey.toString();
            boolean isHoodieKey = key.startsWith("hoodie.");
            boolean isDeserializerKey = key.startsWith("hoodie.streamer.source.kafka.value.deserializer.")
                || key.startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.");
            if (!isHoodieKey || isDeserializerKey) {
                retained.put(key, props.get(key));
            }
        }
        return retained;
    }

    /**
     * Commits the offsets encoded in a checkpoint string to Kafka.
     *
     * <p>Requires {@code group.id} to be configured. A failed or timed-out commit is only
     * logged as a warning.
     *
     * @param checkpointStr checkpoint in {@code <topic>,<partition>:<offset>,...} form
     */
    public void commitOffsetToKafka(String checkpointStr) {
        ConfigUtils.checkRequiredProperties(this.props, Collections.singletonList("group.id"));
        Map<TopicPartition, Long> offsets = CheckpointUtils.strToOffsets(checkpointStr);
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(offsets.size());
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(this.kafkaParams)) {
            for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
                toCommit.put(partitionOffset.getKey(), new OffsetAndMetadata(partitionOffset.getValue().longValue()));
            }
            consumer.commitSync(toCommit);
        } catch (CommitFailedException | TimeoutException e) {
            LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
        }
    }

    /**
     * Resolves start offsets from the consumer group's committed offsets.
     *
     * <p>As soon as one partition has no committed offset, the end offsets of all
     * partitions are returned instead.
     *
     * @param consumer        consumer used to read committed and end offsets
     * @param topicPartitions partitions to resolve
     * @return committed offsets per partition, or the end offsets if any commit is missing
     */
    private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> fromOffsets = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            if (committed == null) {
                LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
                return consumer.endOffsets(topicPartitions);
            }
            fromOffsets.put(topicPartition, committed.offset());
        }
        return fromOffsets;
    }

    public static class CheckpointUtils {
        /**
         * Parses a checkpoint string of the form {@code <topic>,<partition>:<offset>,...}.
         *
         * <p>The first comma-separated token is the topic; every following token is split on
         * {@code :} and its first two parts are read as partition and offset.
         *
         * @param checkpointStr checkpoint string to parse
         * @return offsets keyed by topic partition
         */
        public static Map<TopicPartition, Long> strToOffsets(String checkpointStr) {
            String[] tokens = checkpointStr.split(",");
            String topic = tokens[0];
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (String token : Arrays.asList(tokens).subList(1, tokens.length)) {
                String[] partitionAndOffset = token.split(":");
                int partition = Integer.parseInt(partitionAndOffset[0]);
                long offset = Long.parseLong(partitionAndOffset[1]);
                offsets.put(new TopicPartition(topic, partition), offset);
            }
            return offsets;
        }

        /**
         * Serialises offset ranges into a checkpoint string of the form
         * {@code <topic>,<partition>:<untilOffset>,...}.
         *
         * <p>Ranges are first merged per topic partition. The topic is taken from the first
         * merged range. Numbers are written with plain string conversion so the output does
         * not depend on the JVM default locale.
         *
         * @param ranges ranges to serialise; must contain at least one range
         * @return the checkpoint string
         */
        public static String offsetsToStr(OffsetRange[] ranges) {
            OffsetRange[] merged = CheckpointUtils.mergeRangesByTopicPartition(ranges);
            String partitionOffsets = Arrays.stream(merged)
                .map(range -> range.partition() + ":" + range.untilOffset())
                .collect(Collectors.joining(","));
            return merged[0].topic() + "," + partitionOffsets;
        }

        /**
         * Distributes up to {@code numEvents} events across the partitions in {@code toOffsetMap}
         * and returns the resulting offset ranges.
         *
         * <p>Each partition starts at its entry in {@code fromOffsetMap} (0 when absent) and ends
         * at its entry in {@code toOffsetMap}. Events are handed out in passes over the partitions
         * until the budget is used or every partition is exhausted. When {@code minPartitions}
         * exceeds the number of partitions, the individual slices are returned sorted by
         * partition; otherwise slices of the same partition are merged into one range.
         *
         * @param fromOffsetMap start offset per partition
         * @param toOffsetMap   end offset per partition
         * @param numEvents     maximum number of events to allocate
         * @param minPartitions requested minimum number of ranges
         * @return the computed offset ranges
         */
        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, Map<TopicPartition, Long> toOffsetMap, long numEvents, long minPartitions) {
            // One full range per partition, ordered by partition number.
            OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
                long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
                return OffsetRange.create((TopicPartition)tp, (long)fromOffset, (long)((Long)toOffsetMap.get(tp)));
            }).sorted(SORT_BY_PARTITION).collect(Collectors.toList()).toArray(new OffsetRange[toOffsetMap.size()]);
            LOG.debug("numEvents {}, minPartitions {}, ranges {}", new Object[]{numEvents, minPartitions, ranges});
            // Splitting is needed only when more ranges are requested than there are partitions.
            boolean needSplitToMinPartitions = minPartitions > (long)toOffsetMap.size();
            long totalEvents = CheckpointUtils.totalNewMessages(ranges);
            long allocedEvents = 0L;
            // Partitions whose slice has reached the partition's end offset.
            HashSet<Integer> exhaustedPartitions = new HashSet<Integer>();
            ArrayList<OffsetRange> finalRanges = new ArrayList<OffsetRange>();
            // Never allocate more than what is actually available.
            long actualNumEvents = Math.min(totalEvents, numEvents);
            while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
                // Starts from the exhausted set so those partitions are not counted as remaining.
                HashSet<Integer> allocatedPartitionsThisLoop = new HashSet<Integer>(exhaustedPartitions);
                for (int i = 0; i < ranges.length; ++i) {
                    long remainingEvents = actualNumEvents - allocedEvents;
                    long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();
                    if (needSplitToMinPartitions) {
                        // NOTE(review): this becomes 0 once finalRanges.size() reaches minPartitions,
                        // making the division below yield Infinity/NaN — confirm this is intended.
                        remainingPartitions = minPartitions - (long)finalRanges.size();
                    }
                    // Even share of the remaining budget, rounded up.
                    long eventsPerPartition = (long)Math.ceil(1.0 * (double)remainingEvents / (double)remainingPartitions);
                    OffsetRange range = ranges[i];
                    if (exhaustedPartitions.contains(range.partition())) continue;
                    // Take the share, capped at the partition's end offset.
                    long toOffset = Math.min(range.untilOffset(), range.fromOffset() + eventsPerPartition);
                    if (toOffset == range.untilOffset()) {
                        exhaustedPartitions.add(range.partition());
                    }
                    if ((allocedEvents += toOffset - range.fromOffset()) > actualNumEvents) {
                        // Budget overshot: actualNumEvents - allocedEvents is negative here, so
                        // this pulls toOffset back. allocedEvents itself is not adjusted afterwards.
                        long offsetsToAdd = Math.min(eventsPerPartition, actualNumEvents - allocedEvents);
                        toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd);
                    }
                    OffsetRange thisRange = OffsetRange.create((TopicPartition)range.topicPartition(), (long)range.fromOffset(), (long)toOffset);
                    finalRanges.add(thisRange);
                    // Advance the partition's pending range past the slice just allocated.
                    ranges[i] = OffsetRange.create((TopicPartition)range.topicPartition(), (long)(range.fromOffset() + thisRange.count()), (long)range.untilOffset());
                    allocatedPartitionsThisLoop.add(range.partition());
                }
            }
            if (!needSplitToMinPartitions) {
                // No split requested: collapse the slices back into one range per partition.
                LOG.debug("final ranges merged by topic partition {}", (Object)Arrays.toString(CheckpointUtils.mergeRangesByTopicPartition(finalRanges.toArray(new OffsetRange[0]))));
                return CheckpointUtils.mergeRangesByTopicPartition(finalRanges.toArray(new OffsetRange[0]));
            }
            finalRanges.sort(SORT_BY_PARTITION);
            LOG.debug("final ranges {}", (Object)Arrays.toString(finalRanges.toArray(new OffsetRange[0])));
            return finalRanges.toArray(new OffsetRange[0]);
        }

        /**
         * Collapses all ranges of the same topic partition into a single range spanning from
         * the smallest start offset to the largest end offset.
         *
         * @param oldRanges ranges to merge
         * @return one range per topic partition, sorted by partition number
         */
        public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
            Map<TopicPartition, List<OffsetRange>> rangesByPartition = Arrays.stream(oldRanges)
                .collect(Collectors.groupingBy(OffsetRange::topicPartition));
            List<OffsetRange> merged = new ArrayList<>();
            rangesByPartition.forEach((topicPartition, group) -> {
                long lowestFrom = group.stream().mapToLong(OffsetRange::fromOffset).min().getAsLong();
                long highestUntil = group.stream().mapToLong(OffsetRange::untilOffset).max().getAsLong();
                merged.add(OffsetRange.create(topicPartition, lowestFrom, highestUntil));
            });
            merged.sort(SORT_BY_PARTITION);
            return merged.toArray(new OffsetRange[0]);
        }

        /**
         * Adds up the message counts of the given ranges.
         *
         * @param ranges ranges to total
         * @return sum of {@code count()} over all ranges
         */
        public static long totalNewMessages(OffsetRange[] ranges) {
            long total = 0L;
            for (OffsetRange range : ranges) {
                total += range.count();
            }
            return total;
        }
    }
}

