/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
import com.amazon.deequ.profiles.ColumnProfilerRunner;
import com.amazon.deequ.profiles.ColumnProfiles;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.ExternalFeatureGroup;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.constructor.HudiFeatureGroupAlias;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.engine.hudi.HudiEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.Option;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class SparkEngine {
    private static SparkEngine INSTANCE = null;
    private SparkSession sparkSession;
    private FeatureGroupUtils utils = new FeatureGroupUtils();
    private HudiEngine hudiEngine = new HudiEngine();

    public static synchronized SparkEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new SparkEngine();
        }
        return INSTANCE;
    }

    private SparkEngine() {
        this.sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
        this.sparkSession.conf().set("hive.exec.dynamic.partition", "true");
        this.sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        this.sparkSession.conf().set("spark.sql.hive.convertMetastoreParquet", "false");
    }

    public void validateSparkConfiguration() throws FeatureStoreException {
        String exceptionText = "Spark is misconfigured for communication with Hopsworks, missing or invalid property: ";
        HashMap<String, String> configurationMap = new HashMap<String, String>();
        configurationMap.put("spark.hadoop.hops.ssl.trustore.name", null);
        configurationMap.put("spark.hadoop.hops.rpc.socket.factory.class.default", "io.hops.hadoop.shaded.org.apache.hadoop.net.HopsSSLSocketFactory");
        configurationMap.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        configurationMap.put("spark.hadoop.hops.ssl.hostname.verifier", "ALLOW_ALL");
        configurationMap.put("spark.hadoop.hops.ssl.keystore.name", null);
        configurationMap.put("spark.hadoop.fs.hopsfs.impl", "io.hops.hopsfs.client.HopsFileSystem");
        configurationMap.put("spark.hadoop.hops.ssl.keystores.passwd.name", null);
        configurationMap.put("spark.hadoop.hops.ipc.server.ssl.enabled", "true");
        configurationMap.put("spark.sql.hive.metastore.jars", null);
        configurationMap.put("spark.hadoop.client.rpc.ssl.enabled.protocol", "TLSv1.2");
        configurationMap.put("spark.hadoop.hive.metastore.uris", null);
        for (Map.Entry entry : configurationMap.entrySet()) {
            if (this.sparkSession.conf().contains((String)entry.getKey()) && (entry.getValue() == null || this.sparkSession.conf().get((String)entry.getKey(), null).equals(entry.getValue()))) continue;
            throw new FeatureStoreException(exceptionText + (String)entry.getKey());
        }
    }

    public String getTrustStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.trustore.name");
    }

    public String getKeyStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystore.name");
    }

    public String getCertKey() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystores.passwd.name");
    }

    public Dataset<Row> sql(String query) {
        try {
            return this.sparkSession.sql(query);
        }
        catch (Exception e) {
            Pattern pattern;
            Matcher matcher;
            if (e.getMessage().contains("Permission denied") && (matcher = (pattern = Pattern.compile("inode=\"/apps/hive/warehouse/(.*)?_featurestore\\.db\"")).matcher(e.getMessage())).find()) {
                String featureStore = matcher.group(1);
                throw new RuntimeException(String.format("Cannot access feature store '%s'. It is possible to request access from data owners of '%s'.", featureStore, featureStore));
            }
            throw e;
        }
    }

    public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException {
        Dataset dataset = (Dataset)onDemandFeatureGroup.getStorageConnector().read(onDemandFeatureGroup.getQuery(), onDemandFeatureGroup.getDataFormat() != null ? onDemandFeatureGroup.getDataFormat().toString() : null, this.getOnDemandOptions(onDemandFeatureGroup), onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));
        if (!Strings.isNullOrEmpty((String)onDemandFeatureGroup.getLocation())) {
            this.sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
        }
        dataset.createOrReplaceTempView(alias);
        return dataset;
    }

    public static List<Dataset<Row>> splitLabels(Dataset<Row> dataset, List<String> labels) {
        ArrayList results = Lists.newArrayList();
        if (labels != null && !labels.isEmpty()) {
            Column[] labelsCol = (Column[])labels.stream().map(label -> functions.col((String)label).alias(label.toLowerCase())).toArray(Column[]::new);
            results.add(dataset.drop((String[])labels.stream().toArray(String[]::new)));
            results.add(dataset.select(labelsCol));
        } else {
            results.add(dataset);
            results.add(null);
        }
        return results;
    }

    private Map<String, String> getOnDemandOptions(ExternalFeatureGroup externalFeatureGroup) {
        if (externalFeatureGroup.getOptions() == null) {
            return new HashMap<String, String>();
        }
        return externalFeatureGroup.getOptions().stream().collect(Collectors.toMap(OnDemandOptions::getName, OnDemandOptions::getValue));
    }

    public void registerHudiTemporaryTable(HudiFeatureGroupAlias hudiFeatureGroupAlias, Map<String, String> readOptions) {
        Map<String, String> hudiArgs = this.hudiEngine.setupHudiReadOpts(hudiFeatureGroupAlias.getLeftFeatureGroupStartTimestamp(), hudiFeatureGroupAlias.getLeftFeatureGroupEndTimestamp(), readOptions);
        this.sparkSession.read().format("org.apache.hudi").options(hudiArgs).load(hudiFeatureGroupAlias.getFeatureGroup().getLocation()).createOrReplaceTempView(hudiFeatureGroupAlias.getAlias());
    }

    public Dataset<Row>[] write(TrainingDataset trainingDataset, Dataset<Row> dataset, Map<String, String> writeOptions, SaveMode saveMode) throws FeatureStoreException, IOException {
        this.setupConnectorHadoopConf(trainingDataset.getStorageConnector());
        if (trainingDataset.getCoalesce().booleanValue()) {
            dataset = dataset.coalesce(1);
        }
        if (trainingDataset.getSplits() == null || trainingDataset.getSplits().isEmpty()) {
            String path = new Path(trainingDataset.getLocation(), trainingDataset.getName()).toString();
            this.writeSingle((Dataset<Row>)dataset, trainingDataset.getDataFormat(), writeOptions, saveMode, path);
            return new Dataset[]{dataset};
        }
        Dataset<Row>[] datasetSplits = this.splitDataset(trainingDataset, (Dataset<Row>)dataset);
        this.writeSplits(datasetSplits, trainingDataset.getDataFormat(), writeOptions, saveMode, trainingDataset.getLocation(), trainingDataset.getSplits());
        return datasetSplits;
    }

    public Dataset<Row>[] splitDataset(TrainingDataset trainingDataset, Dataset<Row> dataset) {
        List splitFactors = trainingDataset.getSplits().stream().map(Split::getPercentage).collect(Collectors.toList());
        Dataset[] datasetSplits = null;
        datasetSplits = trainingDataset.getSeed() != null ? dataset.randomSplit(splitFactors.stream().mapToDouble(Float::doubleValue).toArray(), trainingDataset.getSeed().longValue()) : dataset.randomSplit(splitFactors.stream().mapToDouble(Float::doubleValue).toArray());
        return datasetSplits;
    }

    public Map<String, String> getWriteOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        HashMap<String, String> writeOptions = new HashMap<String, String>();
        switch (dataFormat) {
            case CSV: {
                writeOptions.put("header", "true");
                writeOptions.put("delimiter", ",");
                break;
            }
            case TSV: {
                writeOptions.put("header", "true");
                writeOptions.put("delimiter", "\t");
                break;
            }
            case TFRECORDS: 
            case TFRECORD: {
                writeOptions.put("recordType", "Example");
                break;
            }
        }
        if (providedOptions != null && !providedOptions.isEmpty()) {
            writeOptions.putAll(providedOptions);
        }
        return writeOptions;
    }

    public Map<String, String> getReadOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        HashMap<String, String> readOptions = new HashMap<String, String>();
        switch (dataFormat) {
            case CSV: {
                readOptions.put("header", "true");
                readOptions.put("delimiter", ",");
                readOptions.put("inferSchema", "true");
                break;
            }
            case TSV: {
                readOptions.put("header", "true");
                readOptions.put("delimiter", "\t");
                readOptions.put("inferSchema", "true");
                break;
            }
            case TFRECORDS: 
            case TFRECORD: {
                readOptions.put("recordType", "Example");
                break;
            }
        }
        if (providedOptions != null && !providedOptions.isEmpty()) {
            readOptions.putAll(providedOptions);
        }
        return readOptions;
    }

    private void writeSplits(Dataset<Row>[] datasets, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String basePath, List<Split> splits) {
        for (int i = 0; i < datasets.length; ++i) {
            this.writeSingle(datasets[i], dataFormat, writeOptions, saveMode, new Path(basePath, splits.get(i).getName()).toString());
        }
    }

    private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String path) {
        dataset.write().format(dataFormat.toString()).options(writeOptions).mode(saveMode).save(SparkEngine.sparkPath(path));
    }

    public Dataset<Row> read(StorageConnector storageConnector, String dataFormat, Map<String, String> readOptions, String location) throws FeatureStoreException, IOException {
        this.setupConnectorHadoopConf(storageConnector);
        String path = "";
        path = location != null ? new Path(location, "**").toString() : null;
        path = SparkEngine.sparkPath(path);
        DataFrameReader reader = SparkEngine.getInstance().getSparkSession().read().format(dataFormat).options(readOptions);
        if (!Strings.isNullOrEmpty((String)path)) {
            if (dataFormat.equals("bigquery")) {
                return reader.load(location);
            }
            return reader.load(SparkEngine.sparkPath(path));
        }
        return reader.load();
    }

    public <S> void writeOnlineDataframe(FeatureGroupBase featureGroupBase, S dataset, String onlineTopicName, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        this.onlineFeatureGroupToAvro(featureGroupBase, this.encodeComplexFeatures(featureGroupBase, (Dataset<Row>)((Dataset)dataset))).write().format("kafka").options(writeOptions).option("topic", onlineTopicName).save();
    }

    public <S> StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, S datasetGeneric, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String, String> writeOptions) throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException {
        Dataset dataset = (Dataset)datasetGeneric;
        DataStreamWriter writer = this.onlineFeatureGroupToAvro(featureGroupBase, this.encodeComplexFeatures(featureGroupBase, (Dataset<Row>)dataset)).writeStream().format("kafka").outputMode(outputMode).option("checkpointLocation", checkpointLocation == null ? this.utils.checkpointDirPath(queryName, featureGroupBase.getOnlineTopicName()) : checkpointLocation).options(writeOptions).option("topic", featureGroupBase.getOnlineTopicName());
        StreamingQuery query = writer.start();
        if (awaitTermination) {
            query.awaitTermination(timeout.longValue());
        }
        return query;
    }

    public Dataset<Row> encodeComplexFeatures(FeatureGroupBase featureGroupBase, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        ArrayList<Column> select = new ArrayList<Column>();
        for (Schema.Field f : featureGroupBase.getDeserializedAvroSchema().getFields()) {
            if (featureGroupBase.getComplexFeatures().contains(f.name())) {
                select.add(org.apache.spark.sql.avro.functions.to_avro((Column)functions.col((String)f.name()), (String)featureGroupBase.getFeatureAvroSchema(f.name())).alias(f.name()));
                continue;
            }
            select.add(functions.col((String)f.name()));
        }
        return dataset.select((Column[])select.stream().toArray(Column[]::new));
    }

    private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroupBase featureGroupBase, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        return dataset.select(new Column[]{org.apache.spark.sql.avro.functions.to_avro((Column)functions.concat((Column[])((Column[])featureGroupBase.getPrimaryKeys().stream().map(name -> functions.col((String)name).cast("string")).toArray(Column[]::new)))).alias("key"), org.apache.spark.sql.avro.functions.to_avro((Column)functions.struct((Column[])((Column[])featureGroupBase.getDeserializedAvroSchema().getFields().stream().map(f -> functions.col((String)f.name())).toArray(Column[]::new))), (String)featureGroupBase.getEncodedAvroSchema()).alias("value")});
    }

    public <S> void writeOfflineDataframe(StreamFeatureGroup streamFeatureGroup, S genericDataset, HudiOperationType operation, Map<String, String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        Dataset dataset = (Dataset)genericDataset;
        this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, streamFeatureGroup, (Dataset<Row>)dataset, operation, writeOptions, validationId);
    }

    public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, HudiOperationType operation, Map<String, String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, featureGroup, dataset, operation, writeOptions, validationId);
        } else {
            this.writeSparkDataset(featureGroup, dataset, writeOptions);
        }
    }

    private void writeSparkDataset(FeatureGroup featureGroup, Dataset<Row> dataset, Map<String, String> writeOptions) {
        dataset.write().format("hive").mode(SaveMode.Append).options((Map)(writeOptions == null ? new HashMap() : writeOptions)).partitionBy(this.utils.getPartitionColumns(featureGroup)).saveAsTable(this.utils.getTableName(featureGroup));
    }

    public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram, Boolean exactUniqueness) {
        if (correlation == null) {
            correlation = true;
        }
        if (histogram == null) {
            histogram = true;
        }
        if (exactUniqueness == null) {
            exactUniqueness = true;
        }
        ColumnProfilerRunBuilder runner = new ColumnProfilerRunner().onData(df).withCorrelation(correlation.booleanValue(), 100).withHistogram(histogram.booleanValue(), 20).withExactUniqueness(exactUniqueness.booleanValue());
        if (restrictToColumns != null && !restrictToColumns.isEmpty()) {
            runner.restrictToColumns(((Iterator)JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala()).toSeq());
        }
        ColumnProfiles result = runner.run();
        return ColumnProfiles.toJson((Seq)result.profiles().values().toSeq(), (long)result.numRecords());
    }

    public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram) {
        return this.profile(df, restrictToColumns, correlation, histogram, true);
    }

    public String profile(Dataset<Row> df, List<String> restrictToColumns) {
        return this.profile(df, restrictToColumns, true, true);
    }

    public String profile(Dataset<Row> df, boolean correlation, boolean histogram) {
        return this.profile(df, null, correlation, histogram);
    }

    public String profile(Dataset<Row> df) {
        return this.profile(df, null, true, true);
    }

    public void setupConnectorHadoopConf(StorageConnector storageConnector) throws FeatureStoreException, IOException {
        if (storageConnector == null) {
            return;
        }
        storageConnector.refetch();
        switch (storageConnector.getStorageConnectorType()) {
            case S3: {
                this.setupS3ConnectorHadoopConf((StorageConnector.S3Connector)storageConnector);
                break;
            }
            case ADLS: {
                this.setupAdlsConnectorHadoopConf((StorageConnector.AdlsConnector)storageConnector);
                break;
            }
            case GCS: {
                this.setupGcsConnectorHadoopConf((StorageConnector.GcsConnector)storageConnector);
                break;
            }
        }
    }

    public static String sparkPath(String path) {
        if (path == null) {
            return null;
        }
        if (path.startsWith("s3://")) {
            return path.replaceFirst("s3://", "s3a://");
        }
        return path;
    }

    private void setupS3ConnectorHadoopConf(StorageConnector.S3Connector storageConnector) {
        if (!Strings.isNullOrEmpty((String)storageConnector.getAccessKey())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", storageConnector.getAccessKey());
        }
        if (!Strings.isNullOrEmpty((String)storageConnector.getSecretKey())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", storageConnector.getSecretKey());
        }
        if (!Strings.isNullOrEmpty((String)storageConnector.getServerEncryptionAlgorithm())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.server-side-encryption-algorithm", storageConnector.getServerEncryptionAlgorithm());
        }
        if (!Strings.isNullOrEmpty((String)storageConnector.getServerEncryptionKey())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.server-side-encryption-key", storageConnector.getServerEncryptionKey());
        }
        if (!Strings.isNullOrEmpty((String)storageConnector.getSessionToken())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.session.token", storageConnector.getSessionToken());
        }
    }

    private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector storageConnector) {
        for (Option confOption : storageConnector.getSparkOptions()) {
            this.sparkSession.sparkContext().hadoopConfiguration().set(confOption.getName(), confOption.getValue());
        }
    }

    public Dataset<Row> getEmptyAppendedDataframe(Dataset<Row> dataframe, List<Feature> newFeatures) {
        Dataset emptyDataframe = dataframe.limit(0);
        for (Feature f : newFeatures) {
            emptyDataframe = emptyDataframe.withColumn(f.getName(), functions.lit(null).cast(f.getType()));
        }
        return emptyDataframe;
    }

    public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String, String> writeOptions) throws Exception {
        writeOptions = this.utils.getKafkaConfig(streamFeatureGroup, writeOptions);
        this.hudiEngine.streamToHoodieTable(this.sparkSession, streamFeatureGroup, writeOptions);
    }

    public <S> List<Feature> parseFeatureGroupSchema(S datasetGeneric, TimeTravelFormat timeTravelFormat) throws FeatureStoreException {
        ArrayList<Feature> features = new ArrayList<Feature>();
        Dataset dataset = (Dataset)datasetGeneric;
        Boolean usingHudi = timeTravelFormat == TimeTravelFormat.HUDI;
        for (StructField structField : dataset.schema().fields()) {
            String featureType = "";
            if (!usingHudi.booleanValue()) {
                featureType = structField.dataType().catalogString();
            } else if (structField.dataType() instanceof ByteType) {
                featureType = "int";
            } else if (structField.dataType() instanceof ShortType) {
                featureType = "int";
            } else if (structField.dataType() instanceof BooleanType || structField.dataType() instanceof IntegerType || structField.dataType() instanceof LongType || structField.dataType() instanceof FloatType || structField.dataType() instanceof DoubleType || structField.dataType() instanceof DecimalType || structField.dataType() instanceof TimestampType || structField.dataType() instanceof DateType || structField.dataType() instanceof StringType || structField.dataType() instanceof ArrayType || structField.dataType() instanceof StructType || structField.dataType() instanceof BinaryType) {
                featureType = structField.dataType().catalogString();
            } else {
                throw new FeatureStoreException("Feature '" + structField.name().toLowerCase() + "': spark type " + structField.dataType().catalogString() + " not supported.");
            }
            Feature f = new Feature(structField.name().toLowerCase(), featureType, false, false);
            if (structField.metadata().contains("description")) {
                f.setDescription(structField.metadata().getString("description"));
            }
            features.add(f);
        }
        return features;
    }

    public <S> S sanitizeFeatureNames(S datasetGeneric) {
        Dataset dataset = (Dataset)datasetGeneric;
        return (S)dataset.select((Column[])Arrays.asList(dataset.columns()).stream().map(f -> functions.col((String)f).alias(f.toLowerCase())).toArray(Column[]::new));
    }

    public String addFile(String filePath) {
        this.sparkSession.sparkContext().addFile("hdfs://" + filePath);
        return SparkFiles.get((String)new Path(filePath).getName());
    }

    public Dataset<Row> readStream(StorageConnector storageConnector, String dataFormat, String messageFormat, String schema, Map<String, String> options, boolean includeMetadata) throws FeatureStoreException {
        DataStreamReader stream = this.sparkSession.readStream().format(dataFormat);
        stream = stream.options(storageConnector.sparkOptions()).options(options);
        if (storageConnector instanceof StorageConnector.KafkaConnector) {
            return this.readStreamKafka(stream, messageFormat, schema, includeMetadata);
        }
        throw new FeatureStoreException("Connector does not support reading data into stream.");
    }

    private Dataset<Row> readStreamKafka(DataStreamReader stream, String messageFormat, String schema, boolean includeMetadata) throws SchemaParseException, FeatureStoreException {
        Column[] kafkaMetadataColumns = Arrays.asList(functions.col((String)"key"), functions.col((String)"topic"), functions.col((String)"partition"), functions.col((String)"offset"), functions.col((String)"timestamp"), functions.col((String)"timestampType"), functions.col((String)"value.*")).toArray(new Column[7]);
        if (messageFormat.equals("avro") && !Strings.isNullOrEmpty((String)schema)) {
            Schema.Parser parser = new Schema.Parser();
            parser.parse(schema);
            Dataset df = stream.load();
            if (includeMetadata) {
                return df.withColumn("value", org.apache.spark.sql.avro.functions.from_avro((Column)df.col("value"), (String)schema)).select(kafkaMetadataColumns);
            }
            return df.withColumn("value", org.apache.spark.sql.avro.functions.from_avro((Column)df.col("value"), (String)schema)).select(new Column[]{functions.col((String)"value.*")});
        }
        if (messageFormat.equals("json") && !Strings.isNullOrEmpty((String)schema)) {
            Dataset df = stream.load();
            if (includeMetadata) {
                return df.withColumn("value", functions.from_json((Column)df.col("value").cast("string"), (String)schema, new HashMap())).select(kafkaMetadataColumns);
            }
            return df.withColumn("value", functions.from_json((Column)df.col("value").cast("string"), (String)schema, new HashMap())).select(new Column[]{functions.col((String)"value.*")});
        }
        if (includeMetadata) {
            return stream.load();
        }
        return stream.load().select("key", new String[]{"value"});
    }

    public Dataset<Row> objectToDataset(Object obj) {
        return (Dataset)obj;
    }

    private void setupGcsConnectorHadoopConf(StorageConnector.GcsConnector storageConnector) {
        this.sparkSession.sparkContext().hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        this.sparkSession.sparkContext().hadoopConfiguration().set("google.cloud.auth.service.account.enable", "true");
        String localPath = this.addFile(storageConnector.getKeyPath());
        this.sparkSession.sparkContext().hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile", localPath);
        if (!Strings.isNullOrEmpty((String)storageConnector.getAlgorithm())) {
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.gs.encryption.algorithm", storageConnector.getAlgorithm());
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.gs.encryption.key", storageConnector.getEncryptionKey());
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.gs.encryption.key.hash", storageConnector.getEncryptionKeyHash());
        } else {
            this.sparkSession.sparkContext().hadoopConfiguration().unset("fs.gs.encryption.algorithm");
            this.sparkSession.sparkContext().hadoopConfiguration().unset("fs.gs.encryption.key");
            this.sparkSession.sparkContext().hadoopConfiguration().unset("fs.gs.encryption.key.hash");
        }
    }

    public <S> S createEmptyDataFrame(S datasetGeneric) {
        Dataset dataset = (Dataset)datasetGeneric;
        ArrayList rows = new ArrayList();
        return (S)this.sparkSession.sqlContext().createDataFrame(rows, dataset.schema());
    }

    public SparkSession getSparkSession() {
        return this.sparkSession;
    }
}

