/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.Serializable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class AvroKafkaSource
extends AvroSource {
    private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
    private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";
    private final KafkaOffsetGen offsetGen;
    private final HoodieDeltaStreamerMetrics metrics;
    private final SchemaProvider schemaProvider;
    private final String deserializerClassName;

    public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
        super(props, sparkContext, sparkSession, schemaProvider);
        props.put((Object)NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
        this.deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), (String)DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
        try {
            props.put((Object)NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(this.deserializerClassName));
            if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
                if (schemaProvider == null) {
                    throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
                }
                props.put((Object)KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, (Object)schemaProvider.getSourceSchema().toString());
            }
        }
        catch (ClassNotFoundException e) {
            String error = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            LOG.error((Object)error);
            throw new HoodieException(error, (Throwable)e);
        }
        this.schemaProvider = schemaProvider;
        this.metrics = metrics;
        this.offsetGen = new KafkaOffsetGen(props);
    }

    @Override
    protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
        OffsetRange[] offsetRanges = this.offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, this.metrics);
        long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info((Object)("About to read " + totalNewMsgs + " from Kafka for topic :" + this.offsetGen.getTopicName()));
        if (totalNewMsgs <= 0L) {
            return new InputBatch<JavaRDD<GenericRecord>>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
        }
        JavaRDD<GenericRecord> newDataRDD = this.toRDD(offsetRanges);
        return new InputBatch<JavaRDD<GenericRecord>>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
    }

    private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
        if (this.deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
            if (this.schemaProvider == null) {
                throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
            }
            AvroConvertor convertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
            return KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, this.offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function & Serializable)obj -> convertor.fromAvroBinary((byte[])obj.value()));
        }
        return KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, this.offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function & Serializable)obj -> (GenericRecord)obj.value());
    }

    @Override
    public void onCommit(String lastCkptStr) {
        if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), ((Boolean)KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue()).booleanValue())) {
            this.offsetGen.commitOffsetToKafka(lastCkptStr);
        }
    }
}

