package org.apache.hudi.utilities.sources.helpers;

import java.util.UUID;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.class */
/**
 * Tests for {@link KafkaOffsetGen}, run against the broker started by {@link KafkaTestUtils}.
 *
 * <p>A fresh {@link KafkaTestUtils} instance is set up before each test and torn down after it.
 */
public class TestKafkaOffsetGen {
    /** Topic used by every test in this class. */
    private static final String TEST_TOPIC_NAME = "hoodie_test";

    /** Config key naming the Kafka topic to read from. */
    private static final String TOPIC_CONFIG_KEY = "hoodie.deltastreamer.source.kafka.topic";

    private KafkaTestUtils testUtils;
    private final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);

    /** Creates and starts the Kafka test utilities used by the test about to run. */
    @BeforeEach
    public void setup() throws Exception {
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    /** Tears down the Kafka test utilities created in {@link #setup()}. */
    @AfterEach
    public void teardown() throws Exception {
        this.testUtils.teardown();
    }

    /**
     * Builds the properties handed to {@link KafkaOffsetGen} and {@link KafkaConsumer}.
     *
     * @param autoOffsetReset value stored under {@code auto.offset.reset}
     * @param kafkaCheckpointType value stored under
     *     {@code hoodie.deltastreamer.source.kafka.checkpoint.type}
     * @return properties pointing at the test broker and {@link #TEST_TOPIC_NAME}, using string
     *     deserializers and a freshly generated random {@code group.id}
     */
    private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
        TypedProperties props = new TypedProperties();
        props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
        props.put("auto.offset.reset", autoOffsetReset);
        props.put(TOPIC_CONFIG_KEY, TEST_TOPIC_NAME);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        // A new random group id on every call, so separate calls never share a consumer group.
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /**
     * Creates {@link #TEST_TOPIC_NAME} with the given number of partitions and sends 1000
     * generated insert records to it.
     *
     * @param numPartitions partition count passed to {@code createTopic}
     */
    private void createTopicWithThousandRecords(int numPartitions) {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.testUtils.createTopic(TEST_TOPIC_NAME, numPartitions);
        this.testUtils.sendMessages(
            TEST_TOPIC_NAME,
            UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
    }

    /**
     * With {@code auto.offset.reset=earliest} and no checkpoint, ranges start at offset 0 and end
     * at the requested limit, or at 1000 (the number of records sent) when the limit is larger.
     */
    @Test
    public void testGetNextOffsetRangesFromEarliest() {
        createTopicWithThousandRecords(1);
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));

        OffsetRange[] limitedRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500L, this.metrics);
        Assertions.assertEquals(1, limitedRanges.length);
        Assertions.assertEquals(0L, limitedRanges[0].fromOffset());
        Assertions.assertEquals(500L, limitedRanges[0].untilOffset());

        OffsetRange[] fullRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 5000L, this.metrics);
        Assertions.assertEquals(1, fullRanges.length);
        Assertions.assertEquals(0L, fullRanges[0].fromOffset());
        Assertions.assertEquals(1000L, fullRanges[0].untilOffset());
    }

    /**
     * With {@code auto.offset.reset=latest} and no checkpoint, the single range is expected to be
     * empty and positioned at offset 1000.
     */
    @Test
    public void testGetNextOffsetRangesFromLatest() {
        createTopicWithThousandRecords(1);
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));

        OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500L, this.metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(1000L, ranges[0].fromOffset());
        Assertions.assertEquals(1000L, ranges[0].untilOffset());
    }

    /**
     * With the checkpoint string {@code "hoodie_test,0:250"}, the range is expected to start at
     * offset 250 and, with a limit of 500, end at offset 750.
     */
    @Test
    public void testGetNextOffsetRangesFromCheckpoint() {
        String lastCheckpoint = TEST_TOPIC_NAME + ",0:250";
        createTopicWithThousandRecords(1);
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));

        OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpoint), 500L, this.metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(250L, ranges[0].fromOffset());
        Assertions.assertEquals(750L, ranges[0].untilOffset());
    }

    /**
     * With checkpoint type {@code timestamp} and a checkpoint of "now minus 100000 ms", the range
     * is expected to start at offset 0 and, with a limit of 500, end at offset 500.
     */
    @Test
    public void testGetNextOffsetRangesFromTimestampCheckpointType() {
        createTopicWithThousandRecords(1);
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
        String timestampCheckpoint = String.valueOf(System.currentTimeMillis() - 100000);

        OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.of(timestampCheckpoint), 500L, this.metrics);
        Assertions.assertEquals(1, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(500L, ranges[0].untilOffset());
    }

    /**
     * With two partitions and a limit of 499, two ranges are expected, both starting at offset 0
     * and ending at offsets 250 and 249 respectively.
     */
    @Test
    public void testGetNextOffsetRangesFromMultiplePartitions() {
        createTopicWithThousandRecords(2);
        KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));

        OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499L, this.metrics);
        Assertions.assertEquals(2, ranges.length);
        Assertions.assertEquals(0L, ranges[0].fromOffset());
        Assertions.assertEquals(250L, ranges[0].untilOffset());
        Assertions.assertEquals(0L, ranges[1].fromOffset());
        Assertions.assertEquals(249L, ranges[1].untilOffset());
    }

    /**
     * With {@code auto.offset.reset=group}, ranges are expected to start at the offsets previously
     * passed to {@code commitOffsetToKafka}. A second generator built from a new config (and thus
     * a different random {@code group.id}) is expected to yield empty ranges at offset 500.
     */
    @Test
    public void testGetNextOffsetRangesFromGroup() {
        createTopicWithThousandRecords(2);
        KafkaOffsetGen committedGroupGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
        committedGroupGen.commitOffsetToKafka(TEST_TOPIC_NAME + ",0:250,1:249");

        OffsetRange[] committedRanges = committedGroupGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals(250L, committedRanges[0].fromOffset());
        Assertions.assertEquals(400L, committedRanges[0].untilOffset());
        Assertions.assertEquals(249L, committedRanges[1].fromOffset());
        Assertions.assertEquals(399L, committedRanges[1].untilOffset());

        KafkaOffsetGen freshGroupGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
        OffsetRange[] freshRanges = freshGroupGen.getNextOffsetRanges(Option.empty(), 300L, this.metrics);
        Assertions.assertEquals(500L, freshRanges[0].fromOffset());
        Assertions.assertEquals(500L, freshRanges[0].untilOffset());
        Assertions.assertEquals(500L, freshRanges[1].fromOffset());
        Assertions.assertEquals(500L, freshRanges[1].untilOffset());
    }

    /**
     * {@code checkTopicExists} is expected to return {@code true} for the created topic and
     * {@code false} after the configured topic is switched to a name that was never created.
     */
    @Test
    public void testCheckTopicExists() {
        TypedProperties consumerConfigs = getConsumerConfigs("latest", "string");
        KafkaOffsetGen existingTopicGen = new KafkaOffsetGen(consumerConfigs);
        this.testUtils.createTopic(TEST_TOPIC_NAME, 1);

        // Each consumer is closed as soon as its check completes so it does not leak.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs)) {
            Assertions.assertTrue(existingTopicGen.checkTopicExists(consumer));
        }

        consumerConfigs.put(TOPIC_CONFIG_KEY, "random");
        KafkaOffsetGen missingTopicGen = new KafkaOffsetGen(consumerConfigs);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs)) {
            Assertions.assertFalse(missingTopicGen.checkTopicExists(consumer));
        }
    }

    /**
     * Constructing a {@link KafkaOffsetGen} from empty properties is expected to throw
     * {@link HoodieNotSupportedException}.
     */
    @Test
    public void testTopicNameNotPresentInProps() {
        Assertions.assertThrows(
            HoodieNotSupportedException.class,
            () -> new KafkaOffsetGen(new TypedProperties()));
    }
}
