/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.transaction;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.TransactionInfo;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Participant side of the Hudi Kafka Connect transaction protocol for a single
 * Kafka topic partition.
 *
 * <p>Sink records handed to {@link #buffer(SinkRecord)} are queued locally and
 * control messages handed to {@link #processControlEvent(ControlMessage)} are
 * queued separately. {@link #processRecords()} first drains the control
 * messages (START_COMMIT, END_COMMIT, ACK_COMMIT) and then writes the buffered
 * records through the writer of the currently ongoing transaction, if any.
 *
 * <p>NOTE(review): control messages are exchanged through a
 * {@link BlockingQueue} while the record buffer is a plain {@link LinkedList};
 * presumably only control messages are produced from another thread - confirm
 * against the callers.
 */
public class ConnectTransactionParticipant implements TransactionParticipant {
    private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);

    /** Records received from Kafka that have not yet been handed to the writer. */
    private final LinkedList<SinkRecord> buffer = new LinkedList<SinkRecord>();
    /** Control messages waiting to be handled by {@link #processRecords()}. */
    private final BlockingQueue<ControlMessage> controlEvents = new LinkedBlockingQueue<ControlMessage>();
    private final TopicPartition partition;
    private final SinkTaskContext context;
    private final KafkaControlAgent kafkaControlAgent;
    private final ConnectWriterProvider<WriteStatus> writerProvider;
    /** The transaction currently being written to, or {@code null} when there is none. */
    private TransactionInfo<WriteStatus> ongoingTransactionInfo;
    /** Last Kafka offset considered committed for this partition. */
    private long committedKafkaOffset;

    /**
     * Creates a participant that writes through a {@link KafkaConnectWriterProvider}
     * built from the given configs.
     *
     * @param configs           sink configuration used to build the writer provider
     * @param partition         the topic partition this participant is responsible for
     * @param kafkaControlAgent agent used to register with and publish control messages
     * @param context           sink task context used to pause/resume/rewind the partition
     * @throws HoodieException declared for callers; may be raised while creating the writer provider
     */
    public ConnectTransactionParticipant(KafkaConnectConfigs configs, TopicPartition partition, KafkaControlAgent kafkaControlAgent, SinkTaskContext context) throws HoodieException {
        this(partition, kafkaControlAgent, context, new KafkaConnectWriterProvider(configs, partition));
    }

    /**
     * Creates a participant with an explicitly supplied writer provider.
     * The participant starts without an ongoing transaction and with a
     * committed offset of 0.
     *
     * @param partition         the topic partition this participant is responsible for
     * @param kafkaControlAgent agent used to register with and publish control messages
     * @param context           sink task context used to pause/resume/rewind the partition
     * @param writerProvider    factory for the per-commit writers
     * @throws HoodieException declared for callers
     */
    public ConnectTransactionParticipant(TopicPartition partition, KafkaControlAgent kafkaControlAgent, SinkTaskContext context, ConnectWriterProvider<WriteStatus> writerProvider) throws HoodieException {
        this.partition = partition;
        this.context = context;
        this.writerProvider = writerProvider;
        this.kafkaControlAgent = kafkaControlAgent;
        this.ongoingTransactionInfo = null;
        this.committedKafkaOffset = 0L;
    }

    /**
     * Registers this participant with the control agent and pauses the
     * partition; the partition is resumed again when a START_COMMIT is handled.
     */
    @Override
    public void start() {
        LOG.info("Start Hudi Transaction Participant for partition " + this.partition.partition());
        this.kafkaControlAgent.registerTransactionParticipant(this);
        this.context.pause(new TopicPartition[]{this.partition});
    }

    /**
     * Deregisters this participant from the control agent and discards any
     * ongoing transaction.
     */
    @Override
    public void stop() {
        this.kafkaControlAgent.deregisterTransactionParticipant(this);
        this.cleanupOngoingTransaction();
    }

    /**
     * Queues a sink record; it is written on a later {@link #processRecords()} call.
     *
     * @param record the record to queue
     */
    @Override
    public void buffer(SinkRecord record) {
        this.buffer.add(record);
    }

    /**
     * Queues a control message; it is handled on a later {@link #processRecords()} call.
     *
     * @param message the control message to queue
     */
    @Override
    public void processControlEvent(ControlMessage message) {
        this.controlEvents.add(message);
    }

    /**
     * @return the last Kafka offset this participant considers committed
     */
    @Override
    public long getLastKafkaCommittedOffset() {
        return this.committedKafkaOffset;
    }

    /**
     * @return the topic partition this participant is responsible for
     */
    @Override
    public TopicPartition getPartition() {
        return this.partition;
    }

    /**
     * Handles all queued control messages in arrival order and then writes the
     * buffered records to the ongoing transaction.
     *
     * @throws IllegalStateException if a control message of an unsupported type is received
     */
    @Override
    public void processRecords() {
        ControlMessage message;
        // Loop on poll() itself rather than isEmpty()+poll(): a queue that is
        // drained between the two calls would otherwise yield a null message.
        while ((message = this.controlEvents.poll()) != null) {
            switch (message.getType()) {
                case START_COMMIT:
                    this.handleStartCommit(message);
                    break;
                case END_COMMIT:
                    this.handleEndCommit(message);
                    break;
                case ACK_COMMIT:
                    this.handleAckCommit(message);
                    break;
                case WRITE_STATUS:
                    // WRITE_STATUS messages are ignored by the participant.
                    break;
                default:
                    throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getType().name());
            }
        }
        this.writeRecords();
    }

    /**
     * Starts a new transaction for the commit time carried by the message:
     * discards any previous transaction, adopts the coordinator's committed
     * offset, resumes the partition and opens a writer for the new commit.
     * A failure to open the writer is logged and leaves no ongoing transaction.
     */
    private void handleStartCommit(ControlMessage message) {
        this.cleanupOngoingTransaction();
        this.syncKafkaOffsetWithLeader(message);
        this.context.resume(new TopicPartition[]{this.partition});
        String currentCommitTime = message.getCommitTime();
        LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
        try {
            this.ongoingTransactionInfo = new TransactionInfo<WriteStatus>(currentCommitTime, this.writerProvider.getWriter(currentCommitTime));
            // Writing resumes right after the last committed offset.
            this.ongoingTransactionInfo.setExpectedKafkaOffset(this.committedKafkaOffset);
        }
        catch (Exception exception) {
            LOG.warn("Error received while starting a new transaction", exception);
        }
    }

    /**
     * Ends the ongoing transaction: pauses the partition, closes the writer and
     * publishes the resulting write statuses to the coordinator. The message is
     * ignored when there is no ongoing transaction; a commit-time mismatch
     * discards the local transaction and re-syncs the offset instead.
     *
     * @throws HoodieIOException if closing the writer or publishing the write status fails
     */
    private void handleEndCommit(ControlMessage message) {
        if (this.ongoingTransactionInfo == null) {
            LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
            return;
        }
        if (!this.ongoingTransactionInfo.getCommitTime().equals(message.getCommitTime())) {
            LOG.error(String.format("Fatal error received END_COMMIT with commit time %s while local transaction commit time %s", message.getCommitTime(), this.ongoingTransactionInfo.getCommitTime()));
            this.cleanupOngoingTransaction();
            this.syncKafkaOffsetWithLeader(message);
            return;
        }
        this.context.pause(new TopicPartition[]{this.partition});
        // From here on writeRecords() no longer writes to this transaction.
        this.ongoingTransactionInfo.commitInitiated();
        try {
            List<WriteStatus> writeStatuses = this.ongoingTransactionInfo.getWriter().close();
            ControlMessage.ParticipantInfo participantInfo = ControlMessage.ParticipantInfo.newBuilder()
                    .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
                    .setKafkaOffset(this.ongoingTransactionInfo.getExpectedKafkaOffset())
                    .build();
            ControlMessage writeStatusEvent = ControlMessage.newBuilder()
                    .setProtocolVersion(0)
                    .setType(ControlMessage.EventType.WRITE_STATUS)
                    .setTopicName(this.partition.topic())
                    .setSenderType(ControlMessage.EntityType.PARTICIPANT)
                    .setSenderPartition(this.partition.partition())
                    .setReceiverType(ControlMessage.EntityType.COORDINATOR)
                    .setReceiverPartition(0)
                    .setCommitTime(this.ongoingTransactionInfo.getCommitTime())
                    .setParticipantInfo(participantInfo)
                    .build();
            this.kafkaControlAgent.publishMessage(writeStatusEvent);
        }
        catch (Exception exception) {
            String errorMessage = String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), this.partition.partition());
            LOG.error(errorMessage, exception);
            throw new HoodieIOException(errorMessage, new IOException(exception));
        }
    }

    /**
     * Finalizes the ongoing transaction after the coordinator acknowledged the
     * commit: advances the local committed offset to what was written, then
     * lets the coordinator-provided offset override it and discards the
     * transaction.
     */
    private void handleAckCommit(ControlMessage message) {
        if (this.ongoingTransactionInfo != null && this.committedKafkaOffset < this.ongoingTransactionInfo.getExpectedKafkaOffset()) {
            this.committedKafkaOffset = this.ongoingTransactionInfo.getExpectedKafkaOffset();
        }
        this.syncKafkaOffsetWithLeader(message);
        this.cleanupOngoingTransaction();
    }

    /**
     * Drains the record buffer into the ongoing transaction's writer. Does
     * nothing when there is no ongoing transaction or its commit has already
     * been initiated.
     *
     * <p>Only a record at exactly the expected offset is written. A record
     * above the expected offset rewinds the consumer to the expected offset;
     * a record below it is dropped as already written. If writing fails,
     * draining stops and the failing record stays at the head of the buffer so
     * that it is retried on the next call.
     */
    private void writeRecords() {
        if (this.ongoingTransactionInfo == null || this.ongoingTransactionInfo.isCommitInitiated()) {
            return;
        }
        while (!this.buffer.isEmpty()) {
            try {
                SinkRecord record = this.buffer.peek();
                if (record != null) {
                    long expectedOffset = this.ongoingTransactionInfo.getExpectedKafkaOffset();
                    long recordOffset = record.kafkaOffset();
                    if (recordOffset == expectedOffset) {
                        this.ongoingTransactionInfo.getWriter().writeRecord(record);
                        this.ongoingTransactionInfo.setExpectedKafkaOffset(recordOffset + 1L);
                    } else if (recordOffset > expectedOffset) {
                        LOG.warn(String.format("Received a kafka record with offset %s above the next expected kafka offset %s for partition %s, hence resetting the kafka offset to %s", recordOffset, expectedOffset, this.partition, expectedOffset));
                        this.context.offset(this.partition, expectedOffset);
                    } else {
                        LOG.warn(String.format("Received a kafka record with offset %s below the next expected kafka offset %s for partition %s, no action will be taken but this record will be ignored since its already written", recordOffset, expectedOffset, this.partition));
                    }
                }
                // Remove the record only once it has been handled without error.
                this.buffer.poll();
            }
            catch (Exception exception) {
                LOG.warn(String.format("Error received while writing records for transaction %s in partition %s", this.ongoingTransactionInfo.getCommitTime(), this.partition.partition()), exception);
                // The failed record is still at the head of the buffer; looping
                // again right away would retry it forever on a persistent
                // failure. Stop here and retry on the next processRecords() call.
                break;
            }
        }
    }

    /**
     * Closes the writer of the ongoing transaction, if any, and forgets the
     * transaction. The transaction is dropped even when closing fails, so a
     * broken writer is never reused by later writes.
     */
    private void cleanupOngoingTransaction() {
        if (this.ongoingTransactionInfo == null) {
            return;
        }
        try {
            this.ongoingTransactionInfo.getWriter().close();
        }
        catch (HoodieIOException exception) {
            LOG.warn("Error received while trying to cleanup existing transaction", exception);
        }
        finally {
            this.ongoingTransactionInfo = null;
        }
    }

    /**
     * Replaces the local committed offset with the one the coordinator reports
     * for this partition. When the coordinator has no entry for the partition,
     * or the entry is {@code null} or negative, the local offset is reset to 0.
     */
    private void syncKafkaOffsetWithLeader(ControlMessage message) {
        if (message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().containsKey(this.partition.partition())) {
            Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().get(this.partition.partition());
            if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0L) {
                if (coordinatorCommittedKafkaOffset.longValue() != this.committedKafkaOffset) {
                    LOG.warn(String.format("The coordinator offset for kafka partition %s is %d while the locally committed offset is %d, hence resetting the local committed offset to the coordinator provided one to ensure consistency", this.partition, coordinatorCommittedKafkaOffset, this.committedKafkaOffset));
                }
                this.committedKafkaOffset = coordinatorCommittedKafkaOffset;
                return;
            }
            // A null or negative coordinator offset falls through to the reset below.
        } else {
            LOG.warn(String.format("The coordinator offset for kafka partition %s is not present while the locally committed offset is %d, hence resetting the local committed offset to 0 to avoid data loss", this.partition, this.committedKafkaOffset));
        }
        this.committedKafkaOffset = 0L;
    }
}

