/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.util.HashMap;
import java.util.UUID;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link JsonKafkaSource} (read through a {@link SourceFormatAdapter} in both Avro and Row format)
 * and for the offset-range arithmetic in {@link KafkaOffsetGen.CheckpointUtils}.
 *
 * <p>Each test gets a fresh {@link KafkaTestUtils} instance, set up in {@link #setup()} and torn down in
 * {@link #teardown()}; sources connect to it via {@code testUtils.brokerAddress()}.
 */
public class TestKafkaSource extends UtilitiesTestBase {

    private static final String TEST_TOPIC_NAME = "hoodie_test";

    private FilebasedSchemaProvider schemaProvider;
    private KafkaTestUtils testUtils;
    private final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass();
    }

    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanupClass();
    }

    /** Creates the file-based schema provider and starts the Kafka test utilities for one test. */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), this.jsc);
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    /**
     * Releases per-test resources. The Kafka test utilities are torn down even if the base-class teardown
     * throws. They are skipped when {@link #setup()} failed before creating them, so the original setup
     * failure is not masked by a NullPointerException.
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            if (this.testUtils != null) {
                this.testUtils.teardown();
            }
        }
    }

    /**
     * Builds the properties for a {@link JsonKafkaSource} that reads {@link #TEST_TOPIC_NAME}.
     *
     * @param maxEventsToReadFromKafkaSource value for {@code hoodie.deltastreamer.kafka.source.maxEvents};
     *     when {@code null}, the current value of {@link KafkaOffsetGen.Config#maxEventsFromKafkaSource} is used
     * @param resetStrategy value for the Kafka consumer {@code auto.offset.reset} setting
     * @return the properties, with a fresh random consumer {@code group.id} on every call
     *     (presumably so that sources never share committed offsets)
     */
    private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        long maxEvents = maxEventsToReadFromKafkaSource != null
                ? maxEventsToReadFromKafkaSource
                : KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(maxEvents));
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /** Publishes {@code count} generated insert records, serialized as JSON, to the test topic. */
    private void sendInserts(HoodieTestDataGenerator dataGenerator, String instantTime, int count) {
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts(instantTime, count)));
    }

    /** Counts the records in an Avro-format batch; assumes the batch is present. */
    private static long avroCount(InputBatch batch) {
        return ((JavaRDD) batch.getBatch().get()).count();
    }

    /** Counts the rows in a Row-format batch; assumes the batch is present. */
    private static long rowCount(InputBatch batch) {
        return ((Dataset) batch.getBatch().get()).count();
    }

    @Test
    public void testJsonKafkaSource() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForJsonSource(null, "earliest");
        JsonKafkaSource jsonSource =
                new JsonKafkaSource(props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

        // 1. Empty topic, no checkpoint: no batch.
        Assertions.assertEquals(Option.empty(),
                kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // 2. 1000 records available, no checkpoint: the batch is bounded by the source limit of 900.
        sendInserts(dataGenerator, "000", 1000);
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, avroCount(fetch1));
        // The Avro batch converts to a DataFrame with the same number of rows.
        Dataset fetch1AsRows = AvroConversionUtils.createDataFrame(
                JavaRDD.toRDD((JavaRDD) fetch1.getBatch().get()),
                this.schemaProvider.getSourceSchema().toString(),
                jsonSource.getSparkSession());
        Assertions.assertEquals(900L, fetch1AsRows.count());

        // 3. Resume from fetch1's checkpoint after 1000 more records: 100 left over + 1000 new.
        sendInserts(dataGenerator, "001", 1000);
        InputBatch fetch2 =
                kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, rowCount(fetch2));

        // 4. Re-reading from the same checkpoint gives the same count and next checkpoint, in either format.
        InputBatch fetch3 =
                kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), avroCount(fetch3));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
        InputBatch fetch3AsRows =
                kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), rowCount(fetch3AsRows));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());

        // 5. Reading from the latest checkpoint yields no batch in either format.
        InputBatch fetch4 =
                kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch4.getBatch());
        InputBatch fetch4AsRows =
                kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch4AsRows.getBatch());
    }

    @Test
    public void testJsonKafkaSourceResetStrategy() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                this.createPropsForJsonSource(null, "earliest"),
                this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                this.createPropsForJsonSource(null, "latest"),
                this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // Empty topic: both reset strategies return the same batch and next checkpoint.
        InputBatch earFetch0 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latFetch0 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
        Assertions.assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());

        // After 1000 records arrive, both strategies still agree on the next checkpoint.
        sendInserts(dataGenerator, "000", 1000);
        InputBatch earFetch1 = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latFetch1 = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earFetch1.getCheckpointForNextBatch(), latFetch1.getCheckpointForNextBatch());
    }

    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForJsonSource(Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(
                new JsonKafkaSource(props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        // The default cap is a static field shared by every test in the JVM: remember it and always restore it,
        // even when an assertion fails, so later tests do not inherit the override.
        long savedDefaultMaxEvents = KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        KafkaOffsetGen.Config.maxEventsFromKafkaSource = 500L;
        try {
            // No checkpoint, Long.MAX_VALUE source limit: all 1000 records are expected.
            sendInserts(dataGenerator, "000", 1000);
            InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
            Assertions.assertEquals(1000L, avroCount(fetch1));

            // 1000 new records with a source limit of 1500: all 1000 new records are expected.
            sendInserts(dataGenerator, "001", 1000);
            InputBatch fetch2 =
                    kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500L);
            Assertions.assertEquals(1000L, rowCount(fetch2));
        } finally {
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = savedDefaultMaxEvents;
        }
    }

    @Test
    public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = this.createPropsForJsonSource(Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(
                new JsonKafkaSource(props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        // Static, JVM-wide default: saved here and restored in finally regardless of the test outcome.
        long savedDefaultMaxEvents = KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        KafkaOffsetGen.Config.maxEventsFromKafkaSource = 500L;
        try {
            // 400 records available, source limit 300: the batch is bounded by the limit.
            sendInserts(dataGenerator, "000", 400);
            InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
            Assertions.assertEquals(300L, avroCount(fetch1));

            // 100 left over + 600 new, source limit 300: again bounded by the limit.
            sendInserts(dataGenerator, "001", 600);
            InputBatch fetch2 =
                    kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300L);
            Assertions.assertEquals(300L, rowCount(fetch2));
        } finally {
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = savedDefaultMaxEvents;
        }
    }

    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        // maxEvents property is configured to 500 for this source.
        TypedProperties props = this.createPropsForJsonSource(500L, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(
                new JsonKafkaSource(props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // Explicit source limit of 900: 900 records are expected.
        sendInserts(dataGenerator, "000", 1000);
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, avroCount(fetch1));

        // Long.MAX_VALUE source limit: the configured cap of 500 is expected to apply.
        sendInserts(dataGenerator, "001", 1000);
        InputBatch fetch2 =
                kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, rowCount(fetch2));

        // Smaller explicit limit from the same checkpoint.
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400L);
        Assertions.assertEquals(400L, avroCount(fetch3));

        // From fetch2's checkpoint the remaining 600 (900 + 500 + 600 = 2000) fit the limit of 600.
        InputBatch fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, avroCount(fetch4));

        // Re-reading from fetch1's checkpoint with Long.MAX_VALUE reproduces fetch2 exactly.
        InputBatch fetch5 =
                kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(rowCount(fetch2), avroCount(fetch5));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());

        // Everything has been consumed after fetch4.
        InputBatch fetch6 =
                kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch6.getBatch());
    }

    /**
     * Builds a map from {@code TopicPartition(TEST_TOPIC_NAME, partitions[i])} to {@code offsets[i]}.
     * Both arrays are expected to have the same length.
     */
    private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
        HashMap<TopicPartition, Long> map = new HashMap<TopicPartition, Long>();
        for (int i = 0; i < partitions.length; ++i) {
            map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
        }
        return map;
    }

    @Test
    public void testComputeOffsetRanges() {
        // totalNewMessages sums the sizes of all ranges.
        long totalMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(new OffsetRange[]{
                OffsetRange.apply(TEST_TOPIC_NAME, 0, 0L, 100L),
                OffsetRange.apply(TEST_TOPIC_NAME, 0, 100L, 200L)});
        Assertions.assertEquals(200L, totalMsgs);

        // Limit above what is available: every pending message is included.
        OffsetRange[] ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}),
                makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}),
                1000000L);
        Assertions.assertEquals(200000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));

        // Limit of 10000 over two equally-backlogged partitions: 5000 from each.
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}),
                makeOffsetMap(new int[]{0, 1}, new long[]{300000L, 350000L}),
                10000L);
        Assertions.assertEquals(10000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));
        Assertions.assertEquals(200000L, ranges[0].fromOffset());
        Assertions.assertEquals(205000L, ranges[0].untilOffset());
        Assertions.assertEquals(250000L, ranges[1].fromOffset());
        Assertions.assertEquals(255000L, ranges[1].untilOffset());

        // Partition 2 appears only in the latest offsets; per the 300000 total it is read from offset 0.
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000L, 350000L, 100000L}),
                1000000L);
        Assertions.assertEquals(300000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));
        Assertions.assertEquals(3, ranges.length);

        // Uneven backlogs under a binding limit: small partitions are fully read, the rest fills the limit.
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010L, 350000L, 10000L}),
                100000L);
        Assertions.assertEquals(100000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));
        Assertions.assertEquals(10L, ranges[0].count());
        Assertions.assertEquals(89990L, ranges[1].count());
        Assertions.assertEquals(10000L, ranges[2].count());

        // Same backlogs under a non-binding limit: everything pending is read.
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1}, new long[]{200000L, 250000L}),
                makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010L, 350000L, 10000L}),
                1000000L);
        Assertions.assertEquals(110010L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));
        Assertions.assertEquals(10L, ranges[0].count());
        Assertions.assertEquals(100000L, ranges[1].count());
        Assertions.assertEquals(10000L, ranges[2].count());

        // Five partitions, limit 1001: partition 0 only has 100; the remaining 901 are split 226/226/226/223.
        ranges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
                makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0L, 0L, 0L, 0L, 0L}),
                makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100L, 1000L, 1000L, 1000L, 1000L}),
                1001L);
        Assertions.assertEquals(1001L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges));
        Assertions.assertEquals(100L, ranges[0].count());
        Assertions.assertEquals(226L, ranges[1].count());
        Assertions.assertEquals(226L, ranges[2].count());
        Assertions.assertEquals(226L, ranges[3].count());
        Assertions.assertEquals(223L, ranges[4].count());
    }
}

