package org.apache.hudi.connect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockConnectTransactionServices;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionCoordinator.class */
public class TestConnectTransactionCoordinator {
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    private static final int TOTAL_KAFKA_PARTITIONS = 4;
    private static final int MAX_COMMIT_ROUNDS = 5;
    private static final int TEST_TIMEOUT_SECS = 60;
    private KafkaConnectConfigs configs;
    private MockParticipant participant;
    private MockKafkaControlAgent kafkaControlAgent;
    private MockConnectTransactionServices transactionServices;
    private CountDownLatch latch;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.connect.TestConnectTransactionCoordinator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionCoordinator$1.class */
    /**
     * Decompiled switch-map holder (originally {@code TestConnectTransactionCoordinator$1}).
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and stores a small 1-based
     * integer for that constant. A slot left at its default of {@code 0} means the constant has no
     * mapping. The stored values are plain literals on purpose: they are table indices and have no
     * relation to the test's partition-count or commit-round constants, even where the numbers
     * happen to coincide.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios;
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$connect$ControlMessage$EventType = new int[ControlMessage.EventType.values().length];

        static {
            // A NoSuchFieldError means the enum constant is absent at runtime; its slot then simply
            // stays 0 (unmapped), so the error is intentionally ignored for every entry below.
            try {
                $SwitchMap$org$apache$hudi$connect$ControlMessage$EventType[ControlMessage.EventType.START_COMMIT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$ControlMessage$EventType[ControlMessage.EventType.END_COMMIT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$ControlMessage$EventType[ControlMessage.EventType.ACK_COMMIT.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios = new int[MockParticipant.TestScenarios.values().length];
            try {
                $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[MockParticipant.TestScenarios.ALL_CONNECT_TASKS_SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[MockParticipant.TestScenarios.ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
            try {
                $SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[MockParticipant.TestScenarios.SUBSET_CONNECT_TASKS_FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant missing: leave unmapped
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionCoordinator$MockParticipant.class */
    private static class MockParticipant implements TransactionParticipant {
        private final MockKafkaControlAgent kafkaControlAgent;
        private final CountDownLatch latch;
        private final TestScenarios testScenario;
        private final int maxNumberCommitRounds;
        private final TopicPartition partition = new TopicPartition(TestConnectTransactionCoordinator.TOPIC_NAME, 3);
        private final Map<Integer, Long> kafkaOffsetsCommitted = new HashMap();
        private ControlMessage.EventType expectedMsgType = ControlMessage.EventType.START_COMMIT;
        private int numberCommitRounds = 0;

        /* loaded from: input_file:org/apache/hudi/connect/TestConnectTransactionCoordinator$MockParticipant$TestScenarios.class */
        public enum TestScenarios {
            SUBSET_CONNECT_TASKS_FAILED,
            SUBSET_WRITE_STATUS_FAILED,
            SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
            ALL_CONNECT_TASKS_SUCCESS,
            ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS
        }

        public MockParticipant(MockKafkaControlAgent mockKafkaControlAgent, CountDownLatch countDownLatch, TestScenarios testScenarios, int i) {
            this.kafkaControlAgent = mockKafkaControlAgent;
            this.latch = countDownLatch;
            this.testScenario = testScenarios;
            this.maxNumberCommitRounds = i;
        }

        public void start() {
            this.kafkaControlAgent.registerTransactionParticipant(this);
        }

        public void stop() {
            this.kafkaControlAgent.deregisterTransactionParticipant(this);
        }

        public void buffer(SinkRecord sinkRecord) {
        }

        public void processRecords() {
        }

        public TopicPartition getPartition() {
            return this.partition;
        }

        public void processControlEvent(ControlMessage controlMessage) {
            Assertions.assertEquals(controlMessage.getSenderType(), ControlMessage.EntityType.COORDINATOR);
            Assertions.assertEquals(controlMessage.getTopicName(), this.partition.topic());
            testScenarios(controlMessage);
        }

        public long getLastKafkaCommittedOffset() {
            return 0L;
        }

        private void testScenarios(ControlMessage controlMessage) {
            int i;
            Assertions.assertEquals(this.expectedMsgType, controlMessage.getType());
            switch (AnonymousClass1.$SwitchMap$org$apache$hudi$connect$ControlMessage$EventType[controlMessage.getType().ordinal()]) {
                case 1:
                    this.expectedMsgType = ControlMessage.EventType.END_COMMIT;
                    break;
                case 2:
                    Assertions.assertEquals(this.kafkaOffsetsCommitted, controlMessage.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
                    HashMap hashMap = new HashMap();
                    ArrayList arrayList = new ArrayList();
                    switch (AnonymousClass1.$SwitchMap$org$apache$hudi$connect$TestConnectTransactionCoordinator$MockParticipant$TestScenarios[this.testScenario.ordinal()]) {
                        case 1:
                            composeControlEvent(controlMessage.getCommitTime(), false, false, hashMap, arrayList);
                            i = TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS;
                            this.kafkaOffsetsCommitted.putAll(hashMap);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        case 2:
                            composeControlEvent(controlMessage.getCommitTime(), false, true, hashMap, arrayList);
                            i = TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS;
                            this.kafkaOffsetsCommitted.putAll(hashMap);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        case 3:
                            composeControlEvent(controlMessage.getCommitTime(), true, false, hashMap, arrayList);
                            i = TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS;
                            this.kafkaOffsetsCommitted.putAll(hashMap);
                            this.expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
                            break;
                        case TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS /* 4 */:
                            composeControlEvent(controlMessage.getCommitTime(), true, false, hashMap, arrayList);
                            i = TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS;
                            this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                            break;
                        case TestConnectTransactionCoordinator.MAX_COMMIT_ROUNDS /* 5 */:
                            composeControlEvent(controlMessage.getCommitTime(), false, false, hashMap, arrayList);
                            i = 2;
                            this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                            break;
                        default:
                            throw new HoodieException("Unknown test scenario " + this.testScenario);
                    }
                    for (int i2 = 0; i2 < i; i2++) {
                        this.kafkaControlAgent.publishMessage((ControlMessage) arrayList.get(i2));
                    }
                    break;
                case 3:
                    if (this.numberCommitRounds >= this.maxNumberCommitRounds) {
                        this.latch.countDown();
                    }
                    this.expectedMsgType = ControlMessage.EventType.START_COMMIT;
                    break;
                default:
                    throw new HoodieException("Illegal control message type " + controlMessage.getType());
            }
            if (controlMessage.getType().equals(ControlMessage.EventType.START_COMMIT)) {
                if (this.numberCommitRounds >= this.maxNumberCommitRounds) {
                    this.latch.countDown();
                }
                this.numberCommitRounds++;
                this.expectedMsgType = ControlMessage.EventType.END_COMMIT;
            }
        }

        private static void composeControlEvent(String str, boolean z, boolean z2, Map<Integer, Long> map, List<ControlMessage> list) {
            for (int i = 1; i <= TestConnectTransactionCoordinator.TOTAL_KAFKA_PARTITIONS; i++) {
                try {
                    long random = (long) (Math.random() * 10000.0d);
                    map.put(Integer.valueOf(i), Long.valueOf(random));
                    list.add(composeWriteStatusResponse(str, new TopicPartition(TestConnectTransactionCoordinator.TOPIC_NAME, i), random, z, z2));
                } catch (Exception e) {
                    throw new HoodieException("Fatal error sending control event to Coordinator");
                }
            }
        }

        private static ControlMessage composeWriteStatusResponse(String str, TopicPartition topicPartition, long j, boolean z, boolean z2) throws Exception {
            List singletonList;
            if (z2) {
                singletonList = Collections.emptyList();
            } else {
                singletonList = Collections.singletonList(z ? TestConnectTransactionCoordinator.access$000() : TestConnectTransactionCoordinator.access$100());
            }
            return ControlMessage.newBuilder().setType(ControlMessage.EventType.WRITE_STATUS).setTopicName(topicPartition.topic()).setSenderType(ControlMessage.EntityType.PARTICIPANT).setSenderPartition(topicPartition.partition()).setReceiverType(ControlMessage.EntityType.COORDINATOR).setReceiverPartition(0).setCommitTime(str).setParticipantInfo(ControlMessage.ParticipantInfo.newBuilder().setWriteStatus(KafkaConnectUtils.buildWriteStatuses(singletonList)).setKafkaOffset(j).build()).build();
        }
    }

    /** Gives every test a fresh mock transaction service and a latch with a count of one. */
    @BeforeEach
    public void setUp() throws Exception {
        transactionServices = new MockConnectTransactionServices();
        latch = new CountDownLatch(1);
    }

    /**
     * Runs a coordinator against the mock participant for each scenario and waits until the
     * participant releases the latch.
     *
     * <p>The coordinator and the participant are stopped even when the wait times out or an
     * exception is thrown, so a failing scenario does not leave them running for later tests.
     *
     * @param testScenarios behaviour the mock participant simulates
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     * @throws HoodieException if the latch is not released within {@code TEST_TIMEOUT_SECS} seconds
     */
    @ParameterizedTest
    @EnumSource(MockParticipant.TestScenarios.class)
    public void testSingleCommitScenario(MockParticipant.TestScenarios testScenarios) throws InterruptedException {
        this.kafkaControlAgent = new MockKafkaControlAgent();
        this.participant = new MockParticipant(this.kafkaControlAgent, this.latch, testScenarios, MAX_COMMIT_ROUNDS);
        this.participant.start();
        try {
            KafkaConnectConfigs.Builder configBuilder = KafkaConnectConfigs.newBuilder()
                    .withCommitIntervalSecs(1L)
                    .withCoordinatorWriteTimeoutSecs(1L);
            if (testScenarios == MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED) {
                configBuilder.withAllowCommitOnErrors(false);
            }
            this.configs = configBuilder.build();
            ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(
                    this.configs,
                    new TopicPartition(TOPIC_NAME, 0),
                    this.kafkaControlAgent,
                    this.transactionServices,
                    (bootstrapServers, topicName) -> TOTAL_KAFKA_PARTITIONS);
            coordinator.start();
            try {
                // await() returns false when the timeout elapses before the latch reaches zero.
                if (!this.latch.await(TEST_TIMEOUT_SECS, TimeUnit.SECONDS)) {
                    throw new HoodieException("Test timedout resulting in failure");
                }
            } finally {
                coordinator.stop();
            }
        } finally {
            this.participant.stop();
        }
    }

    /**
     * Creates a write status on which 1000 mocked records have been marked successful.
     *
     * @return a write status with no failures recorded by this method
     */
    private static WriteStatus getAllSuccessfulRecordsWriteStatus() {
        final double failureFraction = 0.0d;
        WriteStatus status = new WriteStatus(false, failureFraction);
        int recordsLeft = 1000;
        while (recordsLeft > 0) {
            HoodieRecord mockedRecord = Mockito.mock(HoodieRecord.class);
            status.markSuccess(mockedRecord, Option.empty());
            recordsLeft--;
        }
        return status;
    }

    /**
     * Creates a write status covering 1000 mocked records where every tenth record
     * (indices 0, 10, ..., 990) is marked failed and the rest successful, and on which a global
     * error is set.
     *
     * @return the partially failed write status
     */
    private static WriteStatus getSubsetFailedRecordsWriteStatus() {
        final double failureFraction = 0.0d;
        WriteStatus status = new WriteStatus(false, failureFraction);
        for (int recordIndex = 0; recordIndex < 1000; recordIndex++) {
            HoodieRecord mockedRecord = Mockito.mock(HoodieRecord.class);
            boolean failsToWrite = recordIndex % 10 == 0;
            if (!failsToWrite) {
                status.markSuccess(mockedRecord, Option.empty());
                continue;
            }
            status.markFailure(mockedRecord, new Throwable("Error writing record on disk"), Option.empty());
        }
        Throwable globalError = new Throwable("More than one records failed to be written to storage");
        status.setGlobalError(globalError);
        return status;
    }

    /**
     * Synthetic accessor that forwards to the private
     * {@code getSubsetFailedRecordsWriteStatus()} factory.
     *
     * @return the partially failed write status produced by that factory
     */
    static /* synthetic */ WriteStatus access$000() {
        WriteStatus subsetFailedStatus = TestConnectTransactionCoordinator.getSubsetFailedRecordsWriteStatus();
        return subsetFailedStatus;
    }

    /**
     * Synthetic accessor that forwards to the private
     * {@code getAllSuccessfulRecordsWriteStatus()} factory.
     *
     * @return the all-successful write status produced by that factory
     */
    static /* synthetic */ WriteStatus access$100() {
        WriteStatus allSuccessfulStatus = TestConnectTransactionCoordinator.getAllSuccessfulRecordsWriteStatus();
        return allSuccessfulStatus;
    }
}
