package org.apache.hudi.utilities.sources;

import java.util.HashMap;
import java.util.UUID;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link JsonKafkaSource} read through a {@link SourceFormatAdapter} against the Kafka broker
 * provided by {@link KafkaTestUtils}, plus unit tests for the offset-range arithmetic in
 * {@link KafkaOffsetGen.CheckpointUtils}.
 *
 * <p>Some tests temporarily lower the process-wide static
 * {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource}. They restore the previous value in a
 * {@code finally} block so that a failing assertion cannot leak the change into other tests.
 */
public class TestKafkaSource extends UtilitiesTestBase {

    private static final String TEST_TOPIC_NAME = "hoodie_test";

    private FilebasedSchemaProvider schemaProvider;
    private KafkaTestUtils testUtils;
    private final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass();
    }

    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanupClass();
    }

    /** Sets up the base test fixtures, a file-based schema provider and a fresh Kafka test broker. */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), this.jsc);
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    /**
     * Tears down the base fixtures and then the Kafka test broker. The broker is shut down even if the base
     * teardown throws. It is skipped when {@link #setup()} failed before the broker was created.
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            if (this.testUtils != null) {
                this.testUtils.teardown();
            }
        }
    }

    /**
     * Builds the properties for a {@link JsonKafkaSource} that reads {@link #TEST_TOPIC_NAME} from the test broker.
     *
     * @param maxEvents     value for {@code hoodie.deltastreamer.kafka.source.maxEvents}; when {@code null}, the
     *                      current value of {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource} is used
     * @param resetStrategy value for {@code auto.offset.reset} (the tests use {@code "earliest"} / {@code "latest"})
     * @return a new property set
     */
    private TypedProperties createPropsForJsonSource(Long maxEvents, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
                maxEvents != null
                        ? String.valueOf(maxEvents)
                        : String.valueOf(KafkaOffsetGen.Config.maxEventsFromKafkaSource));
        // A random group.id per call, presumably so that sources never share consumer-group state.
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /**
     * Counts the records in an Avro-format batch ({@code JavaRDD}). Assumes the batch is present, because
     * {@code Option#get} is called unconditionally.
     */
    @SuppressWarnings("rawtypes")
    private static long avroCount(InputBatch batch) {
        return ((JavaRDD) batch.getBatch().get()).count();
    }

    /**
     * Counts the records in a Row-format batch ({@code Dataset}). Assumes the batch is present, because
     * {@code Option#get} is called unconditionally.
     */
    @SuppressWarnings("rawtypes")
    private static long rowCount(InputBatch batch) {
        return ((Dataset) batch.getBatch().get()).count();
    }

    /**
     * Checks checkpointed reads in both Avro and Row formats:
     * <ul>
     *   <li>the sourceLimit caps the first fetch;</li>
     *   <li>re-reading from the same checkpoint gives the same result in either format;</li>
     *   <li>reading from the latest checkpoint returns no batch.</li>
     * </ul>
     */
    @Test
    @SuppressWarnings("rawtypes")
    public void testJsonKafkaSource() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        JsonKafkaSource jsonSource = new JsonKafkaSource(createPropsForJsonSource(null, "earliest"),
                this.jsc, this.sparkSession, this.schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

        // 1. Nothing has been produced yet, so a fetch without a checkpoint returns no batch.
        Assertions.assertEquals(Option.empty(),
                kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // 2. Produce 1000 records and fetch with sourceLimit = 900.
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, avroCount(fetch1));
        // The fetched Avro records must also convert to a DataFrame with the source schema.
        Assertions.assertEquals(900L, AvroConversionUtils.createDataFrame(
                JavaRDD.toRDD((JavaRDD) fetch1.getBatch().get()),
                this.schemaProvider.getSourceSchema().toString(),
                jsonSource.getSparkSession()).count());

        // 3. Produce 1000 more. Reading from fetch1's checkpoint yields the 100 unread records plus the new 1000.
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, rowCount(fetch2));

        // 4. Re-reading the same range in Avro format gives the same count and next checkpoint.
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), avroCount(fetch3));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());

        // 5. Re-reading the same range in Row format gives the same count and next checkpoint.
        InputBatch fetch4 = kafkaSource.fetchNewDataInRowFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), rowCount(fetch4));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch4.getCheckpointForNextBatch());

        // 6. Nothing remains after the latest checkpoint, in either format.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInRowFormat(
                Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    /**
     * Checks that sources configured with the {@code "earliest"} and {@code "latest"} reset strategies return
     * equal batches and next checkpoints on an empty topic. After data is produced, their next checkpoints
     * must still agree when no checkpoint is supplied.
     */
    @Test
    public void testJsonKafkaSourceResetStrategy() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(null, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(null, "latest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // 1. Empty topic: both strategies produce the same batch and the same next checkpoint.
        InputBatch earliestFetch = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestFetch = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earliestFetch.getBatch(), latestFetch.getBatch());
        Assertions.assertEquals(earliestFetch.getCheckpointForNextBatch(), latestFetch.getCheckpointForNextBatch());

        // 2. After producing data, checkpoint-less fetches still report the same next checkpoint.
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch earliestFetch2 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestFetch2 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earliestFetch2.getCheckpointForNextBatch(), latestFetch2.getCheckpointForNextBatch());
    }

    /**
     * Builds the source with an explicit maxEvents of {@code Long.MAX_VALUE}, then lowers the static
     * {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource} to 500. Checks that both fetches below still return
     * all 1000 newly produced records.
     */
    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(Long.MAX_VALUE, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // Process-wide static: always restore it, even when an assertion below fails.
        final long originalMaxEvents = KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        KafkaOffsetGen.Config.maxEventsFromKafkaSource = 500L;
        try {
            // 1. No checkpoint, sourceLimit = Long.MAX_VALUE.
            this.testUtils.sendMessages(TEST_TOPIC_NAME,
                    UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
            InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
            Assertions.assertEquals(1000L, avroCount(fetch1));

            // 2. Produce 1000 more and read from fetch1's checkpoint with sourceLimit = 1500.
            this.testUtils.sendMessages(TEST_TOPIC_NAME,
                    UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
            InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(
                    Option.of(fetch1.getCheckpointForNextBatch()), 1500L);
            Assertions.assertEquals(1000L, rowCount(fetch2));
        } finally {
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = originalMaxEvents;
        }
    }

    /**
     * With the static {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource} lowered to 500, checks that a
     * sourceLimit of 300 caps each fetch at 300 records.
     */
    @Test
    public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(Long.MAX_VALUE, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // Process-wide static: always restore it, even when an assertion below fails.
        final long originalMaxEvents = KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        KafkaOffsetGen.Config.maxEventsFromKafkaSource = 500L;
        try {
            // 1. 400 records available, sourceLimit = 300.
            this.testUtils.sendMessages(TEST_TOPIC_NAME,
                    UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
            InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
            Assertions.assertEquals(300L, avroCount(fetch1));

            // 2. Produce 600 more and read from fetch1's checkpoint, again with sourceLimit = 300.
            this.testUtils.sendMessages(TEST_TOPIC_NAME,
                    UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
            InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(
                    Option.of(fetch1.getCheckpointForNextBatch()), 300L);
            Assertions.assertEquals(300L, rowCount(fetch2));
        } finally {
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = originalMaxEvents;
        }
    }

    /**
     * Checks a per-source maxEvents property of 500:
     * <ul>
     *   <li>a fetch with sourceLimit {@code Long.MAX_VALUE} returns 500 records;</li>
     *   <li>fetches with an explicit sourceLimit (900, 400, 600) return exactly that many.</li>
     * </ul>
     */
    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(500L, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // 1. Produce 1000 records, fetch with sourceLimit = 900.
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, avroCount(fetch1));

        // 2. Produce 1000 more (1100 unread). With sourceLimit = Long.MAX_VALUE the fetch returns 500.
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, rowCount(fetch2));

        // 3. From the same checkpoint, an explicit sourceLimit of 400 returns 400.
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), 400L);
        Assertions.assertEquals(400L, avroCount(fetch3));

        // 4. After fetch2, the remaining 600 records are returned with sourceLimit = 600.
        InputBatch fetch4 = kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch2.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, avroCount(fetch4));

        // 5. Re-reading from fetch1's checkpoint in Avro format matches fetch2's count and next checkpoint.
        InputBatch fetch5 = kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), avroCount(fetch5));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());

        // 6. Nothing remains after fetch4's checkpoint.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(
                Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    /**
     * Builds a map from {@link #TEST_TOPIC_NAME} partitions to offsets by pairing the two arrays index-wise.
     *
     * @param partitions partition ids
     * @param offsets    offsets; {@code offsets[i]} belongs to {@code partitions[i]}
     * @return a new mutable map
     * @throws IllegalArgumentException if the arrays differ in length
     */
    private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
        if (partitions.length != offsets.length) {
            throw new IllegalArgumentException("partitions.length (" + partitions.length
                    + ") != offsets.length (" + offsets.length + ")");
        }
        HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
        for (int idx = 0; idx < partitions.length; idx++) {
            offsetMap.put(new TopicPartition(TEST_TOPIC_NAME, partitions[idx]), offsets[idx]);
        }
        return offsetMap;
    }

    /**
     * Unit tests for {@code KafkaOffsetGen.CheckpointUtils.computeOffsetRanges} and {@code totalNewMessages}.
     */
    @Test
    public void testComputeOffsetRanges() {
        // totalNewMessages sums the counts of all ranges: 100 + 100.
        Assertions.assertEquals(200L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(new OffsetRange[]{
                OffsetRange.apply(TEST_TOPIC_NAME, 0, 0L, 100L),
                OffsetRange.apply(TEST_TOPIC_NAME, 0, 100L, 200L)}));

        // Limit (1,000,000) exceeds what is available (100,000 per partition), so everything is read.
        Assertions.assertEquals(200000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(
                KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
                        makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
                        1000000L)));

        // Limit of 10,000 is split evenly, 5,000 per partition, starting at the from-offsets.
        OffsetRange[] evenSplit = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
                makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
                10000L);
        Assertions.assertEquals(10000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(evenSplit));
        Assertions.assertEquals(200000L, evenSplit[0].fromOffset());
        Assertions.assertEquals(205000L, evenSplit[0].untilOffset());
        Assertions.assertEquals(250000L, evenSplit[1].fromOffset());
        Assertions.assertEquals(255000L, evenSplit[1].untilOffset());

        // Partition 2 appears only in the latest offsets; it still gets a range.
        OffsetRange[] newPartition = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}),
                1000000L);
        Assertions.assertEquals(300000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(newPartition));
        Assertions.assertEquals(3, newPartition.length);

        // Uneven availability under a limit of 100,000. Partitions 0 and 2 are fully read;
        // partition 1 takes the remainder.
        OffsetRange[] unevenLimited = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
                100000L);
        Assertions.assertEquals(100000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(unevenLimited));
        Assertions.assertEquals(10L, unevenLimited[0].count());
        Assertions.assertEquals(89990L, unevenLimited[1].count());
        Assertions.assertEquals(10000L, unevenLimited[2].count());

        // Same offsets with a generous limit: everything available is read.
        OffsetRange[] unevenUnlimited = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
                1000000L);
        Assertions.assertEquals(110010L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(unevenUnlimited));
        Assertions.assertEquals(10L, unevenUnlimited[0].count());
        Assertions.assertEquals(100000L, unevenUnlimited[1].count());
        Assertions.assertEquals(10000L, unevenUnlimited[2].count());

        // Five partitions, limit 1001. Partition 0 only has 100; the rest is spread over partitions 1-4.
        OffsetRange[] fivePartitions = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
                makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}),
                1001L);
        Assertions.assertEquals(1001L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(fivePartitions));
        Assertions.assertEquals(100L, fivePartitions[0].count());
        Assertions.assertEquals(226L, fivePartitions[1].count());
        Assertions.assertEquals(226L, fivePartitions[2].count());
        Assertions.assertEquals(226L, fivePartitions[3].count());
        Assertions.assertEquals(223L, fivePartitions[4].count());
    }
}
