package org.apache.flink.connector.kafka.source;

import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

/* loaded from: input_file:org/apache/flink/connector/kafka/source/KafkaSource.class */
public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = -8755372893283732098L;
    private final KafkaSubscriber subscriber;
    private final OffsetsInitializer startingOffsetsInitializer;
    private final OffsetsInitializer stoppingOffsetsInitializer;
    private final Boundedness boundedness;
    private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
    private final Properties props;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSource(KafkaSubscriber kafkaSubscriber, OffsetsInitializer offsetsInitializer, @Nullable OffsetsInitializer offsetsInitializer2, Boundedness boundedness, KafkaRecordDeserializationSchema<OUT> kafkaRecordDeserializationSchema, Properties properties) {
        this.subscriber = kafkaSubscriber;
        this.startingOffsetsInitializer = offsetsInitializer;
        this.stoppingOffsetsInitializer = offsetsInitializer2;
        this.boundedness = boundedness;
        this.deserializationSchema = kafkaRecordDeserializationSchema;
        this.props = properties;
    }

    public static <OUT> KafkaSourceBuilder<OUT> builder() {
        return new KafkaSourceBuilder<>();
    }

    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return createReader(sourceReaderContext, collection -> {
        });
    }

    @VisibleForTesting
    SourceReader<OUT, KafkaPartitionSplit> createReader(final SourceReaderContext sourceReaderContext, Consumer<Collection<String>> consumer) throws Exception {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        this.deserializationSchema.open(new DeserializationSchema.InitializationContext() { // from class: org.apache.flink.connector.kafka.source.KafkaSource.1
            public MetricGroup getMetricGroup() {
                return sourceReaderContext.metricGroup().addGroup("deserializer");
            }

            public UserCodeClassLoader getUserCodeClassLoader() {
                return sourceReaderContext.getUserCodeClassLoader();
            }
        });
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics = new KafkaSourceReaderMetrics(sourceReaderContext.metricGroup());
        Supplier supplier = () -> {
            return new KafkaPartitionSplitReader(this.props, sourceReaderContext, kafkaSourceReaderMetrics);
        };
        KafkaRecordEmitter kafkaRecordEmitter = new KafkaRecordEmitter(this.deserializationSchema);
        supplier.getClass();
        return new KafkaSourceReader(futureCompletingBlockingQueue, new KafkaSourceFetcherManager(futureCompletingBlockingQueue, supplier::get, consumer), kafkaRecordEmitter, toConfiguration(this.props), sourceReaderContext, kafkaSourceReaderMetrics);
    }

    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext) {
        return new KafkaSourceEnumerator(this.subscriber, this.startingOffsetsInitializer, this.stoppingOffsetsInitializer, this.props, splitEnumeratorContext, this.boundedness);
    }

    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext, KafkaSourceEnumState kafkaSourceEnumState) throws IOException {
        return new KafkaSourceEnumerator(this.subscriber, this.startingOffsetsInitializer, this.stoppingOffsetsInitializer, this.props, splitEnumeratorContext, this.boundedness, kafkaSourceEnumState.assignedPartitions());
    }

    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
        return new KafkaPartitionSplitSerializer();
    }

    public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new KafkaSourceEnumStateSerializer();
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    private Configuration toConfiguration(Properties properties) {
        Configuration configuration = new Configuration();
        properties.stringPropertyNames().forEach(str -> {
            configuration.setString(str, properties.getProperty(str));
        });
        return configuration;
    }

    @VisibleForTesting
    Configuration getConfiguration() {
        return toConfiguration(this.props);
    }

    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<KafkaPartitionSplit>) splitEnumeratorContext, (KafkaSourceEnumState) obj);
    }
}
