package org.apache.hudi.connect.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/connect/kafka/KafkaConnectControlAgent.class */
public class KafkaConnectControlAgent implements KafkaControlAgent {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectControlAgent.class);
    private static final Object LOCK = new Object();
    private static final long KAFKA_POLL_TIMEOUT_MS = 100;
    private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;
    private static KafkaConnectControlAgent agent;
    private final String bootstrapServers;
    private final String controlTopicName;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final Map<String, TransactionCoordinator> topicCoordinators = new HashMap();
    private final Map<String, ConcurrentLinkedQueue<TransactionParticipant>> partitionWorkers = new HashMap();
    private final KafkaControlProducer producer;
    private KafkaConsumer<String, byte[]> consumer;

    public KafkaConnectControlAgent(String str, String str2) {
        this.bootstrapServers = str;
        this.controlTopicName = str2;
        this.producer = new KafkaControlProducer(str, str2);
        start();
    }

    public static KafkaConnectControlAgent createKafkaControlManager(String str, String str2) {
        if (agent == null) {
            synchronized (LOCK) {
                if (agent == null) {
                    agent = new KafkaConnectControlAgent(str, str2);
                }
            }
        }
        return agent;
    }

    @Override // org.apache.hudi.connect.kafka.KafkaControlAgent
    public void registerTransactionParticipant(TransactionParticipant transactionParticipant) {
        if (!this.partitionWorkers.containsKey(transactionParticipant.getPartition().topic())) {
            this.partitionWorkers.put(transactionParticipant.getPartition().topic(), new ConcurrentLinkedQueue<>());
        }
        this.partitionWorkers.get(transactionParticipant.getPartition().topic()).add(transactionParticipant);
    }

    @Override // org.apache.hudi.connect.kafka.KafkaControlAgent
    public void deregisterTransactionParticipant(TransactionParticipant transactionParticipant) {
        if (this.partitionWorkers.containsKey(transactionParticipant.getPartition().topic())) {
            this.partitionWorkers.get(transactionParticipant.getPartition().topic()).remove(transactionParticipant);
        }
    }

    @Override // org.apache.hudi.connect.kafka.KafkaControlAgent
    public void registerTransactionCoordinator(TransactionCoordinator transactionCoordinator) {
        if (this.topicCoordinators.containsKey(transactionCoordinator.getPartition().topic())) {
            return;
        }
        this.topicCoordinators.put(transactionCoordinator.getPartition().topic(), transactionCoordinator);
    }

    @Override // org.apache.hudi.connect.kafka.KafkaControlAgent
    public void deregisterTransactionCoordinator(TransactionCoordinator transactionCoordinator) {
        this.topicCoordinators.remove(transactionCoordinator.getPartition().topic());
    }

    @Override // org.apache.hudi.connect.kafka.KafkaControlAgent
    public void publishMessage(ControlMessage controlMessage) {
        this.producer.publishMessage(controlMessage);
    }

    private void start() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("group.id", "hudi-control-group" + UUID.randomUUID().toString());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        properties.put("auto.offset.reset", "latest");
        this.consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new ByteArrayDeserializer());
        this.consumer.subscribe(Collections.singletonList(this.controlTopicName));
        this.executorService.submit(() -> {
            while (true) {
                Iterator it = this.consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    try {
                        LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, customer = %s, country = %s", "", consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()));
                        ControlMessage parseFrom = ControlMessage.parseFrom((byte[]) consumerRecord.value());
                        String topicName = parseFrom.getTopicName();
                        if (parseFrom.getReceiverType().equals(ControlMessage.EntityType.PARTICIPANT)) {
                            if (this.partitionWorkers.containsKey(topicName)) {
                                Iterator<TransactionParticipant> it2 = this.partitionWorkers.get(topicName).iterator();
                                while (it2.hasNext()) {
                                    it2.next().processControlEvent(parseFrom);
                                }
                            } else {
                                LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", topicName));
                            }
                        } else if (!parseFrom.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
                            LOG.warn(String.format("Sender type of Control Message unknown %s", parseFrom.getSenderType().name()));
                        } else if (this.topicCoordinators.containsKey(topicName)) {
                            this.topicCoordinators.get(topicName).processControlEvent(parseFrom);
                        }
                    } catch (Exception e) {
                        LOG.error(String.format("Fatal error while consuming a kafka record for topic = %s partition = %s", consumerRecord.topic(), Integer.valueOf(consumerRecord.partition())), e);
                    }
                }
                try {
                    this.consumer.commitSync();
                } catch (CommitFailedException e2) {
                    LOG.error("Fatal error while committing kafka control topic");
                }
            }
        });
    }

    public void stop() {
        this.producer.stop();
        this.consumer.close();
        if (this.executorService != null) {
            boolean z = false;
            try {
                LOG.info("Shutting down executor service.");
                this.executorService.shutdown();
                LOG.info("Awaiting termination.");
                z = this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            if (z) {
                return;
            }
            LOG.warn("Unclean Kafka Control Manager executor service shutdown ");
            this.executorService.shutdownNow();
        }
    }
}
