/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public final class ConsumerCoordinator
extends AbstractCoordinator {
    private final Logger log;
    private final List<PartitionAssignor> assignors;
    private final Metadata metadata;
    private final ConsumerCoordinatorMetrics sensors;
    private final SubscriptionState subscriptions;
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    private final boolean autoCommitEnabled;
    private final int autoCommitIntervalMs;
    private final ConsumerInterceptors<?, ?> interceptors;
    private final boolean excludeInternalTopics;
    private final AtomicInteger pendingAsyncCommits;
    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
    private boolean isLeader = false;
    private Set<String> joinedSubscription;
    private MetadataSnapshot metadataSnapshot;
    private MetadataSnapshot assignmentSnapshot;
    private long nextAutoCommitDeadline;

    public ConsumerCoordinator(LogContext logContext, ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs2, int sessionTimeoutMs, int heartbeatIntervalMs, List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics, boolean leaveGroupOnClose) {
        super(logContext, client, groupId, rebalanceTimeoutMs2, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
        this.log = logContext.logger(ConsumerCoordinator.class);
        this.metadata = metadata;
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
        this.autoCommitEnabled = autoCommitEnabled;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.assignors = assignors;
        this.completedOffsetCommits = new ConcurrentLinkedQueue();
        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.interceptors = interceptors;
        this.excludeInternalTopics = excludeInternalTopics;
        this.pendingAsyncCommits = new AtomicInteger();
        if (autoCommitEnabled) {
            this.nextAutoCommitDeadline = time.milliseconds() + (long)autoCommitIntervalMs;
        }
        this.metadata.requestUpdate();
        this.addMetadataListener();
    }

    @Override
    public String protocolType() {
        return "consumer";
    }

    @Override
    public List<JoinGroupRequest.ProtocolMetadata> metadata() {
        this.joinedSubscription = this.subscriptions.subscription();
        ArrayList<JoinGroupRequest.ProtocolMetadata> metadataList = new ArrayList<JoinGroupRequest.ProtocolMetadata>();
        for (PartitionAssignor assignor : this.assignors) {
            PartitionAssignor.Subscription subscription = assignor.subscription(this.joinedSubscription);
            ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
            metadataList.add(new JoinGroupRequest.ProtocolMetadata(assignor.name(), metadata));
        }
        return metadataList;
    }

    public void updatePatternSubscription(Cluster cluster) {
        HashSet<String> topicsToSubscribe = new HashSet<String>();
        for (String topic : cluster.topics()) {
            if (!this.subscriptions.subscribedPattern().matcher(topic).matches() || this.excludeInternalTopics && cluster.internalTopics().contains(topic)) continue;
            topicsToSubscribe.add(topic);
        }
        this.subscriptions.subscribeFromPattern(topicsToSubscribe);
        this.metadata.setTopics(this.subscriptions.groupSubscription());
    }

    private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener(){

            @Override
            public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                MetadataSnapshot snapshot;
                if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<String>(cluster.unauthorizedTopics()));
                }
                if (ConsumerCoordinator.this.subscriptions.hasPatternSubscription()) {
                    ConsumerCoordinator.this.updatePatternSubscription(cluster);
                }
                if (ConsumerCoordinator.this.subscriptions.partitionsAutoAssigned() && !(snapshot = new MetadataSnapshot(ConsumerCoordinator.this.subscriptions, cluster)).equals(ConsumerCoordinator.this.metadataSnapshot)) {
                    ConsumerCoordinator.this.metadataSnapshot = snapshot;
                }
                if (!Collections.disjoint(ConsumerCoordinator.this.metadata.topics(), unavailableTopics)) {
                    ConsumerCoordinator.this.metadata.requestUpdate();
                }
            }
        });
    }

    private PartitionAssignor lookupAssignor(String name) {
        for (PartitionAssignor assignor : this.assignors) {
            if (!assignor.name().equals(name)) continue;
            return assignor;
        }
        return null;
    }

    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        PartitionAssignor assignor;
        if (!this.isLeader) {
            this.assignmentSnapshot = null;
        }
        if ((assignor = this.lookupAssignor(assignmentStrategy)) == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }
        PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        this.subscriptions.assignFromSubscribed(assignment.partitions());
        HashSet<String> addedTopics = new HashSet<String>();
        for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
            if (this.joinedSubscription.contains(tp.topic())) continue;
            addedTopics.add(tp.topic());
        }
        if (!addedTopics.isEmpty()) {
            HashSet<String> newSubscription = new HashSet<String>(this.subscriptions.subscription());
            HashSet<String> newJoinedSubscription = new HashSet<String>(this.joinedSubscription);
            newSubscription.addAll(addedTopics);
            newJoinedSubscription.addAll(addedTopics);
            this.subscriptions.subscribeFromPattern(newSubscription);
            this.joinedSubscription = newJoinedSubscription;
        }
        this.metadata.setTopics(this.subscriptions.groupSubscription());
        this.client.ensureFreshMetadata();
        assignor.onAssignment(assignment);
        this.nextAutoCommitDeadline = this.time.milliseconds() + (long)this.autoCommitIntervalMs;
        ConsumerRebalanceListener listener = this.subscriptions.rebalanceListener();
        this.log.info("Setting newly assigned partitions {}", (Object)this.subscriptions.assignedPartitions());
        try {
            HashSet<TopicPartition> assigned = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
            listener.onPartitionsAssigned(assigned);
        }
        catch (InterruptException | WakeupException e) {
            throw e;
        }
        catch (Exception e) {
            this.log.error("User provided listener {} failed on partition assignment", (Object)listener.getClass().getName(), (Object)e);
        }
    }

    public void poll(long now, long remainingMs) {
        this.invokeCompletedOffsetCommitCallbacks();
        if (this.subscriptions.partitionsAutoAssigned()) {
            if (this.coordinatorUnknown()) {
                this.ensureCoordinatorReady();
                now = this.time.milliseconds();
            }
            if (this.needRejoin()) {
                if (this.subscriptions.hasPatternSubscription()) {
                    this.client.ensureFreshMetadata();
                }
                this.ensureActiveGroup();
                now = this.time.milliseconds();
            }
            this.pollHeartbeat(now);
        } else if (this.metadata.updateRequested() && !this.client.hasReadyNodes()) {
            boolean metadataUpdated = this.client.awaitMetadataUpdate(remainingMs);
            if (!metadataUpdated && !this.client.hasReadyNodes()) {
                return;
            }
            now = this.time.milliseconds();
        }
        this.maybeAutoCommitOffsetsAsync(now);
    }

    public long timeToNextPoll(long now) {
        if (!this.autoCommitEnabled) {
            return this.timeToNextHeartbeat(now);
        }
        if (now > this.nextAutoCommitDeadline) {
            return 0L;
        }
        return Math.min(this.nextAutoCommitDeadline - now, this.timeToNextHeartbeat(now));
    }

    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) {
        PartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }
        HashSet<String> allSubscribedTopics = new HashSet<String>();
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        this.metadata.setTopics(this.subscriptions.groupSubscription());
        this.client.ensureFreshMetadata();
        this.isLeader = true;
        this.log.debug("Performing assignment using strategy {} with subscriptions {}", (Object)assignor.name(), (Object)subscriptions);
        Map<String, PartitionAssignor.Assignment> assignment = assignor.assign(this.metadata.fetch(), subscriptions);
        HashSet<String> assignedTopics = new HashSet<String>();
        for (PartitionAssignor.Assignment assigned : assignment.values()) {
            for (TopicPartition tp : assigned.partitions()) {
                assignedTopics.add(tp.topic());
            }
        }
        if (!assignedTopics.containsAll(allSubscribedTopics)) {
            HashSet<String> notAssignedTopics = new HashSet<String>(allSubscribedTopics);
            notAssignedTopics.removeAll(assignedTopics);
            this.log.warn("The following subscribed topics are not assigned to any members: {} ", (Object)notAssignedTopics);
        }
        if (!allSubscribedTopics.containsAll(assignedTopics)) {
            HashSet newlyAddedTopics = new HashSet(assignedTopics);
            newlyAddedTopics.removeAll(allSubscribedTopics);
            this.log.info("The following not-subscribed topics are assigned, and their metadata will be fetched from the brokers: {}", (Object)newlyAddedTopics);
            allSubscribedTopics.addAll(assignedTopics);
            this.subscriptions.groupSubscribe(allSubscribedTopics);
            this.metadata.setTopics(this.subscriptions.groupSubscription());
            this.client.ensureFreshMetadata();
        }
        this.assignmentSnapshot = this.metadataSnapshot;
        this.log.debug("Finished assignment for group: {}", (Object)assignment);
        HashMap<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        for (Map.Entry<String, PartitionAssignor.Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }
        return groupAssignment;
    }

    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        this.maybeAutoCommitOffsetsSync(this.rebalanceTimeoutMs);
        ConsumerRebalanceListener listener = this.subscriptions.rebalanceListener();
        this.log.info("Revoking previously assigned partitions {}", (Object)this.subscriptions.assignedPartitions());
        try {
            HashSet<TopicPartition> revoked = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
            listener.onPartitionsRevoked(revoked);
        }
        catch (InterruptException | WakeupException e) {
            throw e;
        }
        catch (Exception e) {
            this.log.error("User provided listener {} failed on partition revocation", (Object)listener.getClass().getName(), (Object)e);
        }
        this.isLeader = false;
        this.subscriptions.resetGroupSubscription();
    }

    @Override
    public boolean needRejoin() {
        if (!this.subscriptions.partitionsAutoAssigned()) {
            return false;
        }
        if (this.assignmentSnapshot != null && !this.assignmentSnapshot.equals(this.metadataSnapshot)) {
            return true;
        }
        if (this.joinedSubscription != null && !this.joinedSubscription.equals(this.subscriptions.subscription())) {
            return true;
        }
        return super.needRejoin();
    }

    public void refreshCommittedOffsetsIfNeeded() {
        Set<TopicPartition> missingFetchPositions = this.subscriptions.missingFetchPositions();
        Map<TopicPartition, OffsetAndMetadata> offsets = this.fetchCommittedOffsets(missingFetchPositions);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry2 : offsets.entrySet()) {
            TopicPartition tp = entry2.getKey();
            long offset2 = entry2.getValue().offset();
            this.log.debug("Setting offset for partition {} to the committed offset {}", (Object)tp, (Object)offset2);
            this.subscriptions.seek(tp, offset2);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions2) {
        if (partitions2.isEmpty()) {
            return Collections.emptyMap();
        }
        while (true) {
            this.ensureCoordinatorReady();
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = this.sendOffsetFetchRequest(partitions2);
            this.client.poll(future);
            if (future.succeeded()) {
                return future.value();
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }
            this.time.sleep(this.retryBackoffMs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(long timeoutMs) {
        this.client.disableWakeups();
        long now = this.time.milliseconds();
        long endTimeMs = now + timeoutMs;
        try {
            this.maybeAutoCommitOffsetsSync(timeoutMs);
            now = this.time.milliseconds();
            if (this.pendingAsyncCommits.get() > 0 && endTimeMs > now) {
                this.ensureCoordinatorReady(now, endTimeMs - now);
                now = this.time.milliseconds();
            }
        }
        finally {
            super.close(Math.max(0L, endTimeMs - now));
        }
    }

    void invokeCompletedOffsetCommitCallbacks() {
        OffsetCommitCompletion completion;
        while ((completion = this.completedOffsetCommits.poll()) != null) {
            completion.invoke();
        }
    }

    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        this.invokeCompletedOffsetCommitCallbacks();
        if (!this.coordinatorUnknown()) {
            this.doCommitOffsetsAsync(offsets, callback);
        } else {
            this.pendingAsyncCommits.incrementAndGet();
            this.lookupCoordinator().addListener(new RequestFutureListener<Void>(){

                @Override
                public void onSuccess(Void value2) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    ConsumerCoordinator.this.doCommitOffsetsAsync(offsets, callback);
                    ConsumerCoordinator.this.client.pollNoWakeup();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
                }
            });
        }
        this.client.pollNoWakeup();
    }

    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback cb = callback == null ? this.defaultOffsetCommitCallback : callback;
        future.addListener(new RequestFutureListener<Void>(){

            @Override
            public void onSuccess(Void value2) {
                if (ConsumerCoordinator.this.interceptors != null) {
                    ConsumerCoordinator.this.interceptors.onCommit(offsets);
                }
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
            }

            @Override
            public void onFailure(RuntimeException e) {
                RuntimeException commitException = e;
                if (e instanceof RetriableException) {
                    commitException = new RetriableCommitFailedException(e);
                }
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
            }
        });
    }

    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
        long now;
        this.invokeCompletedOffsetCommitCallbacks();
        if (offsets.isEmpty()) {
            return true;
        }
        long startMs = now = this.time.milliseconds();
        long remainingMs = timeoutMs;
        do {
            if (this.coordinatorUnknown()) {
                if (!this.ensureCoordinatorReady(now, remainingMs)) {
                    return false;
                }
                remainingMs = timeoutMs - (this.time.milliseconds() - startMs);
            }
            RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
            this.client.poll(future, remainingMs);
            this.invokeCompletedOffsetCommitCallbacks();
            if (future.succeeded()) {
                if (this.interceptors != null) {
                    this.interceptors.onCommit(offsets);
                }
                return true;
            }
            if (future.failed() && !future.isRetriable()) {
                throw future.exception();
            }
            this.time.sleep(this.retryBackoffMs);
        } while ((remainingMs = timeoutMs - ((now = this.time.milliseconds()) - startMs)) > 0L);
        return false;
    }

    public void maybeAutoCommitOffsetsAsync(long now) {
        if (this.autoCommitEnabled && now >= this.nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + (long)this.autoCommitIntervalMs;
            this.doAutoCommitOffsetsAsync();
        }
    }

    private void doAutoCommitOffsetsAsync() {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = this.subscriptions.allConsumed();
        this.log.debug("Sending asynchronous auto-commit of offsets {}", (Object)allConsumedOffsets);
        this.commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback(){

            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    if (exception instanceof RetriableException) {
                        ConsumerCoordinator.this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", (Object)offsets, (Object)exception);
                        ConsumerCoordinator.this.nextAutoCommitDeadline = Math.min(ConsumerCoordinator.this.time.milliseconds() + ConsumerCoordinator.this.retryBackoffMs, ConsumerCoordinator.this.nextAutoCommitDeadline);
                    } else {
                        ConsumerCoordinator.this.log.warn("Asynchronous auto-commit of offsets {} failed: {}", (Object)offsets, (Object)exception.getMessage());
                    }
                } else {
                    ConsumerCoordinator.this.log.debug("Completed asynchronous auto-commit of offsets {}", (Object)offsets);
                }
            }
        });
    }

    private void maybeAutoCommitOffsetsSync(long timeoutMs) {
        if (this.autoCommitEnabled) {
            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = this.subscriptions.allConsumed();
            try {
                this.log.debug("Sending synchronous auto-commit of offsets {}", (Object)allConsumedOffsets);
                if (!this.commitOffsetsSync(allConsumedOffsets, timeoutMs)) {
                    this.log.debug("Auto-commit of offsets {} timed out before completion", (Object)allConsumedOffsets);
                }
            }
            catch (InterruptException | WakeupException e) {
                this.log.debug("Auto-commit of offsets {} was interrupted before completion", (Object)allConsumedOffsets);
                throw e;
            }
            catch (Exception e) {
                this.log.warn("Synchronous auto-commit of offsets {} failed: {}", (Object)allConsumedOffsets, (Object)e.getMessage());
            }
        }
    }

    private RequestFuture<Void> sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            return RequestFuture.voidSuccess();
        }
        Node coordinator = this.checkAndGetCoordinator();
        if (coordinator == null) {
            return RequestFuture.coordinatorNotAvailable();
        }
        HashMap<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry2 : offsets.entrySet()) {
            OffsetAndMetadata offsetAndMetadata = entry2.getValue();
            if (offsetAndMetadata.offset() < 0L) {
                return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
            }
            offsetData.put(entry2.getKey(), new OffsetCommitRequest.PartitionData(offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
        }
        AbstractCoordinator.Generation generation = this.subscriptions.partitionsAutoAssigned() ? this.generation() : AbstractCoordinator.Generation.NO_GENERATION;
        if (generation == null) {
            return RequestFuture.failure(new CommitFailedException());
        }
        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).setGenerationId(generation.generationId).setMemberId(generation.memberId).setRetentionTime(-1L);
        this.log.trace("Sending OffsetCommit request with {} to coordinator {}", (Object)offsets, (Object)coordinator);
        return this.client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets));
    }

    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions2) {
        Node coordinator = this.checkAndGetCoordinator();
        if (coordinator == null) {
            return RequestFuture.coordinatorNotAvailable();
        }
        this.log.debug("Fetching committed offsets for partitions: {}", (Object)partitions2);
        OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId, new ArrayList<TopicPartition>(partitions2));
        return this.client.send(coordinator, requestBuilder).compose(new OffsetFetchResponseHandler());
    }

    private static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        private OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.callback = callback;
            this.offsets = offsets;
            this.exception = exception;
        }

        public void invoke() {
            if (this.callback != null) {
                this.callback.onComplete(this.offsets, this.exception);
            }
        }
    }

    private static class MetadataSnapshot {
        private final Map<String, Integer> partitionsPerTopic;

        private MetadataSnapshot(SubscriptionState subscription, Cluster cluster) {
            HashMap<String, Integer> partitionsPerTopic = new HashMap<String, Integer>();
            for (String topic : subscription.groupSubscription()) {
                partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
            }
            this.partitionsPerTopic = partitionsPerTopic;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            MetadataSnapshot that = (MetadataSnapshot)o;
            return this.partitionsPerTopic != null ? this.partitionsPerTopic.equals(that.partitionsPerTopic) : that.partitionsPerTopic == null;
        }

        public int hashCode() {
            return this.partitionsPerTopic != null ? this.partitionsPerTopic.hashCode() : 0;
        }
    }

    private class ConsumerCoordinatorMetrics {
        private final String metricGrpName;
        private final Sensor commitLatency;

        private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
            this.commitLatency = metrics.sensor("commit-latency");
            this.commitLatency.add(metrics.metricName("commit-latency-avg", this.metricGrpName, "The average time taken for a commit request"), new Avg());
            this.commitLatency.add(metrics.metricName("commit-latency-max", this.metricGrpName, "The max time taken for a commit request"), new Max());
            this.commitLatency.add(ConsumerCoordinator.this.createMeter(metrics, this.metricGrpName, "commit", "commit calls"));
            Measurable numParts = new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return ConsumerCoordinator.this.subscriptions.assignedPartitions().size();
                }
            };
            metrics.addMetric(metrics.metricName("assigned-partitions", this.metricGrpName, "The number of partitions currently assigned to this consumer"), numParts);
        }
    }

    private class OffsetFetchResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
        private OffsetFetchResponseHandler() {
        }

        @Override
        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            if (response.hasError()) {
                Errors error = response.error();
                ConsumerCoordinator.this.log.debug("Offset fetch failed: {}", (Object)error.message());
                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                    future.raise(error);
                } else if (error == Errors.NOT_COORDINATOR) {
                    ConsumerCoordinator.this.markCoordinatorUnknown();
                    future.raise(error);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(ConsumerCoordinator.this.groupId));
                } else {
                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                }
                return;
            }
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(response.responseData().size());
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry2 : response.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetFetchResponse.PartitionData data = entry2.getValue();
                if (data.hasError()) {
                    Errors error = data.error;
                    ConsumerCoordinator.this.log.debug("Failed to fetch offset for partition {}: {}", (Object)tp, (Object)error.message());
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                    } else {
                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                    }
                    return;
                }
                if (data.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                    continue;
                }
                ConsumerCoordinator.this.log.debug("Found no committed offset for partition {}", (Object)tp);
            }
            future.complete(offsets);
        }
    }

    private class OffsetCommitResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetCommitResponse, Void> {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            ConsumerCoordinator.this.sensors.commitLatency.record(this.response.requestLatencyMs());
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            for (Map.Entry<TopicPartition, Errors> entry2 : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset2 = offsetAndMetadata.offset();
                Errors error = entry2.getValue();
                if (error == Errors.NONE) {
                    ConsumerCoordinator.this.log.debug("Committed offset {} for partition {}", (Object)offset2, (Object)tp);
                    continue;
                }
                ConsumerCoordinator.this.log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset2, error.message());
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(ConsumerCoordinator.this.groupId));
                    return;
                }
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                    continue;
                }
                if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    future.raise(error);
                    return;
                }
                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                    future.raise(error);
                    return;
                }
                if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) {
                    ConsumerCoordinator.this.markCoordinatorUnknown();
                    future.raise(error);
                    return;
                }
                if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) {
                    ConsumerCoordinator.this.resetGeneration();
                    future.raise(new CommitFailedException());
                    return;
                }
                if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                    return;
                }
                future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                return;
            }
            if (!unauthorizedTopics.isEmpty()) {
                ConsumerCoordinator.this.log.error("Not authorized to commit to topics {}", (Object)unauthorizedTopics);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

    private class DefaultOffsetCommitCallback
    implements OffsetCommitCallback {
        private DefaultOffsetCommitCallback() {
        }

        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                ConsumerCoordinator.this.log.error("Offset commit with offsets {} failed", (Object)offsets, (Object)exception);
            }
        }
    }
}

