/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class KafkaOffsetGen {
    private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
    private final Pattern pattern = Pattern.compile(".*,.*:.*");
    private final Map<String, Object> kafkaParams;
    private final TypedProperties props;
    protected final String topicName;
    private KafkaResetOffsetStrategies autoResetValue;
    private final String kafkaCheckpointType;

    public KafkaOffsetGen(TypedProperties props) {
        this.props = props;
        this.kafkaParams = this.excludeHoodieConfigs(props);
        DataSourceUtils.checkRequiredProperties((TypedProperties)props, Collections.singletonList(Config.KAFKA_TOPIC_NAME.key()));
        this.topicName = props.getString(Config.KAFKA_TOPIC_NAME.key());
        this.kafkaCheckpointType = props.getString(Config.KAFKA_CHECKPOINT_TYPE.key(), (String)Config.KAFKA_CHECKPOINT_TYPE.defaultValue());
        String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(), ((KafkaResetOffsetStrategies)((Object)Config.KAFKA_AUTO_OFFSET_RESET.defaultValue())).name().toLowerCase());
        boolean found = false;
        for (KafkaResetOffsetStrategies entry : KafkaResetOffsetStrategies.values()) {
            if (!entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) continue;
            found = true;
            this.autoResetValue = entry;
            break;
        }
        if (!found) {
            throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
        }
        if (this.autoResetValue.equals((Object)KafkaResetOffsetStrategies.GROUP)) {
            this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(), ((KafkaResetOffsetStrategies)((Object)Config.KAFKA_AUTO_OFFSET_RESET.defaultValue())).name().toLowerCase());
        }
    }

    public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
        long numEvents;
        Map toOffsets;
        Map<TopicPartition, Long> fromOffsets;
        try (KafkaConsumer consumer = new KafkaConsumer(this.kafkaParams);){
            if (!this.checkTopicExists(consumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            List partitionInfoList = consumer.partitionsFor(this.topicName);
            Set<TopicPartition> topicPartitions = partitionInfoList.stream().map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
            if ("timestamp".equals(this.kafkaCheckpointType) && this.isValidTimestampCheckpointType(lastCheckpointStr).booleanValue()) {
                lastCheckpointStr = this.getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, this.topicName, Long.parseLong((String)lastCheckpointStr.get()));
            }
            if (lastCheckpointStr.isPresent() && !((String)lastCheckpointStr.get()).isEmpty() && this.checkTopicCheckpoint(lastCheckpointStr)) {
                fromOffsets = this.fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
                metrics.updateDeltaStreamerKafkaDelayCountMetrics(this.delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
            } else {
                switch (this.autoResetValue) {
                    case EARLIEST: {
                        fromOffsets = consumer.beginningOffsets(topicPartitions);
                        break;
                    }
                    case LATEST: {
                        fromOffsets = consumer.endOffsets(topicPartitions);
                        break;
                    }
                    case GROUP: {
                        fromOffsets = this.getGroupOffsets(consumer, topicPartitions);
                        break;
                    }
                    default: {
                        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
                    }
                }
            }
            toOffsets = consumer.endOffsets(topicPartitions);
        }
        long maxEventsToReadFromKafka = this.props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(), ((Long)Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()).longValue());
        if (sourceLimit == Long.MAX_VALUE) {
            numEvents = maxEventsToReadFromKafka;
            LOG.info((Object)("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka));
        } else {
            numEvents = sourceLimit;
        }
        if (numEvents < (long)toOffsets.size()) {
            throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
        }
        return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
    }

    private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets((String)lastCheckpointStr.get());
        boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream().anyMatch(offset -> (Long)offset.getValue() < (Long)earliestOffsets.get(offset.getKey()));
        return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
    }

    private Boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) {
        if (!lastCheckpointStr.isPresent()) {
            return false;
        }
        Pattern pattern = Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?");
        Matcher isNum = pattern.matcher((CharSequence)lastCheckpointStr.get());
        return isNum.matches() && (((String)lastCheckpointStr.get()).length() == 13 || ((String)lastCheckpointStr.get()).length() == 10);
    }

    private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
        Long delayCount = 0L;
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets((String)lastCheckpointStr.get());
        Map lastOffsets = consumer.endOffsets(topicPartitions);
        for (Map.Entry entry : lastOffsets.entrySet()) {
            Long offect = checkpointOffsets.getOrDefault(entry.getKey(), 0L);
            delayCount = delayCount + ((Long)entry.getValue() - offect > 0L ? (Long)entry.getValue() - offect : 0L);
        }
        return delayCount;
    }

    private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions, String topicName, Long timestamp) {
        Map topicPartitionsTimestamp = partitionInfoList.stream().map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toMap(Function.identity(), x -> timestamp));
        Map earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionsTimestamp);
        StringBuilder sb = new StringBuilder();
        sb.append(topicName + ",");
        for (Map.Entry map : offsetAndTimestamp.entrySet()) {
            if (map.getValue() != null) {
                sb.append(((TopicPartition)map.getKey()).partition()).append(":").append(((OffsetAndTimestamp)map.getValue()).offset()).append(",");
                continue;
            }
            sb.append(((TopicPartition)map.getKey()).partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
        }
        return Option.of((Object)sb.deleteCharAt(sb.length() - 1).toString());
    }

    public boolean checkTopicExists(KafkaConsumer consumer) {
        Map result = consumer.listTopics();
        return result.containsKey(this.topicName);
    }

    private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
        Matcher matcher = this.pattern.matcher((CharSequence)lastCheckpointStr.get());
        return matcher.matches();
    }

    public String getTopicName() {
        return this.topicName;
    }

    public Map<String, Object> getKafkaParams() {
        return this.kafkaParams;
    }

    private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        props.keySet().stream().filter(prop -> !prop.toString().startsWith("hoodie.") || prop.toString().startsWith("hoodie.deltastreamer.source.kafka.value.deserializer.")).forEach(prop -> kafkaParams.put(prop.toString(), props.get((Object)prop.toString())));
        return kafkaParams;
    }

    public void commitOffsetToKafka(String checkpointStr) {
        DataSourceUtils.checkRequiredProperties((TypedProperties)this.props, Collections.singletonList("group.id"));
        Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
        HashMap offsetAndMetadataMap = new HashMap(offsetMap.size());
        try (KafkaConsumer consumer = new KafkaConsumer(this.kafkaParams);){
            offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset.longValue())));
            consumer.commitSync(offsetAndMetadataMap);
        }
        catch (CommitFailedException | TimeoutException e) {
            LOG.warn((Object)"Committing offsets to Kafka failed, this does not impact processing of records", e);
        }
    }

    private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
        Map<Object, Long> fromOffsets = new HashMap<TopicPartition, Long>();
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
            if (committedOffsetAndMetadata != null) {
                fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
                continue;
            }
            LOG.warn((Object)"There are no commits associated with this consumer group, starting to consume from latest offset");
            fromOffsets = consumer.endOffsets(topicPartitions);
            break;
        }
        return fromOffsets;
    }

    public static class Config {
        private static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty.key((String)"hoodie.deltastreamer.source.kafka.topic").noDefaultValue().withDocumentation("Kafka topic name.");
        public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty.key((String)"hoodie.deltastreamer.source.kafka.checkpoint.type").defaultValue((Object)"string").withDocumentation("Kafka chepoint type.");
        public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty.key((String)"hoodie.deltastreamer.source.kafka.enable.commit.offset").defaultValue((Object)false).withDocumentation("Automatically submits offset to kafka.");
        public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty.key((String)"hoodie.deltastreamer.kafka.source.maxEvents").defaultValue((Object)5000000L).withDocumentation("Maximum number of records obtained in each batch.");
        private static final ConfigProperty<KafkaResetOffsetStrategies> KAFKA_AUTO_OFFSET_RESET = ConfigProperty.key((String)"auto.offset.reset").defaultValue((Object)KafkaResetOffsetStrategies.LATEST).withDocumentation("Kafka consumer strategy for reading data.");
        public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
    }

    static enum KafkaResetOffsetStrategies {
        LATEST,
        EARLIEST,
        GROUP;

    }

    public static class CheckpointUtils {
        public static Map<TopicPartition, Long> strToOffsets(String checkpointStr) {
            HashMap<TopicPartition, Long> offsetMap = new HashMap<TopicPartition, Long>();
            String[] splits = checkpointStr.split(",");
            String topic = splits[0];
            for (int i = 1; i < splits.length; ++i) {
                String[] subSplits = splits[i].split(":");
                offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
            }
            return offsetMap;
        }

        public static String offsetsToStr(OffsetRange[] ranges) {
            StringBuilder sb = new StringBuilder();
            sb.append(ranges[0].topic() + ",");
            sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset())).collect(Collectors.joining(",")));
            return sb.toString();
        }

        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap, Map<TopicPartition, Long> toOffsetMap, long numEvents) {
            Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
            OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
            toOffsetMap.keySet().stream().map(tp -> {
                long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
                return OffsetRange.create((TopicPartition)tp, (long)fromOffset, (long)fromOffset);
            }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
            long allocedEvents = 0L;
            HashSet<Integer> exhaustedPartitions = new HashSet<Integer>();
            while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
                long remainingEvents = numEvents - allocedEvents;
                long eventsPerPartition = (long)Math.ceil(1.0 * (double)remainingEvents / (double)(toOffsetMap.size() - exhaustedPartitions.size()));
                for (int i = 0; i < ranges.length; ++i) {
                    OffsetRange range = ranges[i];
                    if (exhaustedPartitions.contains(range.partition())) continue;
                    long toOffsetMax = toOffsetMap.get(range.topicPartition());
                    long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
                    if (toOffset == toOffsetMax) {
                        exhaustedPartitions.add(range.partition());
                    }
                    if ((allocedEvents += toOffset - range.untilOffset()) > numEvents) {
                        long offsetsToAdd = Math.min(eventsPerPartition, numEvents - allocedEvents);
                        toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
                    }
                    ranges[i] = OffsetRange.create((TopicPartition)range.topicPartition(), (long)range.fromOffset(), (long)toOffset);
                }
            }
            return ranges;
        }

        public static long totalNewMessages(OffsetRange[] ranges) {
            return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
        }
    }
}

