package org.apache.hudi.connect.transaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.CoordinatorEvent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
import org.apache.hudi.exception.HoodieException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.class */
public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
    // Kafka partition number associated with the coordinator.
    // NOTE(review): presumably the partition whose task hosts the coordinator — confirm against callers.
    public static final int COORDINATOR_KAFKA_PARTITION = 0;
    // Config key read via configs.getString(...) when resolving the topic's partition count.
    private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
    // Extra-commit-metadata key under which the serialized Kafka offsets are written and read back.
    private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
    // Separator between individual "partition=offset" entries in the serialized offsets string.
    private static final String KAFKA_OFFSET_DELIMITER = ",";
    // Separator between the partition number and its offset inside one entry.
    private static final String KAFKA_OFFSET_KV_DELIMITER = "=";
    // Poll timeout of the coordinator event loop, in milliseconds.
    private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
    // Connector configuration (bootstrap servers, commit interval, write-status timeout, commit-on-error flag).
    private final KafkaConnectConfigs configs;
    // Topic partition this coordinator was created for; its topic name is stamped on every event and message.
    private final TopicPartition partition;
    // Control channel used to (de)register this coordinator and to publish control messages.
    private final KafkaControlAgent kafkaControlClient;
    // Starts and ends commits and fetches the latest extra commit metadata.
    private final ConnectTransactionServices transactionServices;
    // Resolves the current number of partitions of the topic at the start of each commit.
    private final KafkaPartitionProvider partitionProvider;
    // Sender partition -> write statuses received for the current commit; cleared when a new commit starts.
    private final Map<Integer, List<WriteStatus>> partitionsWriteStatusReceived;
    // Sender partition -> Kafka offset reported with its write status; cleared when the commit is ended.
    private final Map<Integer, Long> currentConsumedKafkaOffsets;
    // Flipped to true by start() (via compareAndSet, so the event loop is submitted once) and reset by stop().
    private final AtomicBoolean hasStarted;
    // Queue of pending coordinator events, drained by run().
    private final BlockingQueue<CoordinatorEvent> events;
    // Single-thread executor that runs this coordinator's event loop.
    private final ExecutorService executorService;
    // Single-thread scheduler that places events on the queue after an optional delay.
    private final ScheduledExecutorService scheduler;
    // Commit time returned by the last startCommit() call; the empty string until the first commit starts.
    private String currentCommitTime;
    // Offsets recorded by the last successful commit; included in every published control message.
    private Map<Integer, Long> globalCommittedKafkaOffsets;
    // Current position in the commit state machine (see State).
    private State currentState;
    // Partition count captured when the current commit was started; the commit completes once this many
    // partitions have reported write statuses.
    private int numPartitions;
    private static final Logger LOG = LoggerFactory.getLogger(ConnectTransactionCoordinator.class);
    // Delay before the first START_COMMIT after start() and before the next one after an ACK_COMMIT.
    private static final Long START_COMMIT_INIT_DELAY_MS = 100L;
    // Delay before retrying START_COMMIT after a failed start, a failed commit, or a write-status timeout.
    private static final Long RESTART_COMMIT_DELAY_MS = 500L;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.connect.transaction.ConnectTransactionCoordinator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/connect/transaction/ConnectTransactionCoordinator$1.class */
    /**
     * Compiler-generated lookup table that backs a {@code switch} over
     * {@link CoordinatorEvent.CoordinatorEventType}.
     *
     * <p>The array is indexed by the enum constant's {@code ordinal()} and holds a dense case label:
     * START_COMMIT=1, END_COMMIT=2, WRITE_STATUS=3, ACK_COMMIT=4, WRITE_STATUS_TIMEOUT=5. Entries for
     * constants not listed here keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized from values() at class-initialization time so every ordinal known at runtime has a slot.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType = new int[CoordinatorEvent.CoordinatorEventType.values().length];

        static {
            // Each assignment is guarded separately: if a constant is missing from the enum class
            // loaded at runtime, NoSuchFieldError is ignored and that entry simply stays 0.
            try {
                $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType[CoordinatorEvent.CoordinatorEventType.START_COMMIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType[CoordinatorEvent.CoordinatorEventType.END_COMMIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType[CoordinatorEvent.CoordinatorEventType.WRITE_STATUS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType[CoordinatorEvent.CoordinatorEventType.ACK_COMMIT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$hudi$connect$transaction$CoordinatorEvent$CoordinatorEventType[CoordinatorEvent.CoordinatorEventType.WRITE_STATUS_TIMEOUT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/connect/transaction/ConnectTransactionCoordinator$KafkaPartitionProvider.class */
    /**
     * Strategy for looking up how many partitions a Kafka topic currently has.
     *
     * <p>The coordinator calls this at the start of every commit, passing the value of its
     * {@code bootstrap.servers} setting and the name of the topic it coordinates. It has a single
     * abstract method, so it can be supplied as a lambda or method reference.
     */
    @FunctionalInterface
    public interface KafkaPartitionProvider {
        /**
         * Returns the latest number of partitions of a topic.
         *
         * @param bootstrapServers Kafka bootstrap servers string used to reach the cluster
         * @param topicName name of the topic to inspect
         * @return the number of partitions currently reported for {@code topicName}
         */
        int getLatestNumPartitions(String bootstrapServers, String topicName);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/connect/transaction/ConnectTransactionCoordinator$State.class */
    /**
     * States of the coordinator's commit state machine. The constant order is part of the compiled
     * enum (ordinals) and must not be rearranged.
     */
    public enum State {
        // Initial state assigned in the constructor, before any commit has been started.
        INIT,
        // A commit was started and the START_COMMIT control message was published.
        STARTED_COMMIT,
        // END_COMMIT was issued; the coordinator is collecting write statuses from the partitions.
        ENDED_COMMIT,
        // All write statuses arrived but the commit was rejected or could not be completed.
        FAILED_COMMIT,
        // All write statuses arrived and the commit completed successfully.
        WRITE_STATUS_RCVD,
        // The write-status timeout fired while still in ENDED_COMMIT.
        WRITE_STATUS_TIMEDOUT,
        // The ACK_COMMIT control message step has run for the completed commit.
        ACKED_COMMIT
    }

    /**
     * Creates a coordinator with the default collaborators: a
     * {@link KafkaConnectTransactionServices} built from the given configs, and
     * {@code KafkaConnectUtils::getLatestNumPartitions} as the partition-count provider.
     *
     * @param kafkaConnectConfigs connector configuration
     * @param topicPartition topic partition this coordinator is created for
     * @param kafkaControlAgent control channel used to register and publish control messages
     * @throws HoodieException declared by this constructor; propagated from building the default collaborators
     */
    public ConnectTransactionCoordinator(KafkaConnectConfigs kafkaConnectConfigs, TopicPartition topicPartition, KafkaControlAgent kafkaControlAgent) throws HoodieException {
        this(
            kafkaConnectConfigs,
            topicPartition,
            kafkaControlAgent,
            new KafkaConnectTransactionServices(kafkaConnectConfigs),
            KafkaConnectUtils::getLatestNumPartitions);
    }

    /**
     * Creates a coordinator with explicitly supplied collaborators.
     *
     * <p>Only wires up state: the event queue, the two single-thread executors and the offset maps are
     * created here, but nothing is submitted or registered until {@link #start()} is called.
     *
     * @param kafkaConnectConfigs connector configuration
     * @param topicPartition topic partition this coordinator is created for
     * @param kafkaControlAgent control channel used to register and publish control messages
     * @param connectTransactionServices service that starts/ends commits and reads commit metadata
     * @param kafkaPartitionProvider lookup for the topic's current partition count
     */
    public ConnectTransactionCoordinator(KafkaConnectConfigs kafkaConnectConfigs, TopicPartition topicPartition, KafkaControlAgent kafkaControlAgent, ConnectTransactionServices connectTransactionServices, KafkaPartitionProvider kafkaPartitionProvider) {
        this.hasStarted = new AtomicBoolean(false);
        this.configs = kafkaConnectConfigs;
        this.partition = topicPartition;
        this.kafkaControlClient = kafkaControlAgent;
        this.transactionServices = connectTransactionServices;
        this.partitionProvider = kafkaPartitionProvider;
        // Diamond instantiation keeps the collections type-checked instead of relying on raw types.
        this.events = new LinkedBlockingQueue<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.executorService = Executors.newSingleThreadExecutor();
        this.currentCommitTime = "";
        this.partitionsWriteStatusReceived = new HashMap<>();
        this.globalCommittedKafkaOffsets = new HashMap<>();
        this.currentConsumedKafkaOffsets = new HashMap<>();
        this.currentState = State.INIT;
    }

    /**
     * Starts the coordinator: submits the event loop to the executor (first call only), registers
     * with the control agent, loads the previously committed Kafka offsets and schedules the first
     * START_COMMIT event.
     */
    @Override // org.apache.hudi.connect.transaction.TransactionCoordinator
    public void start() {
        // compareAndSet ensures the event loop is handed to the executor at most once.
        boolean firstStart = hasStarted.compareAndSet(false, true);
        if (firstStart) {
            executorService.submit(this);
        }
        kafkaControlClient.registerTransactionCoordinator(this);

        String topic = partition.topic();
        LOG.info(String.format("Start Transaction Coordinator for topic %s partition %s", topic, partition.partition()));

        initializeGlobalCommittedKafkaOffsets();

        CoordinatorEvent firstCommit = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, topic, "");
        submitEvent(firstCommit, START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the coordinator: deregisters from the control agent, cancels scheduled events and shuts
     * the event-loop executor down, forcing termination if it does not finish within 100 ms.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and the
     * executor is shut down forcibly.
     */
    @Override // org.apache.hudi.connect.transaction.TransactionCoordinator
    public void stop() {
        kafkaControlClient.deregisterTransactionCoordinator(this);
        scheduler.shutdownNow();
        hasStarted.set(false);

        boolean terminated = false;
        try {
            LOG.info("Shutting down executor service.");
            executorService.shutdown();
            LOG.info("Awaiting termination.");
            terminated = executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for the caller and fall through
            // to the forced shutdown below.
            Thread.currentThread().interrupt();
        }
        if (!terminated) {
            LOG.warn("Unclean Kafka Control Manager executor service shutdown ");
            executorService.shutdownNow();
        }
    }

    /**
     * Returns the topic partition this coordinator was constructed with.
     *
     * @return the coordinator's {@link TopicPartition}
     */
    @Override // org.apache.hudi.connect.transaction.TransactionCoordinator
    public TopicPartition getPartition() {
        return partition;
    }

    /**
     * Accepts a control message from the control channel. Only WRITE_STATUS messages are expected;
     * they are wrapped in a WRITE_STATUS coordinator event and queued. Any other type is logged
     * and dropped.
     *
     * @param controlMessage the control message received
     */
    @Override // org.apache.hudi.connect.transaction.TransactionCoordinator
    public void processControlEvent(ControlMessage controlMessage) {
        if (controlMessage.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
            CoordinatorEvent writeStatusEvent = new CoordinatorEvent(
                CoordinatorEvent.CoordinatorEventType.WRITE_STATUS,
                controlMessage.getTopicName(),
                controlMessage.getCommitTime());
            writeStatusEvent.setMessage(controlMessage);
            submitEvent(writeStatusEvent);
        } else {
            String unexpectedType = controlMessage.getType().name();
            LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", unexpectedType));
        }
    }

    /**
     * Event loop of the coordinator: repeatedly polls the event queue and processes each event on
     * this thread.
     *
     * <p>The loop ends when the thread is interrupted (for example by
     * {@code ExecutorService.shutdownNow()}), so the worker thread can actually terminate.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                CoordinatorEvent event = events.poll(COORDINATOR_EVENT_LOOP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (event != null) {
                    processCoordinatorEvent(event);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while polling the event loop in Partition Coordinator, stopping the event loop", e);
                // Restore the flag so the executor sees the interruption, then leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Queues an event with no delay; shorthand for the delayed overload with a delay of zero seconds.
     *
     * @param coordinatorEvent the event to enqueue
     */
    private void submitEvent(CoordinatorEvent coordinatorEvent) {
        final long noDelay = 0L;
        submitEvent(coordinatorEvent, noDelay, TimeUnit.SECONDS);
    }

    /**
     * Schedules an event to be added to the event queue after the given delay.
     *
     * @param coordinatorEvent the event to enqueue
     * @param delay how long to wait before enqueueing
     * @param unit time unit of {@code delay}
     */
    private void submitEvent(CoordinatorEvent coordinatorEvent, long delay, TimeUnit unit) {
        // Typed as Runnable explicitly so the Runnable overload of schedule(...) is the one invoked.
        Runnable enqueue = () -> events.add(coordinatorEvent);
        scheduler.schedule(enqueue, delay, unit);
    }

    /**
     * Dispatches one coordinator event to its handler.
     *
     * <p>START_COMMIT events are always processed. Every other event is processed only when its
     * commit time equals the current commit time; otherwise it is silently ignored. Any exception
     * raised while handling (including an unknown event type) is logged and not rethrown, so the
     * event loop keeps running.
     *
     * @param coordinatorEvent the event to process; {@code null} is ignored
     */
    private void processCoordinatorEvent(CoordinatorEvent coordinatorEvent) {
        if (coordinatorEvent == null) {
            return;
        }
        try {
            CoordinatorEvent.CoordinatorEventType eventType = coordinatorEvent.getEventType();
            boolean isStartCommit = eventType.equals(CoordinatorEvent.CoordinatorEventType.START_COMMIT);
            if (!isStartCommit && !coordinatorEvent.getCommitTime().equals(currentCommitTime)) {
                // Event belongs to a commit other than the current one.
                return;
            }
            // Switch directly on the enum rather than on ordinal-derived integer labels.
            switch (eventType) {
                case START_COMMIT:
                    startNewCommit();
                    break;
                case END_COMMIT:
                    endExistingCommit();
                    break;
                case WRITE_STATUS:
                    if (coordinatorEvent.getMessage() == null) {
                        LOG.warn("Could not process WRITE_STATUS due to missing message");
                    } else if (!State.ENDED_COMMIT.equals(currentState)) {
                        // Write statuses are only collected between END_COMMIT and commit completion/timeout.
                        LOG.warn(String.format("Could not process WRITE_STATUS for commitTime %s in state %s", currentCommitTime, currentState));
                    } else {
                        onReceiveWriteStatus(coordinatorEvent.getMessage());
                    }
                    break;
                case ACK_COMMIT:
                    submitAckCommit();
                    break;
                case WRITE_STATUS_TIMEOUT:
                    handleWriteStatusTimeout();
                    break;
                default:
                    throw new IllegalStateException("Partition Coordinator has received an illegal event type " + eventType.name());
            }
        } catch (Exception e) {
            LOG.warn("Error received while polling the event loop in Partition Coordinator", e);
        }
    }

    /**
     * Starts a new commit: refreshes the topic's partition count, obtains a new commit time,
     * publishes START_COMMIT and schedules the matching END_COMMIT after the configured commit
     * interval.
     *
     * <p>If any of these steps fails, including the partition-count lookup, the failure is logged
     * and another START_COMMIT is scheduled after the restart delay.
     */
    private void startNewCommit() {
        partitionsWriteStatusReceived.clear();
        try {
            // Kept inside the try so a failed lookup goes through the retry path below instead of
            // leaving the coordinator with no pending event.
            numPartitions = partitionProvider.getLatestNumPartitions(configs.getString(BOOTSTRAP_SERVERS_CFG), partition.topic());
            currentCommitTime = transactionServices.startCommit();
            kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.START_COMMIT));
            currentState = State.STARTED_COMMIT;
            submitEvent(
                new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT, partition.topic(), currentCommitTime),
                configs.getCommitIntervalSecs().longValue(),
                TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error(String.format("Failed to start a new commit %s, will retry", currentCommitTime), e);
            submitEvent(
                new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, partition.topic(), ""),
                RESTART_COMMIT_DELAY_MS.longValue(),
                TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Ends the current commit window: publishes END_COMMIT (a publish failure is only logged),
     * resets the offsets collected for this commit, moves to ENDED_COMMIT and arms the
     * write-status timeout.
     */
    private void endExistingCommit() {
        try {
            ControlMessage endCommitMessage = buildControlMessage(ControlMessage.EventType.END_COMMIT);
            kafkaControlClient.publishMessage(endCommitMessage);
        } catch (Exception e) {
            LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), e);
        }

        currentConsumedKafkaOffsets.clear();
        currentState = State.ENDED_COMMIT;

        // The timeout event carries the current commit time, so it is ignored if a newer commit
        // has started by the time it fires.
        CoordinatorEvent timeoutEvent = new CoordinatorEvent(
            CoordinatorEvent.CoordinatorEventType.WRITE_STATUS_TIMEOUT,
            partition.topic(),
            currentCommitTime);
        long timeoutSecs = configs.getCoordinatorWriteTimeoutSecs().longValue();
        submitEvent(timeoutEvent, timeoutSecs, TimeUnit.SECONDS);
    }

    /**
     * Records the write statuses and Kafka offset reported by one partition and, once every
     * partition has reported, tries to complete the commit.
     *
     * <p>The commit is attempted when there are no error records or when committing on errors is
     * allowed by configuration. On success the committed offsets are folded into the global offsets
     * and an ACK_COMMIT event is queued. Otherwise the state becomes FAILED_COMMIT and a new
     * START_COMMIT is scheduled after the restart delay. An exception during this step is logged
     * and not rethrown.
     *
     * @param controlMessage the WRITE_STATUS control message from a participant
     */
    private void onReceiveWriteStatus(ControlMessage controlMessage) {
        ControlMessage.ParticipantInfo participantInfo = controlMessage.getParticipantInfo();
        int senderPartition = controlMessage.getSenderPartition();
        partitionsWriteStatusReceived.put(senderPartition, KafkaConnectUtils.getWriteStatuses(participantInfo));
        currentConsumedKafkaOffsets.put(senderPartition, Long.valueOf(participantInfo.getKafkaOffset()));

        boolean allPartitionsReported = partitionsWriteStatusReceived.size() >= numPartitions;
        if (!allPartitionsReported || !State.ENDED_COMMIT.equals(currentState)) {
            return;
        }
        try {
            List<WriteStatus> allWriteStatuses = new ArrayList<>();
            partitionsWriteStatusReceived.values().forEach(allWriteStatuses::addAll);

            // Sum as long: record counts are integral, so there is no need to go through double.
            long totalErrorRecords = allWriteStatuses.stream().mapToLong(WriteStatus::getTotalErrorRecords).sum();
            long totalRecords = allWriteStatuses.stream().mapToLong(WriteStatus::getTotalRecords).sum();
            boolean hasErrors = totalErrorRecords > 0;

            if (!hasErrors || configs.allowCommitOnErrors()) {
                boolean committed = transactionServices.endCommit(currentCommitTime, allWriteStatuses, transformKafkaOffsets(currentConsumedKafkaOffsets));
                if (committed) {
                    LOG.info("Commit " + currentCommitTime + " successful!");
                    currentState = State.WRITE_STATUS_RCVD;
                    globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
                    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT, controlMessage.getTopicName(), currentCommitTime));
                    return;
                }
                LOG.error("Commit " + currentCommitTime + " failed!");
            } else {
                logWriteErrors(allWriteStatuses, totalErrorRecords, totalRecords);
            }

            currentState = State.FAILED_COMMIT;
            LOG.warn("Current commit " + currentCommitTime + " failed, so starting a new commit after recovery delay");
            submitEvent(
                new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, partition.topic(), ""),
                RESTART_COMMIT_DELAY_MS.longValue(),
                TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.error("Fatal error while committing file", e);
        }
    }

    /** Logs the error totals, then details for at most the first 100 write statuses that have errors. */
    private static void logWriteErrors(List<WriteStatus> writeStatuses, long totalErrorRecords, long totalRecords) {
        LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
        LOG.error("Printing out the top 100 errors");
        writeStatuses.stream()
            .filter(WriteStatus::hasErrors)
            .limit(100L)
            .forEach(writeStatus -> {
                LOG.error("Global error :", writeStatus.getGlobalError());
                writeStatus.getErrors().forEach((hoodieKey, throwable) ->
                    LOG.trace("Error for key:" + hoodieKey + " is " + throwable));
            });
    }

    /**
     * Handles the write-status timeout. It has an effect only while still in ENDED_COMMIT: the
     * commit is then marked as timed out and a new START_COMMIT is scheduled after the restart
     * delay. In any other state the timeout is ignored.
     */
    private void handleWriteStatusTimeout() {
        if (!currentState.equals(State.ENDED_COMMIT)) {
            return;
        }
        currentState = State.WRITE_STATUS_TIMEDOUT;
        LOG.warn("Current commit " + currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay");
        CoordinatorEvent restart = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, partition.topic(), "");
        submitEvent(restart, RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Publishes ACK_COMMIT for the current commit (a publish failure is only logged), moves to
     * ACKED_COMMIT and schedules the next START_COMMIT after the initial start delay.
     */
    private void submitAckCommit() {
        try {
            ControlMessage ackMessage = buildControlMessage(ControlMessage.EventType.ACK_COMMIT);
            kafkaControlClient.publishMessage(ackMessage);
        } catch (Exception e) {
            LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), e);
        }

        currentState = State.ACKED_COMMIT;

        CoordinatorEvent nextCommit = new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT, partition.topic(), "");
        submitEvent(nextCommit, START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Restores the globally committed Kafka offsets from the latest commit's extra metadata.
     *
     * <p>The value stored under {@code kafka.commit.offsets} is expected to look like
     * {@code "0=100,1=250"}: comma-separated {@code partition=offset} pairs. When the value is
     * missing or empty the current offsets map is left untouched.
     *
     * @throws HoodieException if the metadata cannot be fetched or the stored value cannot be parsed
     */
    private void initializeGlobalCommittedKafkaOffsets() {
        try {
            String rawOffsets = transactionServices.fetchLatestExtraCommitMetadata().get(KAFKA_OFFSET_KEY);
            if (StringUtils.isNullOrEmpty(rawOffsets)) {
                return;
            }
            LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " + rawOffsets);
            // No raw (Map) cast: the collector already yields a Map<Integer, Long>.
            globalCommittedKafkaOffsets = Arrays.stream(rawOffsets.split(KAFKA_OFFSET_DELIMITER))
                .map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
                .collect(Collectors.toMap(
                    keyValue -> Integer.parseInt(keyValue[0]),
                    keyValue -> Long.parseLong(keyValue[1])));
            LOG.info("Initialized the kafka offset commits " + globalCommittedKafkaOffsets);
        } catch (Exception e) {
            throw new HoodieException("Could not deserialize the kafka commit offsets", e);
        }
    }

    /**
     * Serializes partition offsets into the single-entry metadata map stored with a commit.
     *
     * <p>Each map entry becomes {@code partition=offset}; entries are joined with commas and the
     * result is stored under {@code kafka.commit.offsets}.
     *
     * @param map partition number to Kafka offset
     * @return an immutable single-entry map holding the serialized offsets
     * @throws HoodieException if serialization fails
     */
    private Map<String, String> transformKafkaOffsets(Map<Integer, Long> map) {
        try {
            String serializedOffsets = map.entrySet().stream()
                .map(entry -> entry.getKey() + KAFKA_OFFSET_KV_DELIMITER + entry.getValue())
                .collect(Collectors.joining(KAFKA_OFFSET_DELIMITER));
            return Collections.singletonMap(KAFKA_OFFSET_KEY, serializedOffsets);
        } catch (Exception e) {
            throw new HoodieException("Could not serialize the kafka commit offsets", e);
        }
    }

    /**
     * Builds a coordinator-to-participant control message of the given type for the current commit.
     *
     * <p>The message carries protocol version 0, this coordinator's topic and partition, the current
     * commit time and the globally committed Kafka offsets.
     *
     * @param eventType the control event type to send
     * @return the assembled control message
     */
    private ControlMessage buildControlMessage(ControlMessage.EventType eventType) {
        // Use the builders' build() method; names such as m136build()/m42build() are decompiler
        // renames of bridge methods and are not part of the generated protobuf API.
        ControlMessage.CoordinatorInfo coordinatorInfo = ControlMessage.CoordinatorInfo.newBuilder()
            .putAllGlobalKafkaCommitOffsets(globalCommittedKafkaOffsets)
            .build();
        return ControlMessage.newBuilder()
            .setProtocolVersion(0)
            .setType(eventType)
            .setTopicName(partition.topic())
            .setSenderType(ControlMessage.EntityType.COORDINATOR)
            .setSenderPartition(partition.partition())
            .setReceiverType(ControlMessage.EntityType.PARTICIPANT)
            .setCommitTime(currentCommitTime)
            .setCoordinatorInfo(coordinatorInfo)
            .build();
    }
}
