/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ThreadUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.Lazy;
import org.apache.hudi.utilities.config.PulsarSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.pulsar.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.Map;

/**
 * Row-based source that reads a single Pulsar topic through Spark's {@code "pulsar"} data-source format.
 *
 * <p>Every batch covers the range from a starting offset (decoded from the last checkpoint, or chosen by
 * the configured offset auto-reset strategy when there is no checkpoint) up to the last message id
 * reported by this source's own Pulsar consumer. The Pulsar client and consumer are held in
 * {@link Lazy} wrappers and obtained through {@code Lazy#get()} where they are needed.
 */
public class PulsarSource
extends RowSource
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSource.class);
    /** Upper bound on how long {@link #shutdownPulsarClient} waits for the client's event-loop group. */
    private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20L);
    /** Subscription-name template; the placeholder is filled with the current epoch millis. */
    private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d";
    /** Columns dropped from every batch by {@link #transform(Dataset)}. */
    private static final String[] PULSAR_META_FIELDS = new String[]{"__key", "__topic", "__messageId", "__publishTime", "__eventTime", "__messageProperties"};
    private final String topicName;
    private final String serviceEndpointURL;
    private final String adminEndpointURL;
    private final Lazy<PulsarClient> pulsarClient;
    private final Lazy<Consumer<byte[]>> pulsarConsumer;

    /**
     * Creates the source after verifying that the topic name and the service endpoint URL are configured.
     *
     * @param props          source configuration
     * @param sparkContext   Spark context handed to the parent source
     * @param sparkSession   Spark session used to read from Pulsar
     * @param schemaProvider schema provider handed to the parent source
     */
    public PulsarSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(props, Arrays.asList(PulsarSourceConfig.PULSAR_SOURCE_TOPIC_NAME, PulsarSourceConfig.PULSAR_SOURCE_SERVICE_ENDPOINT_URL));
        // The configured name is passed through TopicName so the string form produced by Pulsar is used everywhere.
        this.topicName = TopicName.get(ConfigUtils.getStringWithAltKeys(props, PulsarSourceConfig.PULSAR_SOURCE_TOPIC_NAME)).toString();
        this.serviceEndpointURL = ConfigUtils.getStringWithAltKeys(props, PulsarSourceConfig.PULSAR_SOURCE_SERVICE_ENDPOINT_URL);
        this.adminEndpointURL = ConfigUtils.getStringWithAltKeys(props, PulsarSourceConfig.PULSAR_SOURCE_ADMIN_ENDPOINT_URL);
        this.pulsarClient = Lazy.lazily(this::initPulsarClient);
        this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
    }

    /**
     * Reads the rows between the starting offset and the topic's current last message id.
     *
     * @param lastCheckpoint checkpoint of the previous batch, if any
     * @param sourceLimit    requested limit for the batch
     * @return the rows (with the Pulsar meta columns dropped) paired with a checkpoint built from the ending offset
     */
    @Override
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Pair<MessageId, MessageId> offsetRange = computeOffsets(lastCheckpoint, sourceLimit);
        String startingOffsetStr = convertToOffsetString(topicName, offsetRange.getLeft());
        String endingOffsetStr = convertToOffsetString(topicName, offsetRange.getRight());

        Dataset<Row> sourceRows = sparkSession.read()
            .format("pulsar")
            .option("service.url", serviceEndpointURL)
            .option("admin.url", adminEndpointURL)
            .option("topics", topicName)
            .option("startingOffsets", startingOffsetStr)
            .option("endingOffsets", endingOffsetStr)
            .load();

        return Pair.of(Option.of(transform(sourceRows)), new StreamerCheckpointV2(endingOffsetStr));
    }

    /**
     * Cumulatively acknowledges the offset stored for this topic in the committed checkpoint.
     *
     * @param lastCheckpointStr checkpoint string in the format produced by {@link #fetchNextBatch}
     */
    @Override
    public void onCommit(String lastCheckpointStr) {
        MessageId latestConsumedOffset = JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
        ackOffset(latestConsumedOffset);
    }

    /** Drops the Pulsar meta columns listed in {@link #PULSAR_META_FIELDS}. */
    private Dataset<Row> transform(Dataset<Row> rows) {
        return rows.drop(PULSAR_META_FIELDS);
    }

    /**
     * Determines the (starting, ending) offsets of the next batch.
     *
     * @throws HoodieException if the topic's last message id precedes the starting offset
     */
    private Pair<MessageId, MessageId> computeOffsets(Option<Checkpoint> lastCheckpointOpt, long sourceLimit) {
        MessageId startingOffset = decodeStartingOffset(lastCheckpointOpt);
        MessageId endingOffset = fetchLatestOffset();
        if (endingOffset.compareTo(startingOffset) < 0) {
            String message = String.format("Ending offset (%s) is preceding starting offset (%s) for '%s'", endingOffset, startingOffset, topicName);
            throw new HoodieException(message);
        }
        // NOTE(review): the limit is resolved (reading the configuration as a side effect) but is not applied
        //               to the returned range; the call is kept so that behavior is unchanged.
        Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
        return Pair.of(startingOffset, endingOffset);
    }

    /** Returns the offset stored in the checkpoint, or the one dictated by the auto-reset strategy. */
    private MessageId decodeStartingOffset(Option<Checkpoint> lastCheckpointOpt) {
        return lastCheckpointOpt
            .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint.getCheckpointKey()).apply(topicName))
            .orElseGet(this::resolveAutoResetOffset);
    }

    /**
     * Resolves the starting offset when no checkpoint is available.
     *
     * @throws IllegalArgumentException      if the configured strategy is {@code FAIL}
     * @throws UnsupportedOperationException if the configured strategy is not handled here
     */
    private MessageId resolveAutoResetOffset() {
        PulsarSourceConfig.OffsetAutoResetStrategy autoResetStrategy = PulsarSourceConfig.OffsetAutoResetStrategy.valueOf(
            ConfigUtils.getStringWithAltKeys((Properties) props, PulsarSourceConfig.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY, PulsarSourceConfig.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
        switch (autoResetStrategy) {
            case LATEST:
                return fetchLatestOffset();
            case EARLIEST:
                return MessageId.earliest;
            case FAIL:
                throw new IllegalArgumentException("No checkpoint has been provided!");
            default:
                throw new UnsupportedOperationException("Unsupported offset auto-reset strategy");
        }
    }

    /**
     * Cumulatively acknowledges {@code latestConsumedOffset} on this source's consumer.
     *
     * @throws HoodieReadFromSourceException if the acknowledgement fails
     */
    private void ackOffset(MessageId latestConsumedOffset) {
        try {
            pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
        }
        catch (PulsarClientException e) {
            LOG.error("Failed to ack messageId ({}) for topic '{}'", latestConsumedOffset, topicName, e);
            throw new HoodieReadFromSourceException("Failed to ack message for topic", e);
        }
    }

    /**
     * Asks this source's consumer for the last message id of the topic.
     *
     * @throws HoodieReadFromSourceException if the lookup fails
     */
    private MessageId fetchLatestOffset() {
        try {
            return pulsarConsumer.get().getLastMessageId();
        }
        catch (PulsarClientException e) {
            LOG.error("Failed to fetch latest messageId for topic '{}'", topicName, e);
            throw new HoodieReadFromSourceException("Failed to fetch latest messageId for topic", e);
        }
    }

    /**
     * Opens an exclusive subscription on the topic, positioned at the earliest message, under a
     * subscription name derived from the current time.
     *
     * @throws HoodieIOException if subscribing fails
     */
    private Consumer<byte[]> subscribeToTopic() {
        try {
            String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis());
            return pulsarClient.get()
                .newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionId)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        }
        catch (PulsarClientException e) {
            LOG.error("Failed to subscribe to Pulsar topic '{}'", topicName, e);
            throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
        }
    }

    /**
     * Builds a Pulsar client for the configured service endpoint.
     *
     * @throws HoodieIOException if the client cannot be built
     */
    private PulsarClient initPulsarClient() {
        try {
            return PulsarClient.builder().serviceUrl(serviceEndpointURL).build();
        }
        catch (PulsarClientException e) {
            LOG.error("Failed to init Pulsar client connecting to '{}'", serviceEndpointURL, e);
            throw new HoodieIOException("Failed to init Pulsar client", e);
        }
    }

    /**
     * Shuts down the Pulsar client held by this source.
     *
     * @throws IOException if closing the client fails
     */
    @Override
    public void close() throws IOException {
        // NOTE(review): Lazy#get() presumably creates the client if it was never used before — confirm
        //               whether close() on an untouched source should skip client creation.
        shutdownPulsarClient(pulsarClient.get());
    }

    /** Returns {@code sourceLimit} unless it is {@code Long.MAX_VALUE}, in which case the configured threshold is used. */
    private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) {
        if (sourceLimit < Long.MAX_VALUE) {
            return sourceLimit;
        }
        return ConfigUtils.getLongWithAltKeys(props, PulsarSourceConfig.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD);
    }

    /** Serializes a single topic-to-offset mapping with {@code JsonUtils.topicOffsets}. */
    private static String convertToOffsetString(String topic, MessageId startingOffset) {
        return JsonUtils.topicOffsets(HoodieConversionUtils.mapAsScalaImmutableMap(Collections.singletonMap(topic, startingOffset)));
    }

    /**
     * Closes the client, then shuts down its event-loop group and interrupts every active thread whose
     * name starts with {@code "pulsar-client-io"}.
     *
     * <p>The follow-up cleanup runs even when {@code client.close()} throws, so a failed close does not
     * leave those threads behind.
     *
     * @throws PulsarClientException if closing the client fails
     */
    private static void shutdownPulsarClient(PulsarClient client) throws PulsarClientException {
        try {
            client.close();
        }
        finally {
            try {
                EventLoopGroup eventLoopGroup = ((PulsarClientImpl) client).eventLoopGroup();
                if (eventLoopGroup != null) {
                    boolean terminated = eventLoopGroup.shutdownGracefully().await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                    if (!terminated) {
                        LOG.warn("Pulsar client event-loop group did not shut down within {} seconds", GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds());
                    }
                }
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
            ThreadUtils.collectActiveThreads().stream().sequential()
                .filter(t -> t.getName().startsWith("pulsar-client-io"))
                .forEach(Thread::interrupt);
        }
    }
}

