package com.logicalclocks.hsfs.spark.engine.hudi;

import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource.class */
public class DeltaStreamerKafkaSource extends AvroSource {
    private static final Logger LOG = LogManager.getLogger(DeltaStreamerKafkaSource.class);
    private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    private final KafkaOffsetGen offsetGen;
    private final HoodieDeltaStreamerMetrics metrics;

    public DeltaStreamerKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        typedProperties.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
        String string = typedProperties.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), "");
        if (string.isEmpty()) {
            typedProperties.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, DeltaStreamerAvroDeserializer.class);
        } else {
            try {
                if (schemaProvider == null) {
                    throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
                }
                typedProperties.put(DataSourceWriteOptions.SCHEMA_PROVIDER_CLASS_PROP(), schemaProvider.getClass().getName());
                typedProperties.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(string));
            } catch (ClassNotFoundException e) {
                String str = "Could not load custom avro kafka deserializer: " + string;
                LOG.error(str);
                throw new HoodieException(str, e);
            }
        }
        this.metrics = new HoodieDeltaStreamerMetrics(HoodieWriteConfig.newBuilder().withProperties(typedProperties).build());
        this.offsetGen = new KafkaOffsetGen(typedProperties);
    }

    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> option, long j) {
        OffsetRange[] nextOffsetRanges = this.offsetGen.getNextOffsetRanges(option, j, this.metrics);
        long j2 = KafkaOffsetGen.CheckpointUtils.totalNewMessages(nextOffsetRanges);
        LOG.info("About to read " + j2 + " from Kafka for topic: " + this.offsetGen.getTopicName() + " from lastCheckpointStr " + option + " Offset range: " + KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges));
        return j2 <= 0 ? new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges)) : new InputBatch<>(Option.of(toRdd(nextOffsetRanges, this.props.getString("subjectId"))), KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges));
    }

    private JavaRDD<GenericRecord> toRdd(OffsetRange[] offsetRangeArr, String str) {
        return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent()).filter(consumerRecord -> {
            return Boolean.valueOf(str.equals(getHeader(consumerRecord.headers(), "subjectId")));
        }).map(consumerRecord2 -> {
            return (GenericRecord) consumerRecord2.value();
        });
    }

    private static String getHeader(Headers headers, String str) {
        Header lastHeader = headers.lastHeader(str);
        if (lastHeader != null) {
            return new String(lastHeader.value(), StandardCharsets.UTF_8);
        }
        return null;
    }

    public void onCommit(String str) {
        if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), ((Boolean) KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue()).booleanValue())) {
            this.offsetGen.commitOffsetToKafka(str);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -164940864:
                if (implMethodName.equals("lambda$toRdd$b5919535$1")) {
                    z = true;
                    break;
                }
                break;
            case -23220530:
                if (implMethodName.equals("lambda$toRdd$31683d06$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/avro/generic/GenericRecord;")) {
                    return consumerRecord2 -> {
                        return (GenericRecord) consumerRecord2.value();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/Boolean;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return consumerRecord -> {
                        return Boolean.valueOf(str.equals(getHeader(consumerRecord.headers(), "subjectId")));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
