/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.enumerator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class KafkaSourceEnumerator
implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceEnumerator.class);
    private final KafkaSubscriber subscriber;
    private final OffsetsInitializer startingOffsetInitializer;
    private final OffsetsInitializer stoppingOffsetInitializer;
    private final Properties properties;
    private final long partitionDiscoveryIntervalMs;
    private final SplitEnumeratorContext<KafkaPartitionSplit> context;
    private final Boundedness boundedness;
    private final Set<TopicPartition> assignedPartitions;
    private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;
    private final String consumerGroupId;
    private KafkaConsumer<byte[], byte[]> consumer;
    private AdminClient adminClient;
    private boolean noMoreNewPartitionSplits = false;

    public KafkaSourceEnumerator(KafkaSubscriber subscriber, OffsetsInitializer startingOffsetInitializer, OffsetsInitializer stoppingOffsetInitializer, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> context, Boundedness boundedness) {
        this(subscriber, startingOffsetInitializer, stoppingOffsetInitializer, properties, context, boundedness, Collections.emptySet());
    }

    public KafkaSourceEnumerator(KafkaSubscriber subscriber, OffsetsInitializer startingOffsetInitializer, OffsetsInitializer stoppingOffsetInitializer, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> context, Boundedness boundedness, Set<TopicPartition> assignedPartitions) {
        this.subscriber = subscriber;
        this.startingOffsetInitializer = startingOffsetInitializer;
        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
        this.properties = properties;
        this.context = context;
        this.boundedness = boundedness;
        this.assignedPartitions = new HashSet<TopicPartition>(assignedPartitions);
        this.pendingPartitionSplitAssignment = new HashMap<Integer, Set<KafkaPartitionSplit>>();
        this.partitionDiscoveryIntervalMs = KafkaSourceOptions.getOption(properties, KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, Long::parseLong);
        this.consumerGroupId = properties.getProperty("group.id");
    }

    public void start() {
        this.consumer = this.getKafkaConsumer();
        this.adminClient = this.getKafkaAdminClient();
        if (this.partitionDiscoveryIntervalMs > 0L) {
            LOG.info("Starting the KafkaSourceEnumerator for consumer group {} with partition discovery interval of {} ms.", (Object)this.consumerGroupId, (Object)this.partitionDiscoveryIntervalMs);
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges, 0L, this.partitionDiscoveryIntervalMs);
        } else {
            LOG.info("Starting the KafkaSourceEnumerator for consumer group {} without periodic partition discovery.", (Object)this.consumerGroupId);
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    }

    public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
        this.addPartitionSplitChangeToPendingAssignments(splits);
        if (this.context.registeredReaders().containsKey(subtaskId)) {
            this.assignPendingPartitionSplits(Collections.singleton(subtaskId));
        }
    }

    public void addReader(int subtaskId) {
        LOG.debug("Adding reader {} to KafkaSourceEnumerator for consumer group {}.", (Object)subtaskId, (Object)this.consumerGroupId);
        this.assignPendingPartitionSplits(Collections.singleton(subtaskId));
    }

    public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
        return new KafkaSourceEnumState(this.assignedPartitions);
    }

    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    private Set<TopicPartition> getSubscribedTopicPartitions() {
        return this.subscriber.getSubscribedTopicPartitions(this.adminClient);
    }

    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException("Failed to list subscribed topic partitions due to ", t);
        }
        PartitionChange partitionChange = this.getPartitionChange(fetchedPartitions);
        if (partitionChange.isEmpty()) {
            return;
        }
        this.context.callAsync(() -> this.initializePartitionSplits(partitionChange), this::handlePartitionSplitChanges);
    }

    private PartitionSplitChange initializePartitionSplits(PartitionChange partitionChange) {
        Set<TopicPartition> newPartitions = Collections.unmodifiableSet(partitionChange.getNewPartitions());
        OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = this.getOffsetsRetriever();
        Map<TopicPartition, Long> startingOffsets = this.startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
        Map<TopicPartition, Long> stoppingOffsets = this.stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
        HashSet<KafkaPartitionSplit> partitionSplits = new HashSet<KafkaPartitionSplit>(newPartitions.size());
        for (TopicPartition tp : newPartitions) {
            Long startingOffset = startingOffsets.get(tp);
            long stoppingOffset = stoppingOffsets.getOrDefault(tp, Long.MIN_VALUE);
            partitionSplits.add(new KafkaPartitionSplit(tp, startingOffset, stoppingOffset));
        }
        return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());
    }

    private void handlePartitionSplitChanges(PartitionSplitChange partitionSplitChange, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t);
        }
        if (this.partitionDiscoveryIntervalMs < 0L) {
            LOG.debug("Partition discovery is disabled.");
            this.noMoreNewPartitionSplits = true;
        }
        this.addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
        this.assignPendingPartitionSplits(this.context.registeredReaders().keySet());
    }

    private void addPartitionSplitChangeToPendingAssignments(Collection<KafkaPartitionSplit> newPartitionSplits) {
        int numReaders = this.context.currentParallelism();
        for (KafkaPartitionSplit split : newPartitionSplits) {
            int ownerReader = KafkaSourceEnumerator.getSplitOwner(split.getTopicPartition(), numReaders);
            this.pendingPartitionSplitAssignment.computeIfAbsent(ownerReader, r -> new HashSet()).add(split);
        }
        LOG.debug("Assigned {} to {} readers of consumer group {}.", new Object[]{newPartitionSplits, numReaders, this.consumerGroupId});
    }

    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
        HashMap<Integer, List> incrementalAssignment = new HashMap<Integer, List>();
        for (int pendingReader : pendingReaders) {
            this.checkReaderRegistered(pendingReader);
            Set<KafkaPartitionSplit> pendingAssignmentForReader = this.pendingPartitionSplitAssignment.remove(pendingReader);
            if (pendingAssignmentForReader == null || pendingAssignmentForReader.isEmpty()) continue;
            incrementalAssignment.computeIfAbsent(pendingReader, ignored -> new ArrayList()).addAll(pendingAssignmentForReader);
            pendingAssignmentForReader.forEach(split -> this.assignedPartitions.add(split.getTopicPartition()));
        }
        if (!incrementalAssignment.isEmpty()) {
            LOG.info("Assigning splits to readers {}", incrementalAssignment);
            this.context.assignSplits(new SplitsAssignment(incrementalAssignment));
        }
        if (this.noMoreNewPartitionSplits && this.boundedness == Boundedness.BOUNDED) {
            LOG.debug("No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in consumer group {}.", pendingReaders, (Object)this.consumerGroupId);
            pendingReaders.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
        }
    }

    private void checkReaderRegistered(int readerId) {
        if (!this.context.registeredReaders().containsKey(readerId)) {
            throw new IllegalStateException(String.format("Reader %d is not registered to source coordinator", readerId));
        }
    }

    @VisibleForTesting
    PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) {
        HashSet<TopicPartition> removedPartitions = new HashSet<TopicPartition>();
        Consumer<TopicPartition> dedupOrMarkAsRemoved = tp -> {
            if (!fetchedPartitions.remove(tp)) {
                removedPartitions.add((TopicPartition)tp);
            }
        };
        this.assignedPartitions.forEach(dedupOrMarkAsRemoved);
        this.pendingPartitionSplitAssignment.forEach((reader, splits) -> splits.forEach(split -> dedupOrMarkAsRemoved.accept(split.getTopicPartition())));
        if (!fetchedPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", fetchedPartitions);
        }
        if (!removedPartitions.isEmpty()) {
            LOG.info("Discovered removed partitions: {}", removedPartitions);
        }
        return new PartitionChange(fetchedPartitions, removedPartitions);
    }

    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        Properties consumerProps = new Properties();
        KafkaSourceEnumerator.deepCopyProperties(this.properties, consumerProps);
        String clientIdPrefix = consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        consumerProps.setProperty("client.id", clientIdPrefix + "-enumerator-consumer");
        consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("allow.auto.create.topics", "false");
        return new KafkaConsumer(consumerProps);
    }

    private AdminClient getKafkaAdminClient() {
        Properties adminClientProps = new Properties();
        KafkaSourceEnumerator.deepCopyProperties(this.properties, adminClientProps);
        String clientIdPrefix = adminClientProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        adminClientProps.setProperty("client.id", clientIdPrefix + "-enumerator-admin-client");
        return AdminClient.create((Properties)adminClientProps);
    }

    private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() {
        String groupId = this.properties.getProperty("group.id");
        return new PartitionOffsetsRetrieverImpl(this.consumer, this.adminClient, groupId);
    }

    @VisibleForTesting
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = (tp.topic().hashCode() * 31 & Integer.MAX_VALUE) % numReaders;
        return (startIndex + tp.partition()) % numReaders;
    }

    @VisibleForTesting
    static void deepCopyProperties(Properties from, Properties to) {
        for (String key : from.stringPropertyNames()) {
            to.setProperty(key, from.getProperty(key));
        }
    }

    @VisibleForTesting
    public static class PartitionOffsetsRetrieverImpl
    implements OffsetsInitializer.PartitionOffsetsRetriever,
    AutoCloseable {
        private final KafkaConsumer<?, ?> consumer;
        private final AdminClient adminClient;
        private final String groupId;

        public PartitionOffsetsRetrieverImpl(KafkaConsumer<?, ?> consumer, AdminClient adminClient, String groupId) {
            this.consumer = consumer;
            this.adminClient = adminClient;
            this.groupId = groupId;
        }

        @Override
        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
            ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<TopicPartition>(partitions));
            try {
                return (Map)this.adminClient.listConsumerGroupOffsets(this.groupId, options).partitionsToOffsetAndMetadata().thenApply(result -> {
                    HashMap offsets = new HashMap();
                    result.forEach((tp, oam) -> {
                        if (oam != null) {
                            offsets.put(tp, oam.offset());
                        }
                    });
                    return offsets;
                }).get();
            }
            catch (InterruptedException e) {
                throw new FlinkRuntimeException("Interrupted while listing offsets for consumer group " + this.groupId, (Throwable)e);
            }
            catch (ExecutionException e) {
                throw new FlinkRuntimeException("Failed to fetch committed offsets for consumer group " + this.groupId + " due to", (Throwable)e);
            }
        }

        @Override
        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
            return this.consumer.endOffsets(partitions);
        }

        @Override
        public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
            return this.consumer.beginningOffsets(partitions);
        }

        @Override
        public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
            return this.consumer.offsetsForTimes(timestampsToSearch);
        }

        @Override
        public void close() throws Exception {
            this.consumer.close(Duration.ZERO);
            this.adminClient.close(Duration.ZERO);
        }
    }

    private static class PartitionSplitChange {
        private final Set<KafkaPartitionSplit> newPartitionSplits;
        private final Set<TopicPartition> removedPartitions;

        private PartitionSplitChange(Set<KafkaPartitionSplit> newPartitionSplits, Set<TopicPartition> removedPartitions) {
            this.newPartitionSplits = Collections.unmodifiableSet(newPartitionSplits);
            this.removedPartitions = Collections.unmodifiableSet(removedPartitions);
        }
    }

    @VisibleForTesting
    static class PartitionChange {
        private final Set<TopicPartition> newPartitions;
        private final Set<TopicPartition> removedPartitions;

        PartitionChange(Set<TopicPartition> newPartitions, Set<TopicPartition> removedPartitions) {
            this.newPartitions = newPartitions;
            this.removedPartitions = removedPartitions;
        }

        public Set<TopicPartition> getNewPartitions() {
            return this.newPartitions;
        }

        public Set<TopicPartition> getRemovedPartitions() {
            return this.removedPartitions;
        }

        public boolean isEmpty() {
            return this.newPartitions.isEmpty() && this.removedPartitions.isEmpty();
        }
    }
}

