/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect;

import java.util.Collections;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockKafkaConnect;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestConnectTransactionParticipant {
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    private static final int NUM_RECORDS_BATCH = 5;
    private static final int PARTITION_NUMBER = 4;
    private ConnectTransactionParticipant participant;
    private MockCoordinator mockCoordinator;
    private TopicPartition partition;
    private KafkaConnectConfigs configs;
    private KafkaControlAgent kafkaControlAgent;
    private TestHudiWriterProvider testHudiWriterProvider;
    private MockKafkaConnect mockKafkaConnect;

    @BeforeEach
    public void setUp() throws Exception {
        this.partition = new TopicPartition(TOPIC_NAME, 4);
        this.kafkaControlAgent = new MockKafkaControlAgent();
        this.mockKafkaConnect = new MockKafkaConnect(this.partition);
        this.mockCoordinator = new MockCoordinator(this.kafkaControlAgent);
        this.mockCoordinator.start();
        this.configs = KafkaConnectConfigs.newBuilder().build();
        this.initializeParticipant();
    }

    @ParameterizedTest
    @EnumSource(value=CoordinatorFailureTestScenarios.class)
    public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
        try {
            Assertions.assertTrue((boolean)this.mockKafkaConnect.isPaused());
            switch (testScenario) {
                case REGULAR_SCENARIO: {
                    break;
                }
                case COORDINATOR_FAILED_AFTER_START_COMMIT: {
                    this.triggerAndProcessStartCommit();
                    this.initializeCoordinator();
                    break;
                }
                case COORDINATOR_FAILED_AFTER_END_COMMIT: {
                    this.triggerAndProcessStartCommit();
                    this.triggerAndProcessEndCommit();
                    this.initializeCoordinator();
                    break;
                }
                default: {
                    throw new HoodieException("Unknown test scenario " + (Object)((Object)testScenario));
                }
            }
            this.testTwoPhaseCommit(0L);
        }
        catch (Exception exception) {
            throw new HoodieException("Unexpected test failure ", (Throwable)exception);
        }
        this.participant.stop();
    }

    @ParameterizedTest
    @EnumSource(value=ParticipantFailureTestScenarios.class)
    public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
        try {
            int currentKafkaOffset = 0;
            switch (testScenario) {
                case FAILURE_BEFORE_START_COMMIT: {
                    this.initializeParticipant();
                    break;
                }
                case FAILURE_AFTER_START_COMMIT: {
                    this.triggerAndProcessStartCommit();
                    this.initializeParticipant();
                    this.triggerAndProcessEndCommit();
                    this.triggerAndProcessAckCommit();
                    break;
                }
                case FAILURE_AFTER_END_COMMIT: {
                    this.triggerAndProcessStartCommit();
                    this.triggerAndProcessEndCommit();
                    this.initializeParticipant();
                    this.triggerAndProcessAckCommit();
                    currentKafkaOffset = 5;
                    break;
                }
                default: {
                    throw new HoodieException("Unknown test scenario " + (Object)((Object)testScenario));
                }
            }
            this.testTwoPhaseCommit(currentKafkaOffset);
        }
        catch (Exception exception) {
            throw new HoodieException("Unexpected test failure ", (Throwable)exception);
        }
    }

    private void initializeParticipant() {
        this.testHudiWriterProvider = new TestHudiWriterProvider();
        this.participant = new ConnectTransactionParticipant(this.partition, this.kafkaControlAgent, (SinkTaskContext)this.mockKafkaConnect, (ConnectWriterProvider)this.testHudiWriterProvider);
        this.mockKafkaConnect.setParticipant((TransactionParticipant)this.participant);
        this.participant.start();
    }

    private void initializeCoordinator() {
        this.mockCoordinator = new MockCoordinator(this.kafkaControlAgent);
        this.mockCoordinator.start();
    }

    private void testTwoPhaseCommit(long currentKafkaOffset) {
        this.triggerAndProcessStartCommit();
        this.triggerAndProcessEndCommit();
        this.triggerAndProcessAckCommit();
        Assertions.assertEquals((int)5, (int)this.testHudiWriterProvider.getLatestNumberWrites());
        Assertions.assertEquals((long)(currentKafkaOffset + 5L), (long)this.mockKafkaConnect.getCurrentKafkaOffset());
        Assertions.assertEquals((long)this.participant.getLastKafkaCommittedOffset(), (long)this.mockCoordinator.getCommittedKafkaOffset());
    }

    private void triggerAndProcessStartCommit() {
        this.mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
        this.mockKafkaConnect.publishBatchRecordsToParticipant(5);
        Assertions.assertTrue((boolean)this.mockKafkaConnect.isResumed());
    }

    private void triggerAndProcessEndCommit() {
        this.mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
        this.mockKafkaConnect.publishBatchRecordsToParticipant(0);
        Assertions.assertTrue((boolean)this.mockKafkaConnect.isPaused());
    }

    private void triggerAndProcessAckCommit() {
        this.mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
        this.mockKafkaConnect.publishBatchRecordsToParticipant(0);
        Assertions.assertTrue((boolean)this.mockKafkaConnect.isPaused());
    }

    private static enum ParticipantFailureTestScenarios {
        FAILURE_BEFORE_START_COMMIT,
        FAILURE_AFTER_START_COMMIT,
        FAILURE_AFTER_END_COMMIT;

    }

    private static enum CoordinatorFailureTestScenarios {
        REGULAR_SCENARIO,
        COORDINATOR_FAILED_AFTER_START_COMMIT,
        COORDINATOR_FAILED_AFTER_END_COMMIT;

    }

    private static class MockCoordinator
    implements TransactionCoordinator {
        private static int currentCommitTime = 101;
        private final KafkaControlAgent kafkaControlAgent;
        private final TopicPartition partition;
        private Option<ControlMessage> lastReceivedWriteStatusEvent;
        private long committedKafkaOffset;

        public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
            this.kafkaControlAgent = kafkaControlAgent;
            this.partition = new TopicPartition(TestConnectTransactionParticipant.TOPIC_NAME, 0);
            this.lastReceivedWriteStatusEvent = Option.empty();
            this.committedKafkaOffset = 0L;
        }

        public void sendEventFromCoordinator(ControlMessage.EventType type) {
            try {
                if (type.equals((Object)ControlMessage.EventType.START_COMMIT)) {
                    ++currentCommitTime;
                }
                this.kafkaControlAgent.publishMessage(ControlMessage.newBuilder().setType(type).setTopicName(this.partition.topic()).setSenderType(ControlMessage.EntityType.COORDINATOR).setSenderPartition(this.partition.partition()).setReceiverType(ControlMessage.EntityType.PARTICIPANT).setCommitTime(String.valueOf(currentCommitTime)).setCoordinatorInfo(ControlMessage.CoordinatorInfo.newBuilder().putAllGlobalKafkaCommitOffsets(Collections.singletonMap(4, this.committedKafkaOffset)).build()).build());
            }
            catch (Exception exception) {
                throw new HoodieException("Fatal error sending control event to Participant");
            }
        }

        public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
            return this.lastReceivedWriteStatusEvent;
        }

        public long getCommittedKafkaOffset() {
            return this.committedKafkaOffset;
        }

        public void start() {
            this.kafkaControlAgent.registerTransactionCoordinator((TransactionCoordinator)this);
        }

        public void stop() {
            this.kafkaControlAgent.deregisterTransactionCoordinator((TransactionCoordinator)this);
        }

        public TopicPartition getPartition() {
            return this.partition;
        }

        public void processControlEvent(ControlMessage message) {
            if (message.getType().equals((Object)ControlMessage.EventType.WRITE_STATUS)) {
                this.lastReceivedWriteStatusEvent = Option.of((Object)message);
                Assertions.assertTrue((message.getParticipantInfo().getKafkaOffset() >= this.committedKafkaOffset ? 1 : 0) != 0);
                this.committedKafkaOffset = message.getParticipantInfo().getKafkaOffset();
            }
        }
    }
}

