/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.callback.pulsar;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

/**
 * {@link HoodieWriteCommitCallback} that publishes each write-commit callback message to a Pulsar topic.
 *
 * <p>The broker service URL and topic are read from {@link HoodieWriteCommitPulsarCallbackConfig}.
 * A {@link PulsarClient} and a {@code Producer<String>} are created in the constructor and released
 * in {@link #close()}.
 */
public class HoodieWriteCommitPulsarCallback implements HoodieWriteCommitCallback, Closeable {
    private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitPulsarCallback.class);
    private final String serviceUrl;
    private final String topic;
    private final transient PulsarClient client;
    private final transient Producer<String> producer;

    /**
     * Reads the Pulsar settings from {@code config} and creates the client and the producer.
     *
     * @param config write config carrying the {@link HoodieWriteCommitPulsarCallbackConfig} properties
     * @throws PulsarClientException if the client or the producer cannot be created
     */
    public HoodieWriteCommitPulsarCallback(HoodieWriteConfig config) throws PulsarClientException {
        this.serviceUrl = config.getString(HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL);
        this.topic = config.getString(HoodieWriteCommitPulsarCallbackConfig.TOPIC);
        this.client = this.createClient(config);
        try {
            this.producer = this.createProducer(config);
        } catch (PulsarClientException | RuntimeException e) {
            // The instance never escapes the constructor on failure, so nobody could call close();
            // release the already-created client here to avoid leaking it.
            this.closeClientAfterFailedInit(e);
            throw e;
        }
    }

    /**
     * Converts the callback message with {@link HoodieWriteCommitCallbackUtil#convertToJsonString}
     * and sends it, keyed by the table name. Failures are logged and not propagated.
     *
     * @param callbackMessage the commit callback message to publish
     */
    @Override
    public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
        String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
        try {
            this.producer.newMessage().key(callbackMessage.getTableName()).value(callbackMsg).send();
            LOG.info("Send callback message succeed");
        } catch (Exception e) {
            // Best effort: a failed notification is only logged, never rethrown to the caller.
            LOG.error("Send pulsar callback msg failed : ", e);
        }
    }

    /**
     * Builds a String-schema producer for the configured topic using the producer settings in
     * {@code hoodieConfig} (routing mode, send timeout, pending-queue limits, block-if-full flag).
     *
     * @param hoodieConfig config holding the producer properties
     * @return the created producer
     * @throws PulsarClientException if the producer cannot be created
     * @throws IllegalArgumentException if the configured routing mode is not a {@link MessageRoutingMode} constant
     */
    public Producer<String> createProducer(HoodieConfig hoodieConfig) throws PulsarClientException {
        // Honor the configured routing mode; fall back to the property's default only when it is unset.
        MessageRoutingMode routeMode = MessageRoutingMode.valueOf(
            hoodieConfig.getStringOrDefault(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_ROUTE_MODE));
        Duration sendTimeout = DateTimeUtils.parseDuration(
            hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_SEND_TIMEOUT));
        int pendingQueueSize = hoodieConfig.getInt(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_QUEUE_SIZE);
        int pendingSize = hoodieConfig.getInt(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_SIZE);
        boolean blockIfQueueFull = hoodieConfig.getBoolean(HoodieWriteCommitPulsarCallbackConfig.PRODUCER_BLOCK_QUEUE_FULL);

        return this.client
            .newProducer(Schema.STRING)
            .topic(this.topic)
            .messageRoutingMode(routeMode)
            .sendTimeout((int) sendTimeout.toMillis(), TimeUnit.MILLISECONDS)
            .maxPendingMessages(pendingQueueSize)
            .maxPendingMessagesAcrossPartitions(pendingSize)
            .blockIfQueueFull(blockIfQueueFull)
            .create();
    }

    /**
     * Validates the service URL and topic, then builds a Pulsar client from the timeout and
     * keep-alive settings in {@code hoodieConfig}.
     *
     * @param hoodieConfig config holding the client properties
     * @return the created client
     * @throws PulsarClientException if the client cannot be created
     * @throws IllegalArgumentException if the service URL or topic is null or empty
     */
    public PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException {
        this.validatePulsarConfig();
        Duration operationTimeout = DateTimeUtils.parseDuration(
            hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.OPERATION_TIMEOUT));
        Duration connectionTimeout = DateTimeUtils.parseDuration(
            hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.CONNECTION_TIMEOUT));
        Duration requestTimeout = DateTimeUtils.parseDuration(
            hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.REQUEST_TIMEOUT));
        Duration keepAliveInterval = DateTimeUtils.parseDuration(
            hoodieConfig.getString(HoodieWriteCommitPulsarCallbackConfig.KEEPALIVE_INTERVAL));

        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setServiceUrl(this.serviceUrl);
        clientConfigurationData.setOperationTimeoutMs(operationTimeout.toMillis());
        clientConfigurationData.setConnectionTimeoutMs((int) connectionTimeout.toMillis());
        clientConfigurationData.setRequestTimeoutMs((int) requestTimeout.toMillis());
        clientConfigurationData.setKeepAliveIntervalSeconds((int) keepAliveInterval.getSeconds());
        return new PulsarClientImpl(clientConfigurationData);
    }

    /** Rejects a null or empty broker service URL or topic with an {@link IllegalArgumentException}. */
    private void validatePulsarConfig() {
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.serviceUrl), String.format("Config %s can not be null or empty", HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL.key()));
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(this.topic), String.format("Config %s can not be null or empty", HoodieWriteCommitPulsarCallbackConfig.TOPIC.key()));
    }

    /**
     * Closes the client after producer creation failed, attaching any close failure to
     * {@code cause} as a suppressed exception so the original error stays primary.
     */
    private void closeClientAfterFailedInit(Throwable cause) {
        if (this.client == null) {
            return;
        }
        try {
            this.client.close();
        } catch (Exception closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Closes the producer and then the client. Failures from either are logged as warnings and
     * not rethrown, so a producer failure does not prevent the client from being closed.
     */
    @Override
    public void close() throws IOException {
        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (Throwable t) {
                LOG.warn("Could not properly close the producer.", t);
            }
        }
        if (this.client != null) {
            try {
                this.client.close();
            } catch (Throwable t) {
                LOG.warn("Could not properly close the client.", t);
            }
        }
    }
}

