/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockConnectTransactionServices;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;

public class TestConnectTransactionCoordinator {
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    private static final int TOTAL_KAFKA_PARTITIONS = 4;
    private static final int MAX_COMMIT_ROUNDS = 5;
    private static final int TEST_TIMEOUT_SECS = 60;
    private KafkaConnectConfigs configs;
    private MockParticipant participant;
    private MockKafkaControlAgent kafkaControlAgent;
    private MockConnectTransactionServices transactionServices;
    private CountDownLatch latch;

    @BeforeEach
    public void setUp() throws Exception {
        this.transactionServices = new MockConnectTransactionServices();
        this.latch = new CountDownLatch(1);
    }

    @ParameterizedTest
    @EnumSource(value=MockParticipant.TestScenarios.class)
    public void testSingleCommitScenario(MockParticipant.TestScenarios scenario) throws InterruptedException {
        this.kafkaControlAgent = new MockKafkaControlAgent();
        this.participant = new MockParticipant(this.kafkaControlAgent, this.latch, scenario, 5);
        this.participant.start();
        KafkaConnectConfigs.Builder configBuilder = KafkaConnectConfigs.newBuilder().withCommitIntervalSecs(Long.valueOf(1L)).withCoordinatorWriteTimeoutSecs(Long.valueOf(1L));
        if (scenario.equals((Object)MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED)) {
            configBuilder.withAllowCommitOnErrors(Boolean.valueOf(false));
        }
        this.configs = configBuilder.build();
        ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(this.configs, new TopicPartition(TOPIC_NAME, 0), (KafkaControlAgent)this.kafkaControlAgent, (ConnectTransactionServices)this.transactionServices, (bootstrapServers, topicName) -> 4);
        coordinator.start();
        this.latch.await(60L, TimeUnit.SECONDS);
        if (this.latch.getCount() > 0L) {
            throw new HoodieException("Test timedout resulting in failure");
        }
        coordinator.stop();
        this.participant.stop();
    }

    private static WriteStatus getAllSuccessfulRecordsWriteStatus() {
        WriteStatus status = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
        for (int i = 0; i < 1000; ++i) {
            status.markSuccess((HoodieRecord)Mockito.mock(HoodieRecord.class), Option.empty());
        }
        return status;
    }

    private static WriteStatus getSubsetFailedRecordsWriteStatus() {
        WriteStatus status = new WriteStatus(Boolean.valueOf(false), Double.valueOf(0.0));
        for (int i = 0; i < 1000; ++i) {
            if (i % 10 == 0) {
                status.markFailure((HoodieRecord)Mockito.mock(HoodieRecord.class), new Throwable("Error writing record on disk"), Option.empty());
                continue;
            }
            status.markSuccess((HoodieRecord)Mockito.mock(HoodieRecord.class), Option.empty());
        }
        status.setGlobalError(new Throwable("More than one records failed to be written to storage"));
        return status;
    }

    private static class MockParticipant
    implements TransactionParticipant {
        private final MockKafkaControlAgent kafkaControlAgent;
        private final TopicPartition partition;
        private final CountDownLatch latch;
        private final TestScenarios testScenario;
        private final int maxNumberCommitRounds;
        private final Map<Integer, Long> kafkaOffsetsCommitted;
        private ControlMessage.EventType expectedMsgType;
        private int numberCommitRounds;

        public MockParticipant(MockKafkaControlAgent kafkaControlAgent, CountDownLatch latch, TestScenarios testScenario, int maxNumberCommitRounds) {
            this.kafkaControlAgent = kafkaControlAgent;
            this.latch = latch;
            this.testScenario = testScenario;
            this.maxNumberCommitRounds = maxNumberCommitRounds;
            this.partition = new TopicPartition(TestConnectTransactionCoordinator.TOPIC_NAME, 3);
            this.kafkaOffsetsCommitted = new HashMap<Integer, Long>();
            this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
            this.numberCommitRounds = 0;
        }

        public void start() {
            this.kafkaControlAgent.registerTransactionParticipant(this);
        }

        public void stop() {
            this.kafkaControlAgent.deregisterTransactionParticipant(this);
        }

        public void buffer(SinkRecord record) {
        }

        public void processRecords() {
        }

        public TopicPartition getPartition() {
            return this.partition;
        }

        public void processControlEvent(ControlMessage message) {
            Assertions.assertEquals((Object)message.getSenderType(), (Object)ControlMessage.EntityType.COORDINATOR);
            Assertions.assertEquals((Object)message.getTopicName(), (Object)this.partition.topic());
            this.testScenarios(message);
        }

        public long getLastKafkaCommittedOffset() {
            return 0L;
        }

        private void testScenarios(ControlMessage message) {
            Assertions.assertEquals((Object)this.expectedMsgType, (Object)message.getType());
            switch (message.getType()) {
                case START_COMMIT: {
                    this.expectedMsgType = ControlMessage.EventType.END_COMMIT;
                    break;
                }
                case END_COMMIT: {
                    int numPartitionsThatReportWriteStatus;
                    Assertions.assertEquals(this.kafkaOffsetsCommitted, (Object)message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
                    HashMap<Integer, Long> kafkaOffsets = new HashMap<Integer, Long>();
                    ArrayList<ControlMessage> controlEvents = new ArrayList<ControlMessage>();
                    switch (this.testScenario) {
                        case ALL_CONNECT_TASKS_SUCCESS: {
                            MockParticipant.composeControlEvent(message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
                            numPartitionsThatReportWriteStatus = 4;
                            this.kafkaOffsetsCommitted.putAll(kafkaOffsets);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        }
                        case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS: {
                            MockParticipant.composeControlEvent(message.getCommitTime(), false, true, kafkaOffsets, controlEvents);
                            numPartitionsThatReportWriteStatus = 4;
                            this.kafkaOffsetsCommitted.putAll(kafkaOffsets);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        }
                        case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED: {
                            MockParticipant.composeControlEvent(message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
                            numPartitionsThatReportWriteStatus = 4;
                            this.kafkaOffsetsCommitted.putAll(kafkaOffsets);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        }
                        case SUBSET_WRITE_STATUS_FAILED: {
                            MockParticipant.composeControlEvent(message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
                            numPartitionsThatReportWriteStatus = 4;
                            this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                            break;
                        }
                        case SUBSET_CONNECT_TASKS_FAILED: {
                            MockParticipant.composeControlEvent(message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
                            numPartitionsThatReportWriteStatus = 2;
                            this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                            break;
                        }
                        default: {
                            throw new HoodieException("Unknown test scenario " + (Object)((Object)this.testScenario));
                        }
                    }
                    for (int i = 0; i < numPartitionsThatReportWriteStatus; ++i) {
                        this.kafkaControlAgent.publishMessage((ControlMessage)controlEvents.get(i));
                    }
                    break;
                }
                case ACK_COMMIT: {
                    if (this.numberCommitRounds >= this.maxNumberCommitRounds) {
                        this.latch.countDown();
                    }
                    this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                    break;
                }
                default: {
                    throw new HoodieException("Illegal control message type " + message.getType());
                }
            }
            if (message.getType().equals((Object)ControlMessage.EventType.START_COMMIT)) {
                if (this.numberCommitRounds >= this.maxNumberCommitRounds) {
                    this.latch.countDown();
                }
                ++this.numberCommitRounds;
                this.expectedMsgType = ControlMessage.EventType.END_COMMIT;
            }
        }

        private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
            for (int i = 1; i <= 4; ++i) {
                try {
                    long kafkaOffset = (long)(Math.random() * 10000.0);
                    kafkaOffsets.put(i, kafkaOffset);
                    ControlMessage event = MockParticipant.composeWriteStatusResponse(commitTime, new TopicPartition(TestConnectTransactionCoordinator.TOPIC_NAME, i), kafkaOffset, shouldIncludeFailedRecords, useEmptyWriteStatus);
                    controlEvents.add(event);
                    continue;
                }
                catch (Exception exception) {
                    throw new HoodieException("Fatal error sending control event to Coordinator");
                }
            }
        }

        private static ControlMessage composeWriteStatusResponse(String commitTime, TopicPartition partition, long kafkaOffset, boolean includeFailedRecords, boolean useEmptyWriteStatus) throws Exception {
            List writeStatusList = useEmptyWriteStatus ? Collections.emptyList() : Collections.singletonList(includeFailedRecords ? TestConnectTransactionCoordinator.getSubsetFailedRecordsWriteStatus() : TestConnectTransactionCoordinator.getAllSuccessfulRecordsWriteStatus());
            return ControlMessage.newBuilder().setType(ControlMessage.EventType.WRITE_STATUS).setTopicName(partition.topic()).setSenderType(ControlMessage.EntityType.PARTICIPANT).setSenderPartition(partition.partition()).setReceiverType(ControlMessage.EntityType.COORDINATOR).setReceiverPartition(0).setCommitTime(commitTime).setParticipantInfo(ControlMessage.ParticipantInfo.newBuilder().setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList)).setKafkaOffset(kafkaOffset).build()).build();
        }

        public static enum TestScenarios {
            SUBSET_CONNECT_TASKS_FAILED,
            SUBSET_WRITE_STATUS_FAILED,
            SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
            ALL_CONNECT_TASKS_SUCCESS,
            ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS;

        }
    }
}

