package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestJsonKafkaSource.class */
public class TestJsonKafkaSource extends BaseTestKafkaSource {
    // Shared Jackson mapper; used in this class to parse generated JSON records into maps and serialize them back.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Classpath URL of the Avro schema file handed to the schema provider in init(); null when the resource cannot be found.
    static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("streamer-config/source_short_trip_uber.avsc");

    /**
     * Prepares the file-based schema provider before each test, pointing it at the
     * schema file resolved from {@link #SCHEMA_FILE_URL}.
     *
     * @throws Exception if the schema resource is missing or its URL cannot be converted to a URI
     */
    @BeforeEach
    public void init() throws Exception {
        URL schemaUrl = Objects.requireNonNull(SCHEMA_FILE_URL);
        String schemaFilePath = schemaUrl.toURI().getPath();

        TypedProperties schemaProps = new TypedProperties();
        schemaProps.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
        this.schemaProvider = new FilebasedSchemaProvider(schemaProps, jsc());
    }

    /**
     * Builds JSON Kafka source properties bound to the broker of the shared test utilities.
     *
     * @param str  Kafka topic name
     * @param l    maximum number of events to read, or {@code null} to use the configured default
     * @param str2 value for {@code auto.offset.reset}
     * @return the populated properties
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    TypedProperties createPropsForKafkaSource(String str, Long l, String str2) {
        String brokerAddress = testUtils.brokerAddress();
        return createPropsForJsonKafkaSource(brokerAddress, str, l, str2);
    }

    /**
     * Creates the consumer/source properties for a JSON Kafka source.
     *
     * <p>Auto-commit is disabled and a random consumer group id is generated on every call.
     * NOTE(review): the decompiler reported that this method's access modifier was changed
     * from package-private.
     *
     * @param str  bootstrap servers
     * @param str2 Kafka topic name
     * @param l    maximum number of events to read, or {@code null} to fall back to the
     *             default of {@code KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE}
     * @param str3 value for {@code auto.offset.reset}
     * @return the populated properties
     */
    public static TypedProperties createPropsForJsonKafkaSource(String str, String str2, Long l, String str3) {
        String maxEvents;
        if (l == null) {
            maxEvents = String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue());
        } else {
            maxEvents = String.valueOf(l);
        }

        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", str2);
        props.setProperty("bootstrap.servers", str);
        props.setProperty("auto.offset.reset", str3);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEvents);
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /**
     * Wraps a new {@link JsonKafkaSource}, built from the given properties and this test's
     * schema provider and metrics, in a {@link SourceFormatAdapter}.
     *
     * @param typedProperties source configuration
     * @return adapter around the freshly created source
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    SourceFormatAdapter createSource(TypedProperties typedProperties) {
        JsonKafkaSource kafkaSource = new JsonKafkaSource(typedProperties, jsc(), spark(), this.schemaProvider, this.metrics);
        return new SourceFormatAdapter(kafkaSource);
    }

    /**
     * Publishes 1000 JSON records followed by 100 null messages and checks that a fetch
     * in Avro format yields exactly the 1000 real records.
     */
    @Test
    public void testJsonKafkaSourceFilterNullMsg() {
        final String topic = "hoodie_test_testJsonKafkaSourceFilterNullMsg";
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        JsonKafkaSource kafkaSource = new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics);
        SourceFormatAdapter adapter = new SourceFormatAdapter(kafkaSource);

        // Nothing has been published yet, so the first fetch carries no batch.
        Assertions.assertEquals(Option.empty(), adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        String[] jsonRecords = UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("000", 1000, schema));
        testUtils.sendMessages(topic, jsonRecords);
        // An array of 100 null elements stands in for null-valued Kafka messages.
        testUtils.sendMessages(topic, new String[100]);

        InputBatch avroBatch = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, ((JavaRDD) avroBatch.getBatch().get()).count());
    }

    /**
     * With the max-events property set to {@code Long.MAX_VALUE}, checks that each fetch
     * returns all 1000 records published since the previous checkpoint, both in Avro and
     * in Row format.
     */
    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        final String topic = "hoodie_test_testJsonKafkaSourceWithDefaultUpperCap";
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
        JsonKafkaSource kafkaSource = new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics);
        SourceFormatAdapter adapter = new SourceFormatAdapter(kafkaSource);

        // First round: 1000 records read from the beginning of the topic.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("000", 1000, schema)));
        InputBatch firstBatch = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, ((JavaRDD) firstBatch.getBatch().get()).count());

        // Second round: another 1000 records, read from the previous checkpoint with a source limit of 1500.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("001", 1000, schema)));
        InputBatch secondBatch = adapter.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 1500L);
        Assertions.assertEquals(1000L, ((Dataset) secondBatch.getBatch().get()).count());
    }

    /**
     * With the max-events property set to 500, checks how many records each fetch returns
     * for a variety of source limits and starting checkpoints, and that re-reading from the
     * same checkpoint is repeatable.
     */
    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        final String topic = "hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap";
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForKafkaSource(topic, 500L, "earliest");
        JsonKafkaSource kafkaSource = new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics);
        SourceFormatAdapter adapter = new SourceFormatAdapter(kafkaSource);

        // 1000 records available; an explicit source limit of 900 yields 900.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("000", 1000, schema)));
        InputBatch initialAvro = adapter.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, ((JavaRDD) initialAvro.getBatch().get()).count());

        // 1000 more records; with Long.MAX_VALUE as the source limit the fetch yields 500.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("001", 1000, schema)));
        InputBatch cappedRows = adapter.fetchNewDataInRowFormat(Option.of(initialAvro.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, ((Dataset) cappedRows.getBatch().get()).count());

        // Same starting checkpoint, explicit limit of 400.
        InputBatch limited400 = adapter.fetchNewDataInAvroFormat(Option.of(initialAvro.getCheckpointForNextBatch()), 400L);
        Assertions.assertEquals(400L, ((JavaRDD) limited400.getBatch().get()).count());

        // Continue after the capped row batch with an explicit limit of 600.
        InputBatch limited600 = adapter.fetchNewDataInAvroFormat(Option.of(cappedRows.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, ((JavaRDD) limited600.getBatch().get()).count());

        // Replaying from the initial checkpoint gives the same count and next checkpoint as the row fetch.
        InputBatch replayedAvro = adapter.fetchNewDataInAvroFormat(Option.of(initialAvro.getCheckpointForNextBatch()), Long.MAX_VALUE);
        long rowCount = ((Dataset) cappedRows.getBatch().get()).count();
        long replayedCount = ((JavaRDD) replayedAvro.getBatch().get()).count();
        Assertions.assertEquals(rowCount, replayedCount);
        Assertions.assertEquals(cappedRows.getCheckpointForNextBatch(), replayedAvro.getCheckpointForNextBatch());

        // Everything has been consumed after the 600-record batch.
        InputBatch drained = adapter.fetchNewDataInAvroFormat(Option.of(limited600.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), drained.getBatch());
    }

    /**
     * Generates insert records for the short-trip schema, converts them to JSON keyed by
     * partition, and publishes them to the given topic.
     *
     * @param str topic name
     * @param i   number of records to generate
     * @param i2  partition count passed to the JSON-by-partition helper
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    void sendMessagesToKafka(String str, int i, int i2) {
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.sendMessages(
            str,
            UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
                generator.generateInsertsAsPerSchema("000", Integer.valueOf(i), schema),
                i2));
    }

    /**
     * Same as {@code sendMessagesToKafka}, but uses the helper that produces messages with a
     * null Kafka key.
     *
     * @param str topic name
     * @param i   number of records to generate
     * @param i2  partition count passed to the JSON-by-partition helper
     */
    void sendNullKafkaKeyMessagesToKafka(String str, int i, int i2) {
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.sendMessages(
            str,
            UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey(
                generator.generateInsertsAsPerSchema("000", Integer.valueOf(i), schema),
                i2));
    }

    /**
     * Generates insert records, strips the {@code height}, {@code current_date} and
     * {@code nation} entries from each record's JSON, and publishes the results keyed by
     * {@code index % i2}.
     *
     * @param str topic name
     * @param i   number of records to generate and send
     * @param i2  modulus used to derive the message key from the record index
     * @throws RuntimeException wrapping any exception raised while building or sending the messages
     */
    void sendJsonSafeMessagesToKafka(String str, int i, int i2) {
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        final String[] strippedFields = {"height", "current_date", "nation"};
        try {
            Tuple2[] keyedMessages = new Tuple2[i];
            String[] rawJson = UtilitiesTestBase.Helpers.jsonifyRecords(
                new HoodieTestDataGenerator().generateInsertsAsPerSchema("000", Integer.valueOf(i), schema));
            for (int idx = 0; idx < i; idx++) {
                Map fields = (Map) OBJECT_MAPPER.readValue(rawJson[idx], Map.class);
                for (String field : strippedFields) {
                    fields.remove(field);
                }
                String key = Integer.toString(idx % i2);
                keyedMessages[idx] = new Tuple2(key, OBJECT_MAPPER.writeValueAsString(fields));
            }
            testUtils.sendMessages(str, keyedMessages);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes 1000 well-formed JSON records plus two malformed payloads and checks that a
     * fetch in Row format returns the 1000 valid rows while the error table writer ends up
     * with exactly two error events.
     *
     * @throws IOException declared for compatibility with the helpers used by this test
     */
    @Test
    public void testErrorEventsForDataInRowFormat() throws IOException {
        final String topic = "hoodie_test_testErrorEventsForDataInRowFormat";
        testUtils.createTopic(topic, 2);

        // Valid records first, then two payloads that are not JSON at all.
        sendJsonSafeMessagesToKafka(topic, 1000, 2);
        testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});

        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        props.put(KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        props.put(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), "/tmp/qurantine_table_test/json_kafka_row_events");
        props.put(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
        props.put("hoodie.errortable.validate.targetschema.enable", "true");
        props.put("hoodie.base.path", "/tmp/json_kafka_row_events");

        JsonKafkaSource kafkaSource = new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics);
        Option errorTableWriter = Option.of(getAnonymousErrorTableWriter(props));
        SourceFormatAdapter adapter = new SourceFormatAdapter(kafkaSource, errorTableWriter, Option.of(props));

        InputBatch rowBatch = adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, ((Dataset) rowBatch.getBatch().get()).count());

        BaseErrorTableWriter writer = (BaseErrorTableWriter) errorTableWriter.get();
        Assertions.assertEquals(2L, ((JavaRDD) writer.getErrorEvents(HoodieActiveTimeline.createNewInstantTime(), Option.empty()).get()).count());
    }

    /**
     * Publishes 1000 generated JSON records plus two malformed payloads and checks that a
     * fetch in Avro format returns the 1000 valid records while the error table writer ends
     * up with exactly two error events.
     *
     * @throws IOException declared for compatibility with the helpers used by this test
     */
    @Test
    public void testErrorEventsForDataInAvroFormat() throws IOException {
        final String topic = "hoodie_test_testErrorEventsForDataInAvroFormat";
        final String schema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        testUtils.createTopic(topic, 2);

        // Valid records first, then two payloads that are not JSON at all.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(new HoodieTestDataGenerator().generateInsertsAsPerSchema("000", 1000, schema)));
        testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});

        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        props.put(KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        props.put(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), "/tmp/qurantine_table_test/json_kafka_events");
        props.put(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), "json_kafka_events");
        props.put("hoodie.base.path", "/tmp/json_kafka_events");

        JsonKafkaSource kafkaSource = new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics);
        Option errorTableWriter = Option.of(getAnonymousErrorTableWriter(props));
        SourceFormatAdapter adapter = new SourceFormatAdapter(kafkaSource, errorTableWriter, Option.of(props));

        InputBatch avroBatch = adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, ((JavaRDD) avroBatch.getBatch().get()).count());

        BaseErrorTableWriter writer = (BaseErrorTableWriter) errorTableWriter.get();
        Assertions.assertEquals(2L, ((JavaRDD) writer.getErrorEvents(HoodieActiveTimeline.createNewInstantTime(), Option.empty()).get()).count());
    }

    /**
     * Creates an in-memory error table writer for tests. It records every RDD of error
     * events it is given (mapping each event to a placeholder record) so a test can later
     * count them; nothing is ever committed.
     *
     * <p>The decompiled {@code $deserializeLambda$} method has been dropped: it is not valid
     * Java source, and the compiler synthesizes that method itself for serializable lambdas.
     *
     * @param typedProperties configuration passed to the writer's constructor
     * @return the anonymous writer instance
     */
    private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties typedProperties) {
        return new BaseErrorTableWriter<ErrorEvent<String>>(new HoodieDeltaStreamer.Config(), spark(), typedProperties, new HoodieSparkEngineContext(jsc()), fs()) {
            // One entry per addErrorEvents call, in call order.
            List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList<>();

            public void addErrorEvents(JavaRDD javaRDD) {
                // Only the number of events matters to the tests, so each event becomes an empty placeholder record.
                this.errorEvents.add(javaRDD.map(obj -> new HoodieAvroRecord(new HoodieKey(), (HoodieRecordPayload) null)));
            }

            public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String str, Option option) {
                // Unions all recorded RDDs; throws NoSuchElementException if none were added.
                return Option.of(this.errorEvents.stream().reduce(JavaRDD::union).get());
            }

            public boolean upsertAndCommit(String str, Option option) {
                // Nothing is persisted by this test double.
                return false;
            }
        };
    }

    /**
     * Checks that enabling the Kafka append-offsets option adds four trailing Kafka metadata
     * columns without changing the rest of the data, and that messages sent with a null
     * Kafka key surface as a null {@code _hoodie_kafka_source_key}.
     */
    @Test
    public void testAppendKafkaOffset() {
        final String topic = "hoodie_test_testKafkaOffsetAppend";
        final int partitionCount = 2;
        final int messageCount = 30;
        final String[] kafkaMetaColumns = {"_hoodie_kafka_source_offset", "_hoodie_kafka_source_partition", "_hoodie_kafka_source_timestamp", "_hoodie_kafka_source_key"};

        testUtils.createTopic(topic, partitionCount);
        sendMessagesToKafka(topic, messageCount, partitionCount);
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");

        // Baseline read without the Kafka metadata columns.
        SourceFormatAdapter plainAdapter = new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics));
        InputBatch plainBatch = plainAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Dataset plainRows = ((Dataset) plainBatch.getBatch().get()).cache();
        Assertions.assertEquals(messageCount, plainRows.count());
        List<String> plainColumns = Arrays.asList(plainRows.columns());

        // Same read with the metadata columns appended.
        props.put(HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
        SourceFormatAdapter offsetAdapter = new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics));
        InputBatch offsetBatch = offsetAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Dataset rowsWithOffsets = ((Dataset) offsetBatch.getBatch().get()).cache();
        Assertions.assertEquals(messageCount, rowsWithOffsets.count());
        for (int partition = 0; partition < partitionCount; partition++) {
            Assertions.assertEquals(messageCount / partitionCount, rowsWithOffsets.filter("_hoodie_kafka_source_partition=" + partition).count());
        }

        // Dropping the metadata columns must leave nothing that is absent from the baseline.
        Assertions.assertEquals(0L, rowsWithOffsets.drop(kafkaMetaColumns).except(plainRows).count());
        List<String> offsetColumns = Arrays.asList(rowsWithOffsets.columns());
        Assertions.assertEquals(4, offsetColumns.size() - plainColumns.size());
        Assertions.assertEquals(Arrays.asList(kafkaMetaColumns), offsetColumns.subList(offsetColumns.size() - 4, offsetColumns.size()));

        // Messages published with a null Kafka key.
        sendNullKafkaKeyMessagesToKafka(topic, messageCount, partitionCount);
        SourceFormatAdapter nullKeyAdapter = new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics));
        InputBatch nullKeyBatch = nullKeyAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Dataset rowsWithNullKeys = ((Dataset) nullKeyBatch.getBatch().get()).cache();
        Assertions.assertEquals(messageCount, rowsWithNullKeys.toDF().filter("_hoodie_kafka_source_key is null").count());

        plainRows.unpersist();
        rowsWithOffsets.unpersist();
        rowsWithNullKeys.unpersist();
    }

    /**
     * Runs the base class's {@code testFailOnDataLoss} for the JSON Kafka source by delegating to {@code super}.
     * NOTE(review): the bridge/synthetic markers suggest this method was compiler-generated
     * rather than hand-written — confirm before editing.
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testFailOnDataLoss() throws Exception {
        super.testFailOnDataLoss();
    }

    /**
     * Runs the base class's {@code testCommitOffsetToKafka} for the JSON Kafka source by delegating to {@code super}.
     * NOTE(review): the bridge/synthetic markers suggest this method was compiler-generated
     * rather than hand-written — confirm before editing.
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testCommitOffsetToKafka() {
        super.testCommitOffsetToKafka();
    }

    /**
     * Runs the base class's {@code testProtoKafkaSourceInsertRecordsLessSourceLimit} for the JSON Kafka source
     * by delegating to {@code super}.
     * NOTE(review): the bridge/synthetic markers suggest this method was compiler-generated
     * rather than hand-written — confirm before editing.
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testProtoKafkaSourceInsertRecordsLessSourceLimit() {
        super.testProtoKafkaSourceInsertRecordsLessSourceLimit();
    }

    /**
     * Runs the base class's {@code testKafkaSourceResetStrategy} for the JSON Kafka source by delegating to {@code super}.
     * NOTE(review): the bridge/synthetic markers suggest this method was compiler-generated
     * rather than hand-written — confirm before editing.
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testKafkaSourceResetStrategy() {
        super.testKafkaSourceResetStrategy();
    }

    /**
     * Runs the base class's {@code testKafkaSource} for the JSON Kafka source by delegating to {@code super}.
     * NOTE(review): the bridge/synthetic markers suggest this method was compiler-generated
     * rather than hand-written — confirm before editing.
     */
    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testKafkaSource() {
        super.testKafkaSource();
    }
}
