package org.apache.flink.connector.pulsar.source.reader.fetcher;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetcher manager that routes message acknowledgements to the split readers owned by its
 * fetchers.
 *
 * <p>Every acknowledgement is dispatched to the fetcher looked up (or created) under the string
 * form of its {@link TopicPartition}. The reader held by that fetcher is cast to
 * {@link PulsarOrderedPartitionSplitReader}; a reader of any other type fails the cast.
 *
 * @param <T> the type parameter forwarded to {@link PulsarMessage} and to the base class
 */
@Internal
public class PulsarOrderedFetcherManager<T> extends PulsarFetcherManagerBase<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedFetcherManager.class);

    /**
     * Builds the manager by passing both arguments unchanged to the base class constructor.
     *
     * @param futureCompletingBlockingQueue the queue handed to the base class
     * @param supplier the split reader supplier handed to the base class
     */
    public PulsarOrderedFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> futureCompletingBlockingQueue, Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> supplier) {
        super(futureCompletingBlockingQueue, supplier);
    }

    /**
     * Logs the given positions at debug level and acknowledges each of them on the fetcher that
     * belongs to its partition.
     *
     * @param map the message id to acknowledge, keyed by topic partition
     */
    public void acknowledgeMessages(Map<TopicPartition, MessageId> map) {
        LOG.debug("Acknowledge messages {}", map);
        map.forEach(this::acknowledgePartition);
    }

    /**
     * Handles a single map entry: resolves the fetcher keyed by the partition's string form and
     * triggers the acknowledgement on it.
     */
    private void acknowledgePartition(TopicPartition partition, MessageId ackedId) {
        // The fetcher key is the partition's toString() value.
        String fetcherKey = partition.toString();
        SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = getOrCreateFetcher(fetcherKey);
        triggerAcknowledge(fetcher, partition, ackedId);
    }

    /**
     * Notifies the fetcher's split reader about the acknowledged position, then passes the
     * fetcher to {@code startFetcher}.
     */
    private void triggerAcknowledge(SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher, TopicPartition partition, MessageId ackedId) {
        // Cast kept in its original (unparameterized) form; the reader type's declaration is
        // not visible from this file.
        PulsarOrderedPartitionSplitReader orderedReader = (PulsarOrderedPartitionSplitReader) fetcher.getSplitReader();
        orderedReader.notifyCheckpointComplete(partition, ackedId);
        // NOTE(review): presumably startFetcher is a no-op for an already running fetcher —
        // confirm against the base class.
        startFetcher(fetcher);
    }
}
