package io.hops.util.spark;

import io.hops.util.Hops;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import scala.Function0;

/* loaded from: input_file:io/hops/util/spark/SparkConsumer.class */
/**
 * Convenience wrapper for consuming Kafka topics from Spark.
 *
 * <p>An instance holds a set of topic names and a Kafka consumer configuration map and uses them
 * to build Spark Streaming direct streams ({@link #createDirectStream()}), Kafka-backed RDDs
 * ({@link #createRDD}) or a Structured Streaming reader ({@link #getKafkaDataStreamReader()}).
 *
 * <p>Only the constructors that accept a {@link JavaStreamingContext} produce an instance on which
 * the {@code createDirectStream} methods can be used; the other constructors leave the streaming
 * context unset.
 */
public class SparkConsumer {
    /** Streaming context used for direct streams; {@code null} when none was supplied. */
    private final JavaStreamingContext jsc;
    private final Collection<String> topics;
    private final Map<String, Object> kafkaParams;
    /** Set by {@link #getKafkaDataStreamReader(Map)}; {@code null} until that method is called. */
    private SparkSession sparkSession;

    /**
     * Creates a consumer using the topics returned by {@code Hops.getTopics()} and the consumer
     * configuration returned by {@code Hops.getKafkaProperties().getSparkConsumerConfigMap()}.
     * No streaming context is set.
     */
    public SparkConsumer() {
        this.jsc = null;
        this.topics = Hops.getTopics();
        this.kafkaParams = Hops.getKafkaProperties().getSparkConsumerConfigMap();
    }

    /**
     * Creates a consumer for the given topics using the consumer configuration returned by
     * {@code Hops.getKafkaProperties().getSparkConsumerConfigMap()}. No streaming context is set.
     *
     * @param collection topic names to subscribe to; stored by reference, not copied
     */
    public SparkConsumer(Collection<String> collection) {
        this.jsc = null;
        this.topics = collection;
        this.kafkaParams = Hops.getKafkaProperties().getSparkConsumerConfigMap();
    }

    /**
     * Creates a consumer using the topics returned by {@code Hops.getTopics()} and a consumer
     * configuration built from the given properties. No streaming context is set.
     *
     * @param properties passed to {@code getSparkConsumerConfigMap(Properties)}; presumably
     *     user-supplied consumer overrides (confirm against {@code KafkaProperties})
     */
    public SparkConsumer(Properties properties) {
        this.jsc = null;
        this.topics = Hops.getTopics();
        this.kafkaParams = Hops.getKafkaProperties().getSparkConsumerConfigMap(properties);
    }

    /**
     * Creates a consumer bound to a streaming context, using the consumer configuration returned
     * by {@code Hops.getKafkaProperties().getSparkConsumerConfigMap()}.
     *
     * @param javaStreamingContext streaming context used by the {@code createDirectStream} methods
     * @param collection topic names to subscribe to; stored by reference, not copied
     */
    public SparkConsumer(JavaStreamingContext javaStreamingContext, Collection<String> collection) {
        this.jsc = javaStreamingContext;
        this.topics = collection;
        this.kafkaParams = Hops.getKafkaProperties().getSparkConsumerConfigMap();
    }

    /**
     * Creates a consumer bound to a streaming context, with a consumer configuration built from
     * the given properties.
     *
     * @param javaStreamingContext streaming context used by the {@code createDirectStream} methods
     * @param collection topic names to subscribe to; stored by reference, not copied
     * @param properties passed to {@code getSparkConsumerConfigMap(Properties)}; presumably
     *     user-supplied consumer overrides (confirm against {@code KafkaProperties})
     */
    public SparkConsumer(JavaStreamingContext javaStreamingContext, Collection<String> collection, Properties properties) {
        this.jsc = javaStreamingContext;
        this.topics = collection;
        this.kafkaParams = Hops.getKafkaProperties().getSparkConsumerConfigMap(properties);
    }

    /**
     * Creates a consumer bound to a streaming context, using the given map as the Kafka consumer
     * configuration as-is.
     *
     * @param javaStreamingContext streaming context used by the {@code createDirectStream} methods
     * @param collection topic names to subscribe to; stored by reference, not copied
     * @param map Kafka consumer configuration; stored by reference, not copied
     */
    public SparkConsumer(JavaStreamingContext javaStreamingContext, Collection<String> collection, Map<String, Object> map) {
        this.jsc = javaStreamingContext;
        this.topics = collection;
        this.kafkaParams = map;
    }

    /**
     * Creates a direct Kafka stream subscribed to this consumer's topics, using
     * {@code LocationStrategies.PreferConsistent()}.
     *
     * @param <K> record key type
     * @param <V> record value type
     * @return the direct input stream
     * @throws IllegalStateException if this consumer was created without a streaming context
     */
    public <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream() {
        return KafkaUtils.createDirectStream(requireStreamingContext(), LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(this.topics, this.kafkaParams));
    }

    /**
     * Returns a Structured Streaming reader for the {@code "kafka"} format without any
     * caller-supplied options. Equivalent to {@code getKafkaDataStreamReader(null)}.
     *
     * @return the configured reader
     */
    public DataStreamReader getKafkaDataStreamReader() {
        return getKafkaDataStreamReader(null);
    }

    /**
     * Returns a Structured Streaming reader for the {@code "kafka"} format.
     *
     * <p>Obtains (or creates) a {@link SparkSession} named after {@code Hops.getJobName()}, stores
     * it so it can be retrieved through {@link #getSparkSession()}, and applies the options
     * returned by {@code getSparkStructuredStreamingKafkaProps(map)} to the reader.
     *
     * @param map forwarded unchanged to {@code getSparkStructuredStreamingKafkaProps}; the no-arg
     *     overload passes {@code null} here, so that helper is assumed to tolerate {@code null}
     *     (confirm against {@code KafkaProperties})
     * @return the configured reader
     */
    public DataStreamReader getKafkaDataStreamReader(Map<String, String> map) {
        this.sparkSession = SparkSession.builder().appName(Hops.getJobName()).getOrCreate();
        return this.sparkSession.readStream().format("kafka").options(Hops.getKafkaProperties().getSparkStructuredStreamingKafkaProps(map));
    }

    /**
     * Returns the session stored by the most recent {@link #getKafkaDataStreamReader(Map)} call.
     *
     * @return the session, or {@code null} if no data stream reader has been requested yet
     */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    /**
     * Creates a direct Kafka stream subscribed to this consumer's topics, using the given
     * location strategy.
     *
     * @param locationStrategy strategy handed to {@code KafkaUtils.createDirectStream}
     * @param <K> record key type
     * @param <V> record value type
     * @return the direct input stream
     * @throws IllegalStateException if this consumer was created without a streaming context
     */
    public <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(LocationStrategy locationStrategy) {
        return KafkaUtils.createDirectStream(requireStreamingContext(), locationStrategy, ConsumerStrategies.Subscribe(this.topics, this.kafkaParams));
    }

    /**
     * Creates an RDD of Kafka records for the given offset ranges using this consumer's Kafka
     * configuration. This consumer's topic collection and streaming context are not used here.
     *
     * @param javaSparkContext Spark context the RDD is created on
     * @param offsetRangeArr offset ranges handed to {@code KafkaUtils.createRDD}
     * @param locationStrategy strategy handed to {@code KafkaUtils.createRDD}
     * @param <K> record key type
     * @param <V> record value type
     * @return the RDD returned by {@code KafkaUtils.createRDD}
     */
    public <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(JavaSparkContext javaSparkContext, OffsetRange[] offsetRangeArr, LocationStrategy locationStrategy) {
        return KafkaUtils.createRDD(javaSparkContext, this.kafkaParams, offsetRangeArr, locationStrategy);
    }

    /** Delegates to {@code KafkaUtils.initializeLogIfNecessary}. */
    public void initializeLogIfNecessary(boolean z) {
        KafkaUtils.initializeLogIfNecessary(z);
    }

    /** Delegates to {@code KafkaUtils.isTraceEnabled}. */
    public boolean isTraceEnabled() {
        return KafkaUtils.isTraceEnabled();
    }

    /** Returns the logger exposed by {@code KafkaUtils.log()}. */
    public Logger log() {
        return KafkaUtils.log();
    }

    /** Returns the logger name exposed by {@code KafkaUtils.logName()}. */
    public String logName() {
        return KafkaUtils.logName();
    }

    /** Delegates to {@code KafkaUtils.logDebug} with the given message supplier. */
    public void logDebug(Function0<String> function0) {
        KafkaUtils.logDebug(function0);
    }

    /** Delegates to {@code KafkaUtils.logDebug} with the given message supplier and cause. */
    public void logDebug(Function0<String> function0, Throwable th) {
        KafkaUtils.logDebug(function0, th);
    }

    /** Delegates to {@code KafkaUtils.logError} with the given message supplier. */
    public void logError(Function0<String> function0) {
        KafkaUtils.logError(function0);
    }

    /** Delegates to {@code KafkaUtils.logError} with the given message supplier and cause. */
    public void logError(Function0<String> function0, Throwable th) {
        KafkaUtils.logError(function0, th);
    }

    /** Delegates to {@code KafkaUtils.logInfo} with the given message supplier. */
    public void logInfo(Function0<String> function0) {
        KafkaUtils.logInfo(function0);
    }

    /** Delegates to {@code KafkaUtils.logInfo} with the given message supplier and cause. */
    public void logInfo(Function0<String> function0, Throwable th) {
        KafkaUtils.logInfo(function0, th);
    }

    /** Delegates to {@code KafkaUtils.logTrace} with the given message supplier. */
    public void logTrace(Function0<String> function0) {
        KafkaUtils.logTrace(function0);
    }

    /** Delegates to {@code KafkaUtils.logTrace} with the given message supplier and cause. */
    public void logTrace(Function0<String> function0, Throwable th) {
        KafkaUtils.logTrace(function0, th);
    }

    /** Delegates to {@code KafkaUtils.logWarning} with the given message supplier. */
    public void logWarning(Function0<String> function0) {
        KafkaUtils.logWarning(function0);
    }

    /** Delegates to {@code KafkaUtils.logWarning} with the given message supplier and cause. */
    public void logWarning(Function0<String> function0, Throwable th) {
        KafkaUtils.logWarning(function0, th);
    }

    /**
     * Returns the streaming context, failing fast with a descriptive error when this consumer was
     * built through a constructor that does not take one (instead of an opaque
     * {@link NullPointerException} raised from inside Spark).
     */
    private JavaStreamingContext requireStreamingContext() {
        if (this.jsc == null) {
            throw new IllegalStateException(
                "SparkConsumer has no JavaStreamingContext; construct it with a JavaStreamingContext "
                    + "before calling createDirectStream");
        }
        return this.jsc;
    }
}
