/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.transaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.CoordinatorEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
import org.apache.hudi.exception.HoodieException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectTransactionCoordinator
implements TransactionCoordinator,
Runnable {
    // NOTE(review): presumably the Kafka partition whose task hosts the coordinator;
    // it is not referenced inside this class — confirm against callers.
    public static final int COORDINATOR_KAFKA_PARTITION = 0;
    private static final Logger LOG = LoggerFactory.getLogger(ConnectTransactionCoordinator.class);
    // Config key read in startNewCommit() and handed to the partition provider.
    private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
    // Key of the extra commit metadata entry that stores the serialized Kafka offsets.
    private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
    // Separator between "partition=offset" entries in the serialized offsets.
    private static final String KAFKA_OFFSET_DELIMITER = ",";
    // Separator between the partition id and its offset inside one entry.
    private static final String KAFKA_OFFSET_KV_DELIMITER = "=";
    // Delay before the START_COMMIT event submitted by start() and after an ACK_COMMIT.
    private static final Long START_COMMIT_INIT_DELAY_MS = 100L;
    // Delay before the START_COMMIT event submitted after a failed or timed-out commit.
    private static final Long RESTART_COMMIT_DELAY_MS = 500L;
    // NOTE(review): not referenced by name in this decompiled class; run() polls with the
    // literal 1000L, which matches this value.
    private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
    private final KafkaConnectConfigs configs;
    // Topic partition this coordinator was created for; its topic and partition id are
    // stamped on every control message built by buildControlMessage().
    private final TopicPartition partition;
    private final KafkaControlAgent kafkaControlClient;
    private final ConnectTransactionServices transactionServices;
    private final KafkaPartitionProvider partitionProvider;
    // Write statuses received for the current commit, keyed by the sender partition id.
    // Cleared whenever a new commit is started.
    private final Map<Integer, List<WriteStatus>> partitionsWriteStatusReceived;
    // Kafka offsets reported with the write statuses of the current commit, keyed by the
    // sender partition id. Cleared when the current commit is ended.
    private final Map<Integer, Long> currentConsumedKafkaOffsets;
    // Ensures start() submits the event loop to the executor only once.
    private final AtomicBoolean hasStarted = new AtomicBoolean(false);
    // Queue of coordinator events drained by run().
    private final BlockingQueue<CoordinatorEvent> events;
    // Runs this coordinator's event loop (see run()).
    private final ExecutorService executorService;
    // Adds events to the queue after the delay requested in submitEvent().
    private final ScheduledExecutorService scheduler;
    // Commit time returned by the latest successful transactionServices.startCommit();
    // events other than START_COMMIT carrying a different commit time are dropped.
    private String currentCommitTime;
    // Offsets of the last successful commit: loaded from the latest commit metadata in
    // start(), updated after each successful commit and sent with every control message.
    private Map<Integer, Long> globalCommittedKafkaOffsets;
    private State currentState;
    // Partition count of the topic, refreshed at the start of every commit.
    private int numPartitions;

    /**
     * Creates a coordinator that uses {@link KafkaConnectTransactionServices} built from
     * {@code configs} and {@code KafkaConnectUtils::getLatestNumPartitions} as the
     * partition provider.
     *
     * @param configs            connector configuration
     * @param partition          topic partition this coordinator is created for
     * @param kafkaControlClient agent used to publish control messages and to
     *                           register/deregister this coordinator
     * @throws HoodieException declared by this constructor; presumably raised while the
     *                         transaction services are created (confirm in that class)
     */
    public ConnectTransactionCoordinator(KafkaConnectConfigs configs, TopicPartition partition, KafkaControlAgent kafkaControlClient) throws HoodieException {
        this(configs, partition, kafkaControlClient, new KafkaConnectTransactionServices(configs), KafkaConnectUtils::getLatestNumPartitions);
    }

    /**
     * Creates a coordinator with explicitly supplied collaborators.
     *
     * <p>The coordinator starts in the {@code INIT} state with an empty commit time and
     * empty offset/write-status maps. No thread is started and nothing is registered
     * until {@link #start()} is called.
     *
     * @param configs             connector configuration
     * @param partition           topic partition this coordinator is created for
     * @param kafkaControlClient  agent used to publish control messages
     * @param transactionServices services used to start and end commits
     * @param partitionProvider   supplies the current partition count of the topic
     */
    public ConnectTransactionCoordinator(KafkaConnectConfigs configs, TopicPartition partition, KafkaControlAgent kafkaControlClient, ConnectTransactionServices transactionServices, KafkaPartitionProvider partitionProvider) {
        // Collaborators handed in by the caller.
        this.configs = configs;
        this.partition = partition;
        this.kafkaControlClient = kafkaControlClient;
        this.transactionServices = transactionServices;
        this.partitionProvider = partitionProvider;

        // Event plumbing: a queue fed by the scheduler and drained by the executor thread.
        this.events = new LinkedBlockingQueue<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.executorService = Executors.newSingleThreadExecutor();

        // Per-commit bookkeeping, initially empty.
        this.partitionsWriteStatusReceived = new HashMap<>();
        this.currentConsumedKafkaOffsets = new HashMap<>();
        this.globalCommittedKafkaOffsets = new HashMap<>();
        this.currentCommitTime = "";
        this.currentState = State.INIT;
    }

    /**
     * Starts the coordinator: launches the event loop (first call only), registers this
     * coordinator with the control agent, loads the last committed Kafka offsets and
     * schedules the first {@code START_COMMIT} event.
     */
    @Override
    public void start() {
        // Only the first call submits the event loop to the executor.
        boolean firstStart = this.hasStarted.compareAndSet(false, true);
        if (firstStart) {
            this.executorService.submit(this);
        }
        this.kafkaControlClient.registerTransactionCoordinator(this);

        String topic = this.partition.topic();
        LOG.info(String.format("Start Transaction Coordinator for topic %s partition %s", topic, this.partition.partition()));

        this.initializeGlobalCommittedKafkaOffsets();

        // Kick off the first commit after a short initial delay.
        CoordinatorEvent firstCommit = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, topic, "");
        this.submitEvent(firstCommit, START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the coordinator: deregisters it from the control agent, cancels pending
     * scheduled events and shuts down the event loop executor.
     *
     * <p>The executor is first asked to shut down gracefully and given 100 ms to
     * terminate; if it has not terminated by then (or the wait is interrupted) it is
     * shut down forcibly. If the calling thread is interrupted while waiting, its
     * interrupt status is restored so callers can observe the interruption.
     */
    @Override
    public void stop() {
        this.kafkaControlClient.deregisterTransactionCoordinator(this);
        this.scheduler.shutdownNow();
        this.hasStarted.set(false);
        // executorService is final and always assigned by the constructor, so no null check is needed.
        boolean terminated = false;
        try {
            LOG.info("Shutting down executor service.");
            this.executorService.shutdown();
            LOG.info("Awaiting termination.");
            terminated = this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            // Do not swallow the interruption: restore the flag and fall through to the
            // forced shutdown below.
            Thread.currentThread().interrupt();
        }
        if (!terminated) {
            LOG.warn("Unclean Kafka Control Manager executor service shutdown ");
            this.executorService.shutdownNow();
        }
    }

    /**
     * Returns the topic partition this coordinator was constructed with.
     *
     * @return the partition passed to the constructor
     */
    @Override
    public TopicPartition getPartition() {
        return this.partition;
    }

    /**
     * Accepts a control message and, when it is a {@code WRITE_STATUS} message, queues
     * it as a coordinator event for the event loop. Any other message type is logged
     * and dropped.
     *
     * @param message control message received for this coordinator
     */
    @Override
    public void processControlEvent(ControlMessage message) {
        boolean isWriteStatus = ControlMessage.EventType.WRITE_STATUS.equals(message.getType());
        if (!isWriteStatus) {
            LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", message.getType().name()));
            return;
        }
        // Wrap the message in an event tagged with the message's topic and commit time.
        CoordinatorEvent writeStatusEvent = new CoordinatorEvent(
                CoordinatorEvent.CoordinatorEventType.WRITE_STATUS,
                message.getTopicName(),
                message.getCommitTime());
        writeStatusEvent.setMessage(message);
        this.submitEvent(writeStatusEvent);
    }

    /**
     * Event loop of the coordinator: repeatedly polls the event queue (waiting up to
     * {@code COORDINATOR_EVENT_LOOP_TIMEOUT_MS} per poll) and processes each event.
     *
     * <p>The loop ends when the running thread is interrupted, which is what
     * {@code executorService.shutdownNow()} in {@link #stop()} does. Previously an
     * interruption was only logged and the loop kept spinning forever, so the worker
     * thread could never terminate.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                CoordinatorEvent event = this.events.poll(COORDINATOR_EVENT_LOOP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (event != null) {
                    this.processCoordinatorEvent(event);
                }
            }
            catch (InterruptedException exception) {
                LOG.info("Event loop of the Partition Coordinator was interrupted, exiting");
                // Restore the interrupt status for the executor and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Submits an event with a zero delay.
     *
     * @param event event to hand to the event loop
     */
    private void submitEvent(CoordinatorEvent event) {
        this.submitEvent(event, 0L, TimeUnit.SECONDS);
    }

    /**
     * Schedules the event to be added to the event queue after the given delay.
     *
     * @param event event to hand to the event loop
     * @param delay how long to wait before the event is added to the queue
     * @param unit  time unit of {@code delay}
     */
    private void submitEvent(CoordinatorEvent event, long delay, TimeUnit unit) {
        this.scheduler.schedule(() -> this.events.add(event), delay, unit);
    }

    /**
     * Dispatches one coordinator event to its handler.
     *
     * <p>{@code null} events are ignored. Every event other than {@code START_COMMIT}
     * is dropped silently when its commit time differs from the current commit time.
     * Any exception thrown while handling an event is logged and not propagated, so a
     * failing event does not break the event loop.
     *
     * @param event event taken from the queue; may be {@code null}
     */
    private void processCoordinatorEvent(CoordinatorEvent event) {
        try {
            if (event == null) {
                return;
            }
            CoordinatorEvent.CoordinatorEventType eventType = event.getEventType();
            boolean isStartCommit = CoordinatorEvent.CoordinatorEventType.START_COMMIT.equals(eventType);
            // Only START_COMMIT is accepted regardless of the commit time it carries.
            if (!isStartCommit && !event.getCommitTime().equals(this.currentCommitTime)) {
                return;
            }
            switch (eventType) {
                case START_COMMIT: {
                    this.startNewCommit();
                    break;
                }
                case END_COMMIT: {
                    this.endExistingCommit();
                    break;
                }
                case WRITE_STATUS: {
                    // Report the actual reason an event is skipped: previously both cases
                    // were logged as a "missing message".
                    if (event.getMessage() == null) {
                        LOG.warn("Could not process WRITE_STATUS due to missing message");
                    } else if (!State.ENDED_COMMIT.equals(this.currentState)) {
                        LOG.warn("Ignoring WRITE_STATUS for commit {} because the coordinator is in state {} and not {}",
                                event.getCommitTime(), this.currentState, State.ENDED_COMMIT);
                    } else {
                        this.onReceiveWriteStatus(event.getMessage());
                    }
                    break;
                }
                case ACK_COMMIT: {
                    this.submitAckCommit();
                    break;
                }
                case WRITE_STATUS_TIMEOUT: {
                    this.handleWriteStatusTimeout();
                    break;
                }
                default: {
                    throw new IllegalStateException("Partition Coordinator has received an illegal event type " + eventType.name());
                }
            }
        }
        catch (Exception exception) {
            LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
        }
    }

    /**
     * Starts a new commit: refreshes the topic's partition count, clears the write
     * statuses collected so far, obtains a new commit time, publishes
     * {@code START_COMMIT} and schedules the matching {@code END_COMMIT} event after
     * the configured commit interval.
     *
     * <p>If any of these steps fails — including the partition-count lookup, which
     * previously ran outside the try block and therefore skipped the retry — a new
     * {@code START_COMMIT} event is scheduled after {@code RESTART_COMMIT_DELAY_MS}.
     */
    private void startNewCommit() {
        try {
            this.numPartitions = this.partitionProvider.getLatestNumPartitions(this.configs.getString(BOOTSTRAP_SERVERS_CFG), this.partition.topic());
            this.partitionsWriteStatusReceived.clear();
            this.currentCommitTime = this.transactionServices.startCommit();
            this.kafkaControlClient.publishMessage(this.buildControlMessage(ControlMessage.EventType.START_COMMIT));
            this.currentState = State.STARTED_COMMIT;
            this.submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT, this.partition.topic(), this.currentCommitTime), this.configs.getCommitIntervalSecs(), TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // currentCommitTime may still hold the previous commit if the failure happened
            // before a new commit time was obtained.
            LOG.error(String.format("Failed to start a new commit %s, will retry", this.currentCommitTime), exception);
            this.submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, this.partition.topic(), ""), RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Ends the current commit: publishes {@code END_COMMIT} (a publish failure is only
     * logged), moves to {@code ENDED_COMMIT}, resets the consumed offsets and schedules
     * a {@code WRITE_STATUS_TIMEOUT} event after the configured coordinator write
     * timeout.
     */
    private void endExistingCommit() {
        try {
            ControlMessage endCommitMessage = this.buildControlMessage(ControlMessage.EventType.END_COMMIT);
            this.kafkaControlClient.publishMessage(endCommitMessage);
        }
        catch (Exception exception) {
            // Best effort: the commit is still considered ended even if publishing failed.
            LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", this.partition, this.currentCommitTime), exception);
        }

        this.currentState = State.ENDED_COMMIT;
        this.currentConsumedKafkaOffsets.clear();

        CoordinatorEvent timeoutEvent = new CoordinatorEvent(
                CoordinatorEvent.CoordinatorEventType.WRITE_STATUS_TIMEOUT,
                this.partition.topic(),
                this.currentCommitTime);
        this.submitEvent(timeoutEvent, this.configs.getCoordinatorWriteTimeoutSecs(), TimeUnit.SECONDS);
    }

    /**
     * Records the write statuses and Kafka offset reported by one participant and, once
     * statuses from at least {@code numPartitions} partitions have arrived while the
     * commit is in {@code ENDED_COMMIT}, tries to complete the commit.
     *
     * <p>On a successful commit the committed offsets are merged into the global
     * offsets and an {@code ACK_COMMIT} event is submitted. If the commit fails, or the
     * write statuses contain errors and committing on errors is not allowed, the state
     * becomes {@code FAILED_COMMIT} and a new {@code START_COMMIT} is scheduled after
     * {@code RESTART_COMMIT_DELAY_MS}.
     *
     * @param message {@code WRITE_STATUS} control message sent by a participant
     */
    private void onReceiveWriteStatus(ControlMessage message) {
        ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
        int partitionId = message.getSenderPartition();
        this.partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
        this.currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());

        // Wait until every partition has reported for a commit that has been ended.
        if (this.partitionsWriteStatusReceived.size() < this.numPartitions || !State.ENDED_COMMIT.equals(this.currentState)) {
            return;
        }
        try {
            List<WriteStatus> allWriteStatuses = new ArrayList<>();
            this.partitionsWriteStatusReceived.values().forEach(allWriteStatuses::addAll);
            // Sum as long values; going through double and casting back loses precision
            // for large counts.
            long totalErrorRecords = allWriteStatuses.stream().mapToLong(WriteStatus::getTotalErrorRecords).sum();
            long totalRecords = allWriteStatuses.stream().mapToLong(WriteStatus::getTotalRecords).sum();
            boolean hasErrors = totalErrorRecords > 0L;
            if (!hasErrors || this.configs.allowCommitOnErrors().booleanValue()) {
                boolean success = this.transactionServices.endCommit(this.currentCommitTime, allWriteStatuses, this.transformKafkaOffsets(this.currentConsumedKafkaOffsets));
                if (success) {
                    LOG.info("Commit " + this.currentCommitTime + " successful!");
                    this.currentState = State.WRITE_STATUS_RCVD;
                    this.globalCommittedKafkaOffsets.putAll(this.currentConsumedKafkaOffsets);
                    this.submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT, message.getTopicName(), this.currentCommitTime));
                    return;
                }
                LOG.error("Commit " + this.currentCommitTime + " failed!");
            } else {
                // Reaching this branch implies hasErrors is true.
                LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
                LOG.error("Printing out the top 100 errors");
                allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100L).forEach(ws -> {
                    LOG.error("Global error :", ws.getGlobalError());
                    if (!ws.getErrors().isEmpty()) {
                        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
                    }
                });
            }
            this.currentState = State.FAILED_COMMIT;
            LOG.warn("Current commit " + this.currentCommitTime + " failed, so starting a new commit after recovery delay");
            this.submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, this.partition.topic(), ""), RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
        }
        catch (Exception exception) {
            // NOTE(review): no restart is scheduled here; if the state is still ENDED_COMMIT
            // the WRITE_STATUS_TIMEOUT event scheduled by endExistingCommit() appears to be
            // the recovery path — confirm this is intended.
            LOG.error("Fatal error while committing file", exception);
        }
    }

    /**
     * Handles the write status timeout of the current commit. Only acts while the
     * coordinator is still in {@code ENDED_COMMIT}; in that case the commit is marked as
     * timed out and a new {@code START_COMMIT} is scheduled after
     * {@code RESTART_COMMIT_DELAY_MS}.
     */
    private void handleWriteStatusTimeout() {
        boolean stillWaitingForStatuses = this.currentState.equals(State.ENDED_COMMIT);
        if (!stillWaitingForStatuses) {
            // The commit already moved on (completed, failed, ...); nothing to do.
            return;
        }
        this.currentState = State.WRITE_STATUS_TIMEDOUT;
        LOG.warn("Current commit " + this.currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay");
        CoordinatorEvent restartEvent = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, this.partition.topic(), "");
        this.submitEvent(restartEvent, RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Publishes {@code ACK_COMMIT} for the current commit (a publish failure is only
     * logged), moves to {@code ACKED_COMMIT} and schedules the next
     * {@code START_COMMIT} after {@code START_COMMIT_INIT_DELAY_MS}.
     */
    private void submitAckCommit() {
        try {
            ControlMessage ackMessage = this.buildControlMessage(ControlMessage.EventType.ACK_COMMIT);
            this.kafkaControlClient.publishMessage(ackMessage);
        }
        catch (Exception exception) {
            // Best effort: the next commit is started even if the ACK could not be sent.
            LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", this.partition, this.currentCommitTime), exception);
        }
        this.currentState = State.ACKED_COMMIT;

        CoordinatorEvent nextCommit = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, this.partition.topic(), "");
        this.submitEvent(nextCommit, START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Loads the Kafka offsets stored under {@code KAFKA_OFFSET_KEY} in the latest extra
     * commit metadata and uses them as the global committed offsets.
     *
     * <p>The stored value is a list of {@code partition=offset} entries separated by
     * {@code ,}. When no value is stored the current offsets are left untouched.
     *
     * @throws HoodieException if the metadata cannot be fetched or parsed
     */
    private void initializeGlobalCommittedKafkaOffsets() {
        try {
            String rawOffsets = this.transactionServices.fetchLatestExtraCommitMetadata().get(KAFKA_OFFSET_KEY);
            if (StringUtils.isNullOrEmpty(rawOffsets)) {
                return;
            }
            LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " + rawOffsets);
            String[] entries = rawOffsets.split(KAFKA_OFFSET_DELIMITER);
            this.globalCommittedKafkaOffsets = Arrays.stream(entries)
                    .map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
                    .collect(Collectors.toMap(
                            keyValue -> Integer.parseInt(keyValue[0]),
                            keyValue -> Long.parseLong(keyValue[1])));
            LOG.info("Initialized the kafka offset commits " + this.globalCommittedKafkaOffsets);
        }
        catch (Exception exception) {
            throw new HoodieException("Could not deserialize the kafka commit offsets", exception);
        }
    }

    /**
     * Serializes the given offsets into a single-entry map suitable for the extra commit
     * metadata: the key is {@code KAFKA_OFFSET_KEY} and the value is the list of
     * {@code partition=offset} entries joined with {@code ,}.
     *
     * @param kafkaOffsets offsets keyed by partition id
     * @return an immutable map with exactly one entry holding the serialized offsets
     * @throws HoodieException if the offsets cannot be serialized
     */
    private Map<String, String> transformKafkaOffsets(Map<Integer, Long> kafkaOffsets) {
        try {
            String serializedOffsets = kafkaOffsets.entrySet().stream()
                    .map(offsetEntry -> offsetEntry.getKey() + KAFKA_OFFSET_KV_DELIMITER + offsetEntry.getValue())
                    .collect(Collectors.joining(KAFKA_OFFSET_DELIMITER));
            return Collections.singletonMap(KAFKA_OFFSET_KEY, serializedOffsets);
        }
        catch (Exception exception) {
            throw new HoodieException("Could not serialize the kafka commit offsets", exception);
        }
    }

    /**
     * Builds a control message of the given type, sent from this coordinator to the
     * participants. The message carries the topic name, this coordinator's partition
     * id, the current commit time and the globally committed Kafka offsets.
     *
     * @param eventType type of the control message to build
     * @return the fully populated control message
     */
    private ControlMessage buildControlMessage(ControlMessage.EventType eventType) {
        ControlMessage.CoordinatorInfo coordinatorInfo = ControlMessage.CoordinatorInfo.newBuilder()
                .putAllGlobalKafkaCommitOffsets(this.globalCommittedKafkaOffsets)
                .build();
        return ControlMessage.newBuilder()
                .setProtocolVersion(0)
                .setType(eventType)
                .setTopicName(this.partition.topic())
                .setSenderType(ControlMessage.EntityType.COORDINATOR)
                .setSenderPartition(this.partition.partition())
                .setReceiverType(ControlMessage.EntityType.PARTICIPANT)
                .setCommitTime(this.currentCommitTime)
                .setCoordinatorInfo(coordinatorInfo)
                .build();
    }

    /**
     * Supplies the current number of partitions of a Kafka topic. The coordinator calls
     * it at the start of every commit.
     */
    @FunctionalInterface
    public interface KafkaPartitionProvider {
        /**
         * Returns the current partition count of a topic.
         *
         * @param bootstrapServers value of the {@code bootstrap.servers} configuration
         * @param topicName        name of the topic
         * @return number of partitions of the topic
         */
        int getLatestNumPartitions(String bootstrapServers, String topicName);
    }

    private static enum State {
        INIT,
        STARTED_COMMIT,
        ENDED_COMMIT,
        FAILED_COMMIT,
        WRITE_STATUS_RCVD,
        WRITE_STATUS_TIMEDOUT,
        ACKED_COMMIT;

    }
}

