package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.eclipse.persistence.oxm.XMLConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.class */
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements Checkpointed<Serializable> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FlinkKafkaProducerBase.class);
    private static final long serialVersionUID = 1;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    protected final int[] partitions;
    protected final Properties producerConfig;
    protected final String defaultTopicId;
    protected final KeyedSerializationSchema<IN> schema;
    protected final KafkaPartitioner<IN> partitioner;
    protected boolean logFailuresOnly;
    private boolean flushOnCheckpoint;
    protected transient KafkaProducer<byte[], byte[]> producer;
    protected transient Callback callback;
    protected volatile transient Exception asyncException;
    protected final SerializableObject pendingRecordsLock = new SerializableObject();
    protected long pendingRecords;

    public FlinkKafkaProducerBase(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, KafkaPartitioner<IN> kafkaPartitioner) {
        Objects.requireNonNull(str, "TopicID not set");
        Objects.requireNonNull(keyedSerializationSchema, "serializationSchema not set");
        Objects.requireNonNull(properties, "producerConfig not set");
        ClosureCleaner.ensureSerializable(kafkaPartitioner);
        ClosureCleaner.ensureSerializable(keyedSerializationSchema);
        this.defaultTopicId = str;
        this.schema = keyedSerializationSchema;
        this.producerConfig = properties;
        if (properties.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        } else {
            this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
        }
        if (properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        } else {
            this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
        }
        Producer kafkaProducer = getKafkaProducer(this.producerConfig);
        Throwable th = null;
        try {
            List<PartitionInfo> partitionsFor = kafkaProducer.partitionsFor(str);
            this.partitions = new int[partitionsFor.size()];
            for (int i = 0; i < this.partitions.length; i++) {
                this.partitions[i] = partitionsFor.get(i).partition();
            }
            kafkaProducer.close();
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            this.partitioner = kafkaPartitioner;
        } catch (Throwable th3) {
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th3;
        }
    }

    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    public void setFlushOnCheckpoint(boolean z) {
        this.flushOnCheckpoint = z;
    }

    protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties properties) {
        return new KafkaProducer<>(properties);
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) {
        this.producer = getKafkaProducer(this.producerConfig);
        RuntimeContext runtimeContext = getRuntimeContext();
        if (this.partitioner != null) {
            this.partitioner.open(runtimeContext.getIndexOfThisSubtask(), runtimeContext.getNumberOfParallelSubtasks(), this.partitions);
        }
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", Integer.valueOf(runtimeContext.getIndexOfThisSubtask() + 1), Integer.valueOf(runtimeContext.getNumberOfParallelSubtasks()), this.defaultTopicId);
        if (!Boolean.parseBoolean(this.producerConfig.getProperty("flink.disable-metrics", XMLConstants.BOOLEAN_STRING_FALSE))) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                MetricGroup addGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
                for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                    addGroup.gauge(entry.getKey().name(), (String) new KafkaMetricWrapper(entry.getValue()));
                }
            }
        }
        if (this.flushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }
        if (this.logFailuresOnly) {
            this.callback = new Callback() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.1
                @Override // org.apache.kafka.clients.producer.Callback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        FlinkKafkaProducerBase.LOG.error("Error while sending record to Kafka: " + exc.getMessage(), (Throwable) exc);
                    }
                    FlinkKafkaProducerBase.this.acknowledgeMessage();
                }
            };
        } else {
            this.callback = new Callback() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.2
                @Override // org.apache.kafka.clients.producer.Callback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null && FlinkKafkaProducerBase.this.asyncException == null) {
                        FlinkKafkaProducerBase.this.asyncException = exc;
                    }
                    FlinkKafkaProducerBase.this.acknowledgeMessage();
                }
            };
        }
    }

    @Override // org.apache.flink.streaming.api.functions.sink.RichSinkFunction, org.apache.flink.streaming.api.functions.sink.SinkFunction
    public void invoke(IN in) throws Exception {
        checkErroneous();
        byte[] serializeKey = this.schema.serializeKey(in);
        byte[] serializeValue = this.schema.serializeValue(in);
        String targetTopic = this.schema.getTargetTopic(in);
        if (targetTopic == null) {
            targetTopic = this.defaultTopicId;
        }
        ProducerRecord<byte[], byte[]> producerRecord = this.partitioner == null ? new ProducerRecord<>(targetTopic, serializeKey, serializeValue) : new ProducerRecord<>(targetTopic, Integer.valueOf(this.partitioner.partition(in, serializeKey, serializeValue, this.partitions.length)), serializeKey, serializeValue);
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords++;
            }
        }
        this.producer.send(producerRecord, this.callback);
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        checkErroneous();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords--;
                if (this.pendingRecords == 0) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }

    protected abstract void flush();

    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    /* renamed from: snapshotState */
    public Serializable mo3114snapshotState(long j, long j2) {
        if (!this.flushOnCheckpoint) {
            return null;
        }
        flush();
        synchronized (this.pendingRecordsLock) {
            if (this.pendingRecords != 0) {
                throw new IllegalStateException("Pending record count must be zero at this point: " + this.pendingRecords);
            }
        }
        return null;
    }

    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    public void restoreState(Serializable serializable) {
    }

    protected void checkErroneous() throws Exception {
        Exception exc = this.asyncException;
        if (exc != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + exc.getMessage(), exc);
        }
    }

    public static Properties getPropertiesFromBrokerList(String str) {
        for (String str2 : str.split(",")) {
            NetUtils.getCorrectHostnamePort(str2);
        }
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str);
        return properties;
    }
}
