/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.hops.hudi.com.google.protobuf.Message;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Properties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.KafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class ProtoKafkaSource
extends KafkaSource<JavaRDD<Message>> {
    private final Option<String> className;
    private final String deserializerName;

    public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
        this(props, sparkContext, sparkSession, metrics, (StreamContext)new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    public ProtoKafkaSource(TypedProperties properties2, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
        super(properties2, sparkContext, sparkSession, Source.SourceType.PROTO, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties2, sparkContext), streamContext.getSourceProfileSupplier()));
        this.deserializerName = ConfigUtils.getStringWithAltKeys((Properties)this.props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
        if (!this.deserializerName.equals(ByteArrayDeserializer.class.getName()) && !this.deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
            throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource");
        }
        if (this.deserializerName.equals(ByteArrayDeserializer.class.getName())) {
            ConfigUtils.checkRequiredConfigProperties(this.props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
            this.className = Option.of(ConfigUtils.getStringWithAltKeys(this.props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
        } else {
            this.className = Option.empty();
        }
        this.props.put("key.deserializer", StringDeserializer.class.getName());
        this.props.put("value.deserializer", this.deserializerName);
        this.offsetGen = new KafkaOffsetGen(this.props);
        if (this.shouldAddOffsets) {
            throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported");
        }
    }

    @Override
    protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) {
        if (this.deserializerName.equals(ByteArrayDeserializer.class.getName())) {
            ValidationUtils.checkArgument(this.className.isPresent(), ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
            ProtoDeserializer deserializer = new ProtoDeserializer(this.className.get());
            return KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, this.offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function & Serializable)obj -> deserializer.parse((byte[])obj.value()));
        }
        return KafkaUtils.createRDD((JavaSparkContext)this.sparkContext, this.offsetGen.getKafkaParams(), (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
    }

    @Override
    protected boolean allowSourcePersist() {
        return this.persistRdd && this.deserializerName.equals(ByteArrayDeserializer.class.getName());
    }

    private static class ProtoDeserializer
    implements Serializable {
        private final String className;
        private transient Class protoClass;
        private transient Method parseMethod;

        public ProtoDeserializer(String className) {
            this.className = className;
        }

        public Message parse(byte[] bytes) {
            try {
                return (Message)this.getParseMethod().invoke(this.getClass(), new Object[]{bytes});
            }
            catch (IllegalAccessException | InvocationTargetException ex) {
                throw new HoodieReadFromSourceException("Failed to parse proto message from kafka", ex);
            }
        }

        private Class getProtoClass() {
            if (this.protoClass == null) {
                this.protoClass = ReflectionUtils.getClass(this.className);
            }
            return this.protoClass;
        }

        private Method getParseMethod() {
            if (this.parseMethod == null) {
                try {
                    this.parseMethod = this.getProtoClass().getMethod("parseFrom", byte[].class);
                }
                catch (NoSuchMethodException ex) {
                    throw new HoodieReadFromSourceException("Unable to get proto parsing method from specified class: " + this.className, ex);
                }
            }
            return this.parseMethod;
        }
    }
}

