/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.callback.kafka;

import java.util.Properties;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieWriteCommitKafkaCallback
implements HoodieWriteCommitCallback {
    private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitKafkaCallback.class);
    private Properties props;
    private String bootstrapServers;
    private String topic;

    public HoodieWriteCommitKafkaCallback(HoodieWriteConfig config) {
        this.props = config.getProps();
        this.bootstrapServers = this.props.getProperty("hoodie.write.commit.callback.kafka.bootstrap.servers");
        this.topic = this.props.getProperty("hoodie.write.commit.callback.kafka.topic");
        this.validateKafkaConfig();
    }

    @Override
    public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
        String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
        try (KafkaProducer<String, String> producer = this.createProducer(this.props);){
            ProducerRecord<String, String> record = this.buildProducerRecord(this.props, callbackMsg);
            producer.send(record);
            LOG.info((Object)String.format("Send callback message %s succeed", callbackMsg));
        }
        catch (Exception e) {
            LOG.error((Object)"Send kafka callback msg failed : ", (Throwable)e);
        }
    }

    public KafkaProducer<String, String> createProducer(Properties props) {
        Properties kafkaProducerProps = new Properties();
        kafkaProducerProps.setProperty("bootstrap.servers", this.bootstrapServers);
        kafkaProducerProps.setProperty("acks", props.getProperty("hoodie.write.commit.callback.kafka.acks"));
        kafkaProducerProps.setProperty("retries", props.getProperty("hoodie.write.commit.callback.kafka.retries"));
        kafkaProducerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        LOG.debug((Object)("Callback kafka producer init with configs: " + HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps)));
        return new KafkaProducer<String, String>(kafkaProducerProps);
    }

    private ProducerRecord<String, String> buildProducerRecord(Properties props, String callbackMsg) {
        String partition = props.getProperty("hoodie.write.commit.callback.kafka.partition");
        if (null != partition) {
            return new ProducerRecord<String, String>(this.topic, Integer.valueOf(partition), props.getProperty("hoodie.table.name"), callbackMsg);
        }
        return new ProducerRecord<String, String>(this.topic, props.getProperty("hoodie.table.name"), callbackMsg);
    }

    private void validateKafkaConfig() {
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.bootstrapServers), String.format("Config %s can not be null or empty", "hoodie.write.commit.callback.kafka.bootstrap.servers"));
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.topic), String.format("Config %s can not be null or empty", "hoodie.write.commit.callback.kafka.topic"));
    }

    private static class ProducerSendCallback
    implements Callback {
        private ProducerSendCallback() {
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            LOG.info((Object)String.format("message offset=%s partition=%s timestamp=%s topic=%s", metadata.offset(), metadata.partition(), metadata.timestamp(), metadata.topic()));
        }
    }
}

