/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.kafka.KafkaControlProducer;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectControlAgent
implements KafkaControlAgent {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectControlAgent.class);
    private static final Object LOCK = new Object();
    private static final long KAFKA_POLL_TIMEOUT_MS = 100L;
    private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;
    private static KafkaConnectControlAgent agent;
    private final String bootstrapServers;
    private final String controlTopicName;
    private final ExecutorService executorService;
    private final Map<String, TransactionCoordinator> topicCoordinators;
    private final Map<String, ConcurrentLinkedQueue<TransactionParticipant>> partitionWorkers;
    private final KafkaControlProducer producer;
    private KafkaConsumer<String, byte[]> consumer;

    public KafkaConnectControlAgent(String bootstrapServers, String controlTopicName) {
        this.bootstrapServers = bootstrapServers;
        this.controlTopicName = controlTopicName;
        this.executorService = Executors.newSingleThreadExecutor();
        this.topicCoordinators = new HashMap<String, TransactionCoordinator>();
        this.partitionWorkers = new HashMap<String, ConcurrentLinkedQueue<TransactionParticipant>>();
        this.producer = new KafkaControlProducer(bootstrapServers, controlTopicName);
        this.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static KafkaConnectControlAgent createKafkaControlManager(String bootstrapServers, String controlTopicName) {
        if (agent == null) {
            Object object = LOCK;
            synchronized (object) {
                if (agent == null) {
                    agent = new KafkaConnectControlAgent(bootstrapServers, controlTopicName);
                }
            }
        }
        return agent;
    }

    @Override
    public void registerTransactionParticipant(TransactionParticipant worker) {
        if (!this.partitionWorkers.containsKey(worker.getPartition().topic())) {
            this.partitionWorkers.put(worker.getPartition().topic(), new ConcurrentLinkedQueue());
        }
        this.partitionWorkers.get(worker.getPartition().topic()).add(worker);
    }

    @Override
    public void deregisterTransactionParticipant(TransactionParticipant worker) {
        if (this.partitionWorkers.containsKey(worker.getPartition().topic())) {
            this.partitionWorkers.get(worker.getPartition().topic()).remove(worker);
        }
    }

    @Override
    public void registerTransactionCoordinator(TransactionCoordinator coordinator) {
        if (!this.topicCoordinators.containsKey(coordinator.getPartition().topic())) {
            this.topicCoordinators.put(coordinator.getPartition().topic(), coordinator);
        }
    }

    @Override
    public void deregisterTransactionCoordinator(TransactionCoordinator coordinator) {
        this.topicCoordinators.remove(coordinator.getPartition().topic());
    }

    @Override
    public void publishMessage(ControlMessage message) {
        this.producer.publishMessage(message);
    }

    private void start() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("group.id", "hudi-control-group" + UUID.randomUUID().toString());
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("auto.offset.reset", "latest");
        this.consumer = new KafkaConsumer(props, (Deserializer)new StringDeserializer(), (Deserializer)new ByteArrayDeserializer());
        this.consumer.subscribe(Collections.singletonList(this.controlTopicName));
        this.executorService.submit(() -> {
            while (true) {
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord record : records) {
                    try {
                        LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, customer = %s, country = %s", "", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                        ControlMessage message = ControlMessage.parseFrom((byte[])record.value());
                        String senderTopic = message.getTopicName();
                        if (message.getReceiverType().equals(ControlMessage.EntityType.PARTICIPANT)) {
                            if (this.partitionWorkers.containsKey(senderTopic)) {
                                for (TransactionParticipant partitionWorker : this.partitionWorkers.get(senderTopic)) {
                                    partitionWorker.processControlEvent(message);
                                }
                                continue;
                            }
                            LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", senderTopic));
                            continue;
                        }
                        if (message.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
                            if (!this.topicCoordinators.containsKey(senderTopic)) continue;
                            this.topicCoordinators.get(senderTopic).processControlEvent(message);
                            continue;
                        }
                        LOG.warn(String.format("Sender type of Control Message unknown %s", message.getSenderType().name()));
                    }
                    catch (Exception e) {
                        LOG.error(String.format("Fatal error while consuming a kafka record for topic = %s partition = %s", record.topic(), record.partition()), (Throwable)e);
                    }
                }
                try {
                    this.consumer.commitSync();
                    continue;
                }
                catch (CommitFailedException exception) {
                    LOG.error("Fatal error while committing kafka control topic");
                    continue;
                }
                break;
            }
        });
    }

    public void stop() {
        this.producer.stop();
        this.consumer.close();
        if (this.executorService != null) {
            boolean terminated = false;
            try {
                LOG.info("Shutting down executor service.");
                this.executorService.shutdown();
                LOG.info("Awaiting termination.");
                terminated = this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (!terminated) {
                LOG.warn("Unclean Kafka Control Manager executor service shutdown ");
                this.executorService.shutdownNow();
            }
        }
    }
}

