package org.apache.hudi.utilities.callback.pulsar;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.class */
public class HoodieWriteCommitPulsarCallback implements HoodieWriteCommitCallback, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteCommitPulsarCallback.class);
    private final String serviceUrl;
    private final String topic;
    private final transient PulsarClient client;
    private final transient Producer<String> producer;

    public HoodieWriteCommitPulsarCallback(HoodieWriteConfig hoodieWriteConfig) throws PulsarClientException {
        this.serviceUrl = hoodieWriteConfig.getString(HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL);
        this.topic = hoodieWriteConfig.getString(HoodieWriteCommitPulsarCallbackConfig.TOPIC);
        this.client = createClient(hoodieWriteConfig);
        this.producer = createProducer(hoodieWriteConfig);
    }

    @Override // org.apache.hudi.callback.HoodieWriteCommitCallback
    public void call(HoodieWriteCommitCallbackMessage hoodieWriteCommitCallbackMessage) {
        try {
            this.producer.newMessage().key(hoodieWriteCommitCallbackMessage.getTableName()).value(HoodieWriteCommitCallbackUtil.convertToJsonString(hoodieWriteCommitCallbackMessage)).send();
            LOG.info("Send callback message succeed");
        } catch (Exception e) {
            LOG.error("Send pulsar callback msg failed : ", e);
        }
    }

    public Producer<String> createProducer(HoodieConfig hoodieConfig) throws PulsarClientException {
        MessageRoutingMode valueOf = Enum.valueOf(MessageRoutingMode.class, HoodieWriteCommitPulsarCallbackConfig.PRODUCER_ROUTE_MODE.defaultValue());
        Duration parseDuration = DateTimeUtils.parseDuration(hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_SEND_TIMEOUT));
        int intValue = hoodieConfig.getInt(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_QUEUE_SIZE).intValue();
        int intValue2 = hoodieConfig.getInt(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_SIZE).intValue();
        return this.client.newProducer(Schema.STRING).topic(this.topic).messageRoutingMode(valueOf).sendTimeout((int) parseDuration.toMillis(), TimeUnit.MILLISECONDS).maxPendingMessages(intValue).maxPendingMessagesAcrossPartitions(intValue2).blockIfQueueFull(hoodieConfig.getBoolean(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_BLOCK_QUEUE_FULL).booleanValue()).create();
    }

    public PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException {
        validatePulsarConfig();
        Duration parseDuration = DateTimeUtils.parseDuration(hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.OPERATION_TIMEOUT));
        Duration parseDuration2 = DateTimeUtils.parseDuration(hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.CONNECTION_TIMEOUT));
        Duration parseDuration3 = DateTimeUtils.parseDuration(hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.REQUEST_TIMEOUT));
        Duration parseDuration4 = DateTimeUtils.parseDuration(hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.KEEPALIVE_INTERVAL));
        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setServiceUrl(this.serviceUrl);
        clientConfigurationData.setOperationTimeoutMs(parseDuration.toMillis());
        clientConfigurationData.setConnectionTimeoutMs((int) parseDuration2.toMillis());
        clientConfigurationData.setRequestTimeoutMs((int) parseDuration3.toMillis());
        clientConfigurationData.setKeepAliveIntervalSeconds((int) parseDuration4.getSeconds());
        return new PulsarClientImpl(clientConfigurationData);
    }

    private void validatePulsarConfig() {
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.serviceUrl), String.format("Config %s can not be null or empty", HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL.key()));
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.topic), String.format("Config %s can not be null or empty", HoodieWriteCommitPulsarCallbackConfig.TOPIC.key()));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (Throwable th) {
                LOG.warn("Could not properly close the producer.", th);
            }
        }
        if (this.client != null) {
            try {
                this.client.close();
            } catch (Throwable th2) {
                LOG.warn("Could not properly close the client.", th2);
            }
        }
    }
}
