package org.apache.hudi.utilities.sources.helpers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.class */
public class KafkaOffsetGen {
    private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
    private final HashMap<String, Object> kafkaParams = new HashMap<>();
    private final TypedProperties props;
    protected final String topicName;

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$CheckpointUtils.class */
    public static class CheckpointUtils {
        public static HashMap<TopicPartition, Long> strToOffsets(String str) {
            HashMap<TopicPartition, Long> hashMap = new HashMap<>();
            String[] split = str.split(",");
            String str2 = split[0];
            for (int i = 1; i < split.length; i++) {
                String[] split2 = split[i].split(":");
                hashMap.put(new TopicPartition(str2, Integer.parseInt(split2[0])), Long.valueOf(Long.parseLong(split2[1])));
            }
            return hashMap;
        }

        public static String offsetsToStr(OffsetRange[] offsetRangeArr) {
            StringBuilder sb = new StringBuilder();
            sb.append(offsetRangeArr[0].topic() + ",");
            sb.append((String) Arrays.stream(offsetRangeArr).map(offsetRange -> {
                return String.format("%s:%d", Integer.valueOf(offsetRange.partition()), Long.valueOf(offsetRange.untilOffset()));
            }).collect(Collectors.joining(",")));
            return sb.toString();
        }

        public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2, long j) {
            Comparator comparing = Comparator.comparing((v0) -> {
                return v0.partition();
            });
            OffsetRange[] offsetRangeArr = new OffsetRange[map2.size()];
            ((List) map2.keySet().stream().map(topicPartition -> {
                long longValue = ((Long) map.getOrDefault(topicPartition, 0L)).longValue();
                return OffsetRange.create(topicPartition, longValue, longValue);
            }).sorted(comparing).collect(Collectors.toList())).toArray(offsetRangeArr);
            long j2 = 0;
            HashSet hashSet = new HashSet();
            while (j2 < j && hashSet.size() < map2.size()) {
                long ceil = (long) Math.ceil((1.0d * (j - j2)) / (map2.size() - hashSet.size()));
                for (int i = 0; i < offsetRangeArr.length; i++) {
                    OffsetRange offsetRange = offsetRangeArr[i];
                    if (!hashSet.contains(Integer.valueOf(offsetRange.partition()))) {
                        long longValue = map2.get(offsetRange.topicPartition()).longValue();
                        long min = Math.min(longValue, offsetRange.untilOffset() + ceil);
                        if (min == longValue) {
                            hashSet.add(Integer.valueOf(offsetRange.partition()));
                        }
                        j2 += min - offsetRange.untilOffset();
                        if (j2 > j) {
                            min = Math.min(longValue, min + Math.min(ceil, j - j2));
                        }
                        offsetRangeArr[i] = OffsetRange.create(offsetRange.topicPartition(), offsetRange.fromOffset(), min);
                    }
                }
            }
            return offsetRangeArr;
        }

        public static long totalNewMessages(OffsetRange[] offsetRangeArr) {
            return Arrays.stream(offsetRangeArr).mapToLong((v0) -> {
                return v0.count();
            }).sum();
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$Config.class */
    public static class Config {
        private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
        private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
        private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
        public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
        public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
    }

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen$KafkaResetOffsetStrategies.class */
    enum KafkaResetOffsetStrategies {
        LATEST,
        EARLIEST
    }

    public KafkaOffsetGen(TypedProperties typedProperties) {
        this.props = typedProperties;
        for (Object obj : typedProperties.keySet()) {
            this.kafkaParams.put(obj.toString(), typedProperties.get(obj.toString()));
        }
        DataSourceUtils.checkRequiredProperties(typedProperties, Collections.singletonList(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
        this.topicName = typedProperties.getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP);
    }

    public OffsetRange[] getNextOffsetRanges(Option<String> option, long j, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics) {
        Map<TopicPartition, Long> endOffsets;
        long j2;
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.kafkaParams);
        Throwable th = null;
        try {
            if (!checkTopicExists(kafkaConsumer)) {
                throw new HoodieException("Kafka topic:" + this.topicName + " does not exist");
            }
            Set<TopicPartition> set = (Set) kafkaConsumer.partitionsFor(this.topicName).stream().map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).collect(Collectors.toSet());
            if (!option.isPresent() || ((String) option.get()).isEmpty()) {
                switch (KafkaResetOffsetStrategies.valueOf(this.props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase())) {
                    case EARLIEST:
                        endOffsets = kafkaConsumer.beginningOffsets(set);
                        break;
                    case LATEST:
                        endOffsets = kafkaConsumer.endOffsets(set);
                        break;
                    default:
                        throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
                }
            } else {
                endOffsets = checkupValidOffsets(kafkaConsumer, option, set);
                hoodieDeltaStreamerMetrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(option, set, kafkaConsumer).longValue());
            }
            Map endOffsets2 = kafkaConsumer.endOffsets(set);
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            long j3 = this.props.getLong("hoodie.deltastreamer.kafka.source.maxEvents", Config.maxEventsFromKafkaSource);
            if (j == Long.MAX_VALUE) {
                j2 = j3;
                LOG.info("SourceLimit not configured, set numEvents to default value : " + j3);
            } else {
                j2 = j;
            }
            if (j2 < endOffsets2.size()) {
                throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
            }
            return CheckpointUtils.computeOffsetRanges(endOffsets, endOffsets2, j2);
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer kafkaConsumer, Option<String> option, Set<TopicPartition> set) {
        HashMap<TopicPartition, Long> strToOffsets = CheckpointUtils.strToOffsets((String) option.get());
        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(set);
        return strToOffsets.entrySet().stream().anyMatch(entry -> {
            return ((Long) entry.getValue()).longValue() < ((Long) beginningOffsets.get(entry.getKey())).longValue();
        }) ? beginningOffsets : strToOffsets;
    }

    private Long delayOffsetCalculation(Option<String> option, Set<TopicPartition> set, KafkaConsumer kafkaConsumer) {
        Long l = 0L;
        HashMap<TopicPartition, Long> strToOffsets = CheckpointUtils.strToOffsets((String) option.get());
        for (Map.Entry entry : kafkaConsumer.endOffsets(set).entrySet()) {
            Long orDefault = strToOffsets.getOrDefault(entry.getKey(), 0L);
            l = Long.valueOf(l.longValue() + (((Long) entry.getValue()).longValue() - orDefault.longValue() > 0 ? ((Long) entry.getValue()).longValue() - orDefault.longValue() : 0L));
        }
        return l;
    }

    public boolean checkTopicExists(KafkaConsumer kafkaConsumer) {
        return kafkaConsumer.listTopics().containsKey(this.topicName);
    }

    public String getTopicName() {
        return this.topicName;
    }

    public HashMap<String, Object> getKafkaParams() {
        return this.kafkaParams;
    }
}
