package org.apache.hudi.connect;

import java.util.Collections;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockKafkaConnect;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionParticipant.class */
public class TestConnectTransactionParticipant {
    // Topic name used for both the participant's partition and the mock coordinator's partition.
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    // Number of records published to the participant while processing a START_COMMIT.
    private static final int NUM_RECORDS_BATCH = 5;
    // Partition index of the participant under test; also the key of the committed-offset map
    // that the mock coordinator attaches to every control message.
    private static final int PARTITION_NUMBER = 4;
    // Participant under test; re-created by initializeParticipant().
    private ConnectTransactionParticipant participant;
    // Coordinator stand-in that emits control events; re-created by initializeCoordinator().
    private MockCoordinator mockCoordinator;
    // Topic partition (TOPIC_NAME, PARTITION_NUMBER) handed to the participant and the mock Kafka Connect.
    private TopicPartition partition;
    // Built with the builder's defaults in setUp(); not read anywhere else in this class.
    private KafkaConnectConfigs configs;
    // Control agent shared by the coordinator and every participant instance created in a test.
    private KafkaControlAgent kafkaControlAgent;
    // Writer provider given to the current participant; queried for the latest number of writes.
    private TestHudiWriterProvider testHudiWriterProvider;
    // Mock Kafka Connect that feeds record batches to the participant and exposes paused/resumed state.
    private MockKafkaConnect mockKafkaConnect;

    /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionParticipant$CoordinatorFailureTestScenarios.class */
    /**
     * Scenarios exercised by {@code testAllCoordinatorFailureScenarios}; each one describes what
     * happens before the final, complete two-phase commit is run.
     */
    private enum CoordinatorFailureTestScenarios {
        /** No preliminary steps: the two-phase commit runs straight away. */
        REGULAR_SCENARIO,
        /** A START_COMMIT is processed and then the coordinator is re-created. */
        COORDINATOR_FAILED_AFTER_START_COMMIT,
        /** A START_COMMIT and an END_COMMIT are processed and then the coordinator is re-created. */
        COORDINATOR_FAILED_AFTER_END_COMMIT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionParticipant$MockCoordinator.class */
    public static class MockCoordinator implements TransactionCoordinator {
        private static int currentCommitTime = 101;
        private final KafkaControlAgent kafkaControlAgent;
        private final TopicPartition partition = new TopicPartition(TestConnectTransactionParticipant.TOPIC_NAME, 0);
        private Option<ControlMessage> lastReceivedWriteStatusEvent = Option.empty();
        private long committedKafkaOffset = 0;

        public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
            this.kafkaControlAgent = kafkaControlAgent;
        }

        public void sendEventFromCoordinator(ControlMessage.EventType eventType) {
            try {
                if (eventType.equals(ControlMessage.EventType.START_COMMIT)) {
                    currentCommitTime++;
                }
                this.kafkaControlAgent.publishMessage(ControlMessage.newBuilder().setType(eventType).setTopicName(this.partition.topic()).setSenderType(ControlMessage.EntityType.COORDINATOR).setSenderPartition(this.partition.partition()).setReceiverType(ControlMessage.EntityType.PARTICIPANT).setCommitTime(String.valueOf(currentCommitTime)).setCoordinatorInfo(ControlMessage.CoordinatorInfo.newBuilder().putAllGlobalKafkaCommitOffsets(Collections.singletonMap(Integer.valueOf(TestConnectTransactionParticipant.PARTITION_NUMBER), Long.valueOf(this.committedKafkaOffset))).build()).build());
            } catch (Exception e) {
                throw new HoodieException("Fatal error sending control event to Participant");
            }
        }

        public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
            return this.lastReceivedWriteStatusEvent;
        }

        public long getCommittedKafkaOffset() {
            return this.committedKafkaOffset;
        }

        public void start() {
            this.kafkaControlAgent.registerTransactionCoordinator(this);
        }

        public void stop() {
            this.kafkaControlAgent.deregisterTransactionCoordinator(this);
        }

        public TopicPartition getPartition() {
            return this.partition;
        }

        public void processControlEvent(ControlMessage controlMessage) {
            if (controlMessage.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
                this.lastReceivedWriteStatusEvent = Option.of(controlMessage);
                Assertions.assertTrue(controlMessage.getParticipantInfo().getKafkaOffset() >= this.committedKafkaOffset);
                this.committedKafkaOffset = controlMessage.getParticipantInfo().getKafkaOffset();
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionParticipant$ParticipantFailureTestScenarios.class */
    /**
     * Scenarios exercised by {@code testAllParticipantFailureScenarios}; each one describes at
     * which point the participant is re-created before the final two-phase commit is run.
     */
    private enum ParticipantFailureTestScenarios {
        /** The participant is re-created before any control event has been sent. */
        FAILURE_BEFORE_START_COMMIT,
        /** The participant is re-created after START_COMMIT; END_COMMIT and ACK_COMMIT follow. */
        FAILURE_AFTER_START_COMMIT,
        /** The participant is re-created after START_COMMIT and END_COMMIT; ACK_COMMIT follows. */
        FAILURE_AFTER_END_COMMIT
    }

    /**
     * Builds a fresh fixture before every test: the topic partition, the mock control agent,
     * the mock Kafka Connect, a started mock coordinator, default configs and a started participant.
     */
    @BeforeEach
    public void setUp() throws Exception {
        partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
        kafkaControlAgent = new MockKafkaControlAgent();
        mockKafkaConnect = new MockKafkaConnect(partition);
        // Creates the coordinator on the agent above and starts it.
        initializeCoordinator();
        configs = KafkaConnectConfigs.newBuilder().build();
        initializeParticipant();
    }

    /**
     * Runs the scenario-specific steps (optionally re-creating the coordinator part-way through
     * a commit) and then verifies that a complete two-phase commit starting at offset 0 succeeds.
     *
     * @param coordinatorFailureTestScenarios scenario supplied by the enum source
     */
    @ParameterizedTest
    @EnumSource(CoordinatorFailureTestScenarios.class)
    public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios coordinatorFailureTestScenarios) {
        try {
            // The mock Kafka Connect is expected to be paused right after setUp().
            boolean pausedAtStart = mockKafkaConnect.isPaused();
            Assertions.assertTrue(pausedAtStart);

            switch (coordinatorFailureTestScenarios) {
                case REGULAR_SCENARIO:
                    // Nothing to do before the commit.
                    break;
                case COORDINATOR_FAILED_AFTER_START_COMMIT:
                    triggerAndProcessStartCommit();
                    // Replace the coordinator with a new instance.
                    initializeCoordinator();
                    break;
                case COORDINATOR_FAILED_AFTER_END_COMMIT:
                    triggerAndProcessStartCommit();
                    triggerAndProcessEndCommit();
                    // Replace the coordinator with a new instance.
                    initializeCoordinator();
                    break;
                default:
                    throw new HoodieException("Unknown test scenario " + coordinatorFailureTestScenarios);
            }

            testTwoPhaseCommit(0L);
            participant.stop();
        } catch (Exception unexpected) {
            throw new HoodieException("Unexpected test failure ", unexpected);
        }
    }

    /**
     * Runs the scenario-specific steps (re-creating the participant at different points of a
     * commit) and then verifies that a complete two-phase commit succeeds from the expected
     * starting offset.
     *
     * @param participantFailureTestScenarios scenario supplied by the enum source
     */
    @ParameterizedTest
    @EnumSource(ParticipantFailureTestScenarios.class)
    public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios participantFailureTestScenarios) {
        try {
            // Starting offset handed to testTwoPhaseCommit; only the last scenario changes it.
            int startingOffset = 0;

            switch (participantFailureTestScenarios) {
                case FAILURE_BEFORE_START_COMMIT:
                    initializeParticipant();
                    break;
                case FAILURE_AFTER_START_COMMIT:
                    triggerAndProcessStartCommit();
                    // Replace the participant, then let the coordinator finish the commit.
                    initializeParticipant();
                    triggerAndProcessEndCommit();
                    triggerAndProcessAckCommit();
                    break;
                case FAILURE_AFTER_END_COMMIT:
                    triggerAndProcessStartCommit();
                    triggerAndProcessEndCommit();
                    // Replace the participant, then deliver the ACK.
                    initializeParticipant();
                    triggerAndProcessAckCommit();
                    startingOffset = NUM_RECORDS_BATCH;
                    break;
                default:
                    throw new HoodieException("Unknown test scenario " + participantFailureTestScenarios);
            }

            testTwoPhaseCommit(startingOffset);
        } catch (Exception unexpected) {
            throw new HoodieException("Unexpected test failure ", unexpected);
        }
    }

    /**
     * Creates a new writer provider and a new participant bound to the shared partition, control
     * agent and mock Kafka Connect, hands the participant to the mock Kafka Connect and starts it.
     */
    private void initializeParticipant() {
        testHudiWriterProvider = new TestHudiWriterProvider();
        ConnectTransactionParticipant created = new ConnectTransactionParticipant(
                partition, kafkaControlAgent, mockKafkaConnect, testHudiWriterProvider);
        participant = created;
        mockKafkaConnect.setParticipant(created);
        created.start();
    }

    /**
     * Replaces the current mock coordinator with a new instance on the shared control agent
     * and starts it.
     */
    private void initializeCoordinator() {
        MockCoordinator created = new MockCoordinator(kafkaControlAgent);
        mockCoordinator = created;
        created.start();
    }

    /**
     * Drives one complete commit cycle (START_COMMIT, END_COMMIT, ACK_COMMIT) and verifies its outcome:
     * the writer reports one batch of writes, the mock Kafka Connect offset advanced by one batch
     * from the given starting offset, and participant and coordinator agree on the committed offset.
     *
     * @param j Kafka offset expected before this commit cycle starts
     */
    private void testTwoPhaseCommit(long j) {
        triggerAndProcessStartCommit();
        triggerAndProcessEndCommit();
        triggerAndProcessAckCommit();
        Assertions.assertEquals(NUM_RECORDS_BATCH, this.testHudiWriterProvider.getLatestNumberWrites());
        // The START_COMMIT step publishes NUM_RECORDS_BATCH records, so the offset moves by that amount.
        Assertions.assertEquals(j + NUM_RECORDS_BATCH, this.mockKafkaConnect.getCurrentKafkaOffset());
        Assertions.assertEquals(this.participant.getLastKafkaCommittedOffset(), this.mockCoordinator.getCommittedKafkaOffset());
    }

    /**
     * Has the coordinator send START_COMMIT, publishes one batch of records to the participant
     * and checks that the mock Kafka Connect reports itself as resumed afterwards.
     */
    private void triggerAndProcessStartCommit() {
        mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
        mockKafkaConnect.publishBatchRecordsToParticipant(NUM_RECORDS_BATCH);
        boolean resumed = mockKafkaConnect.isResumed();
        Assertions.assertTrue(resumed);
    }

    /**
     * Has the coordinator send END_COMMIT, publishes an empty batch to the participant
     * and checks that the mock Kafka Connect reports itself as paused afterwards.
     */
    private void triggerAndProcessEndCommit() {
        mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
        mockKafkaConnect.publishBatchRecordsToParticipant(0);
        boolean paused = mockKafkaConnect.isPaused();
        Assertions.assertTrue(paused);
    }

    /**
     * Has the coordinator send ACK_COMMIT, publishes an empty batch to the participant
     * and checks that the mock Kafka Connect still reports itself as paused.
     */
    private void triggerAndProcessAckCommit() {
        mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
        mockKafkaConnect.publishBatchRecordsToParticipant(0);
        boolean paused = mockKafkaConnect.isPaused();
        Assertions.assertTrue(paused);
    }
}
