/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
import com.amazon.deequ.profiles.ColumnProfilerRunner;
import com.amazon.deequ.profiles.ColumnProfiles;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.OnDemandFeatureGroup;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.engine.HudiEngine;
import com.logicalclocks.hsfs.engine.Utils;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.Option;
import java.io.IOException;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Singleton wrapper around the Hive-enabled {@link SparkSession} used to read and write
 * feature groups and training datasets.
 *
 * <p>The instance is created lazily by {@link #getInstance()}; creation obtains (or creates)
 * the Spark session and sets the Hive dynamic-partition options on it.
 */
public class SparkEngine {
    /** Pattern of the timestamp appended to generated streaming query names. */
    private static final DateTimeFormatter QUERY_NAME_TIMESTAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private static SparkEngine instance = null;

    private final SparkSession sparkSession;
    private final Utils utils = new Utils();
    private final HudiEngine hudiEngine = new HudiEngine();

    /**
     * Returns the shared engine, creating it on first use.
     *
     * <p>The method is {@code synchronized}, so concurrent first calls construct only one engine.
     *
     * @return the single {@code SparkEngine} instance
     */
    public static synchronized SparkEngine getInstance() {
        if (instance == null) {
            instance = new SparkEngine();
        }
        return instance;
    }

    /** Obtains the Hive-enabled session and applies the session-level Hive settings. */
    private SparkEngine() {
        this.sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
        this.sparkSession.conf().set("hive.exec.dynamic.partition", "true");
        this.sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        this.sparkSession.conf().set("spark.sql.hive.convertMetastoreParquet", "false");
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.trustore.name} session setting
     */
    public String getTrustStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.trustore.name");
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.keystore.name} session setting
     */
    public String getKeyStorePath() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystore.name");
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.keystores.passwd.name} session setting
     */
    public String getCertKey() {
        return this.sparkSession.conf().get("spark.hadoop.hops.ssl.keystores.passwd.name");
    }

    /**
     * Runs a SQL query on the underlying session.
     *
     * @param query the SQL text to execute
     * @return the resulting dataframe
     */
    public Dataset<Row> sql(String query) {
        return this.sparkSession.sql(query);
    }

    /**
     * Reads an on-demand feature group through its storage connector and exposes the result
     * as a temporary view.
     *
     * @param onDemandFeatureGroup the feature group to read
     * @param alias name of the temporary view to create or replace
     * @return the dataframe that backs the view
     * @throws FeatureStoreException propagated from the storage connector
     * @throws IOException propagated from the storage connector
     */
    public Dataset<Row> registerOnDemandTemporaryTable(OnDemandFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException {
        // The data format is optional on the feature group; the connector receives null when unset.
        String dataFormat = onDemandFeatureGroup.getDataFormat() != null
                ? onDemandFeatureGroup.getDataFormat().toString()
                : null;
        String path = onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath());
        Dataset<Row> dataset = onDemandFeatureGroup.getStorageConnector().read(
                onDemandFeatureGroup.getQuery(), dataFormat, this.getOnDemandOptions(onDemandFeatureGroup), path);
        dataset.createOrReplaceTempView(alias);
        return dataset;
    }

    /** Converts the feature group's option list into a name-to-value map (empty when there are no options). */
    private Map<String, String> getOnDemandOptions(OnDemandFeatureGroup onDemandFeatureGroup) {
        if (onDemandFeatureGroup.getOptions() == null) {
            return new HashMap<>();
        }
        return onDemandFeatureGroup.getOptions().stream()
                .collect(Collectors.toMap(OnDemandOptions::getName, OnDemandOptions::getValue));
    }

    /**
     * Delegates to the Hudi engine to register {@code featureGroup} as a temporary table
     * under {@code alias}, passing the timestamps and read options through unchanged.
     */
    public void registerHudiTemporaryTable(FeatureGroup featureGroup, String alias, Long leftFeaturegroupStartTimestamp, Long leftFeaturegroupEndTimestamp, Map<String, String> readOptions) {
        this.hudiEngine.registerTemporaryTable(this.sparkSession, featureGroup, alias, leftFeaturegroupStartTimestamp, leftFeaturegroupEndTimestamp, readOptions);
    }

    /**
     * Writes a training dataset, either as one directory or as one directory per split.
     *
     * @param trainingDataset supplies connector, format, location, optional splits and seed
     * @param dataset the data to write
     * @param writeOptions writer options; {@code null} is treated as empty
     * @param saveMode Spark save mode used for every write
     */
    public void write(TrainingDataset trainingDataset, Dataset<Row> dataset, Map<String, String> writeOptions, SaveMode saveMode) {
        this.setupConnectorHadoopConf(trainingDataset.getStorageConnector());
        // A null coalesce flag is treated as "do not coalesce" instead of failing on unboxing.
        if (Boolean.TRUE.equals(trainingDataset.getCoalesce())) {
            dataset = dataset.coalesce(1);
        }
        if (trainingDataset.getSplits() == null) {
            String path = new Path(trainingDataset.getLocation(), trainingDataset.getName()).toString();
            this.writeSingle(dataset, trainingDataset.getDataFormat(), writeOptions, saveMode, path);
            return;
        }
        double[] splitWeights = trainingDataset.getSplits().stream()
                .map(Split::getPercentage)
                .mapToDouble(Float::doubleValue)
                .toArray();
        // The seed is only passed when one was configured.
        Dataset<Row>[] datasetSplits = trainingDataset.getSeed() != null
                ? dataset.randomSplit(splitWeights, trainingDataset.getSeed())
                : dataset.randomSplit(splitWeights);
        this.writeSplits(datasetSplits, trainingDataset.getDataFormat(), writeOptions, saveMode, trainingDataset.getLocation(), trainingDataset.getSplits());
    }

    /**
     * Builds writer options: format-specific defaults overlaid with the caller's options.
     *
     * @param providedOptions caller options that override the defaults; may be {@code null}
     * @param dataFormat target format; {@code null} yields no defaults
     * @return a new mutable map of options
     */
    public Map<String, String> getWriteOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> writeOptions = defaultFormatOptions(dataFormat, false);
        if (providedOptions != null) {
            writeOptions.putAll(providedOptions);
        }
        return writeOptions;
    }

    /**
     * Builds reader options: format-specific defaults (including schema inference for
     * CSV/TSV) overlaid with the caller's options.
     *
     * @param providedOptions caller options that override the defaults; may be {@code null}
     * @param dataFormat source format; {@code null} yields no defaults
     * @return a new mutable map of options
     */
    public Map<String, String> getReadOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> readOptions = defaultFormatOptions(dataFormat, true);
        if (providedOptions != null) {
            readOptions.putAll(providedOptions);
        }
        return readOptions;
    }

    /** Returns the default options for {@code dataFormat}; {@code inferSchema} is added for delimited reads only. */
    private static Map<String, String> defaultFormatOptions(DataFormat dataFormat, boolean forRead) {
        Map<String, String> options = new HashMap<>();
        if (dataFormat == null) {
            return options;
        }
        switch (dataFormat) {
            case CSV:
                putDelimitedDefaults(options, ",", forRead);
                break;
            case TSV:
                putDelimitedDefaults(options, "\t", forRead);
                break;
            case TFRECORDS:
            case TFRECORD:
                options.put("recordType", "Example");
                break;
            default:
                // Other formats have no defaults.
                break;
        }
        return options;
    }

    /** Adds the header/delimiter defaults shared by CSV and TSV. */
    private static void putDelimitedDefaults(Map<String, String> options, String delimiter, boolean forRead) {
        options.put("header", "true");
        options.put("delimiter", delimiter);
        if (forRead) {
            options.put("inferSchema", "true");
        }
    }

    /** Returns {@code options}, or a fresh empty map when it is {@code null}. */
    private static Map<String, String> nullToEmpty(Map<String, String> options) {
        return options == null ? new HashMap<>() : options;
    }

    /** Writes each split dataset under {@code basePath}, in a directory named after the split at the same index. */
    private void writeSplits(Dataset<Row>[] datasets, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String basePath, List<Split> splits) {
        for (int i = 0; i < datasets.length; ++i) {
            String splitPath = new Path(basePath, splits.get(i).getName()).toString();
            this.writeSingle(datasets[i], dataFormat, writeOptions, saveMode, splitPath);
        }
    }

    /** Writes one dataset to {@code path} (after {@link #sparkPath(String)} rewriting) in the given format. */
    private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String path) {
        dataset.write()
                .format(dataFormat.toString())
                .options(nullToEmpty(writeOptions))
                .mode(saveMode)
                .save(SparkEngine.sparkPath(path));
    }

    /**
     * Reads a dataframe through the given connector.
     *
     * @param storageConnector connector whose Hadoop settings are applied first; may be {@code null}
     * @param dataFormat Spark data source format name
     * @param readOptions reader options; {@code null} is treated as empty
     * @param location base location; when non-null everything matching {@code location/**} is
     *     loaded, when {@code null} the reader is loaded without a path
     * @return the loaded dataframe
     */
    public Dataset<Row> read(StorageConnector storageConnector, String dataFormat, Map<String, String> readOptions, String location) {
        this.setupConnectorHadoopConf(storageConnector);
        DataFrameReader reader = this.sparkSession.read().format(dataFormat).options(nullToEmpty(readOptions));
        if (location == null) {
            return reader.load();
        }
        return reader.load(SparkEngine.sparkPath(new Path(location, "**").toString()));
    }

    /**
     * Encodes the dataframe as Avro key/value records and writes it to the feature group's
     * online Kafka topic.
     *
     * @param featureGroup supplies the schemas and the topic name
     * @param dataset rows to publish
     * @param writeOptions Kafka writer options; {@code null} is treated as empty
     * @throws FeatureStoreException propagated from schema lookups
     * @throws IOException propagated from schema lookups
     */
    public void writeOnlineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        this.onlineFeatureGroupToAvro(featureGroup, this.encodeComplexFeatures(featureGroup, dataset))
                .write()
                .format("kafka")
                .options(nullToEmpty(writeOptions))
                .option("topic", featureGroup.getOnlineTopicName())
                .save();
    }

    /**
     * Starts a streaming write of the dataframe to the feature group's online Kafka topic.
     *
     * @param featureGroup supplies the schemas and the topic name
     * @param dataset streaming dataframe to publish
     * @param queryName used in the checkpoint path; when null or empty a name is generated
     *     from the topic name and the current timestamp
     * @param outputMode Spark streaming output mode
     * @param awaitTermination whether to block on the started query before returning
     * @param timeout argument passed to {@code awaitTermination(long)}; when {@code null} the
     *     untimed {@code awaitTermination()} is used instead
     * @param writeOptions Kafka writer options; {@code null} is treated as empty
     * @return the started query
     * @throws FeatureStoreException propagated from schema or project lookups
     * @throws IOException propagated from schema or project lookups
     * @throws StreamingQueryException if the query fails while being awaited
     * @throws TimeoutException propagated from starting the query
     */
    public StreamingQuery writeStreamDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, String queryName, String outputMode, boolean awaitTermination, Long timeout, Map<String, String> writeOptions) throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException {
        if (Strings.isNullOrEmpty(queryName)) {
            queryName = "insert_stream_" + featureGroup.getOnlineTopicName() + "_" + LocalDateTime.now().format(QUERY_NAME_TIMESTAMP);
        }
        String checkpointLocation = "/Projects/" + HopsworksClient.getInstance().getProject().getProjectName() + "/Resources/" + queryName + "-checkpoint";
        DataStreamWriter<Row> writer = this.onlineFeatureGroupToAvro(featureGroup, this.encodeComplexFeatures(featureGroup, dataset))
                .writeStream()
                .format("kafka")
                .outputMode(outputMode)
                .options(nullToEmpty(writeOptions))
                .option("checkpointLocation", checkpointLocation)
                .option("topic", featureGroup.getOnlineTopicName());
        StreamingQuery query = writer.start();
        if (awaitTermination) {
            if (timeout != null) {
                query.awaitTermination(timeout);
            } else {
                // No timeout supplied: wait without one instead of failing on unboxing null.
                query.awaitTermination();
            }
        }
        return query;
    }

    /**
     * Selects the fields of the feature group's Avro schema from {@code dataset}, converting
     * each field listed as a complex feature to Avro using that feature's own schema.
     *
     * @param featureGroup supplies the schema and the complex feature names
     * @param dataset input dataframe
     * @return dataframe with one column per schema field, in schema order
     * @throws FeatureStoreException propagated from schema lookups
     * @throws IOException propagated from schema lookups
     */
    public Dataset<Row> encodeComplexFeatures(FeatureGroup featureGroup, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        List<Column> select = new ArrayList<>();
        for (Schema.Field field : featureGroup.getDeserializedAvroSchema().getFields()) {
            String name = field.name();
            if (featureGroup.getComplexFeatures().contains(name)) {
                select.add(org.apache.spark.sql.avro.functions.to_avro(functions.col(name), featureGroup.getFeatureAvroSchema(name)).alias(name));
            } else {
                select.add(functions.col(name));
            }
        }
        return dataset.select(select.toArray(new Column[0]));
    }

    /**
     * Produces the two-column ({@code key}, {@code value}) Avro representation written to Kafka.
     * The key is the Avro encoding of the primary key columns, cast to string and concatenated
     * in sorted name order; the value is the Avro encoding of a struct of all schema fields.
     */
    private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroup featureGroup, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        // Sort within the stream so the feature group's own primary key list is not reordered.
        Column[] keyColumns = featureGroup.getPrimaryKeys().stream()
                .sorted()
                .map(name -> functions.col(name).cast("string"))
                .toArray(Column[]::new);
        Column[] valueColumns = featureGroup.getDeserializedAvroSchema().getFields().stream()
                .map(field -> functions.col(field.name()))
                .toArray(Column[]::new);
        Column key = org.apache.spark.sql.avro.functions.to_avro(functions.concat(keyColumns)).alias("key");
        Column value = org.apache.spark.sql.avro.functions.to_avro(functions.struct(valueColumns), featureGroup.getEncodedAvroSchema()).alias("value");
        return dataset.select(new Column[]{key, value});
    }

    /**
     * Writes the dataframe to the offline store: through the Hudi engine when the feature
     * group's time travel format is {@code HUDI}, otherwise as a plain Hive table append.
     *
     * @param featureGroup target feature group
     * @param dataset rows to write
     * @param operation Hudi operation; only used on the Hudi path
     * @param writeOptions writer options; may be {@code null} on the Hive path
     * @param validationId passed through to the Hudi engine
     * @throws IOException propagated from the Hudi engine
     * @throws FeatureStoreException propagated from the Hudi engine
     * @throws ParseException propagated from the Hudi engine
     */
    public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, HudiOperationType operation, Map<String, String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, featureGroup, dataset, operation, writeOptions, validationId);
        } else {
            this.writeSparkDataset(featureGroup, dataset, writeOptions);
        }
    }

    /** Appends the dataframe to the feature group's Hive table, partitioned by its partition columns. */
    private void writeSparkDataset(FeatureGroup featureGroup, Dataset<Row> dataset, Map<String, String> writeOptions) {
        dataset.write()
                .format("hive")
                .mode(SaveMode.Append)
                .options(nullToEmpty(writeOptions))
                .partitionBy(this.utils.getPartitionColumns(featureGroup))
                .saveAsTable(this.utils.getTableName(featureGroup));
    }

    /**
     * Runs the column profiler on a dataframe and returns the profiles as JSON.
     *
     * @param df dataframe to profile
     * @param restrictToColumns columns to profile; {@code null} or empty applies no restriction
     * @param correlation correlation flag for the profiler; {@code null} means {@code true}
     * @param histogram histogram flag for the profiler; {@code null} means {@code true}
     * @return JSON produced by {@code ColumnProfiles.toJson}
     */
    public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram) {
        boolean withCorrelation = correlation == null || correlation;
        boolean withHistogram = histogram == null || histogram;
        ColumnProfilerRunBuilder runner = new ColumnProfilerRunner().onData(df).withCorrelation(withCorrelation).withHistogram(withHistogram);
        if (restrictToColumns != null && !restrictToColumns.isEmpty()) {
            runner.restrictToColumns(((Iterator)JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala()).toSeq());
        }
        ColumnProfiles result = runner.run();
        return ColumnProfiles.toJson((Seq)result.profiles().values().toSeq());
    }

    /** Profiles the given columns with both correlation and histogram enabled. */
    public String profile(Dataset<Row> df, List<String> restrictToColumns) {
        return this.profile(df, restrictToColumns, true, true);
    }

    /** Profiles all columns with the given correlation and histogram flags. */
    public String profile(Dataset<Row> df, boolean correlation, boolean histogram) {
        return this.profile(df, null, correlation, histogram);
    }

    /** Profiles all columns with both correlation and histogram enabled. */
    public String profile(Dataset<Row> df) {
        return this.profile(df, null, true, true);
    }

    /**
     * Applies connector-specific settings to the Spark context's Hadoop configuration.
     * Only S3 and ADLS connectors are configured; a {@code null} connector or any other
     * type is a no-op.
     *
     * @param storageConnector connector to configure for; may be {@code null}
     */
    public void setupConnectorHadoopConf(StorageConnector storageConnector) {
        if (storageConnector == null) {
            return;
        }
        switch (storageConnector.getStorageConnectorType()) {
            case S3:
                this.setupS3ConnectorHadoopConf((StorageConnector.S3Connector)storageConnector);
                break;
            case ADLS:
                this.setupAdlsConnectorHadoopConf((StorageConnector.AdlsConnector)storageConnector);
                break;
            default:
                // Other connector types need no Hadoop configuration here.
                break;
        }
    }

    /**
     * Rewrites an {@code s3://} path to the {@code s3a://} scheme; other paths are returned unchanged.
     *
     * @param path path to rewrite; may be {@code null}
     * @return the rewritten path, or {@code null} when {@code path} is {@code null}
     */
    public static String sparkPath(String path) {
        if (path == null) {
            return null;
        }
        String s3Scheme = "s3://";
        if (path.startsWith(s3Scheme)) {
            return "s3a://" + path.substring(s3Scheme.length());
        }
        return path;
    }

    /** Copies each non-empty S3 credential/encryption setting of the connector into the Hadoop configuration. */
    private void setupS3ConnectorHadoopConf(StorageConnector.S3Connector storageConnector) {
        this.setHadoopConfIfPresent("fs.s3a.access.key", storageConnector.getAccessKey());
        this.setHadoopConfIfPresent("fs.s3a.secret.key", storageConnector.getSecretKey());
        this.setHadoopConfIfPresent("fs.s3a.server-side-encryption-algorithm", storageConnector.getServerEncryptionAlgorithm());
        this.setHadoopConfIfPresent("fs.s3a.server-side-encryption-key", storageConnector.getServerEncryptionKey());
        if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
            // A session token also requires switching to the temporary-credentials provider.
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            this.sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.session.token", storageConnector.getSessionToken());
        }
    }

    /** Sets a Hadoop configuration entry only when {@code value} is neither null nor empty. */
    private void setHadoopConfIfPresent(String key, String value) {
        if (!Strings.isNullOrEmpty(value)) {
            this.sparkSession.sparkContext().hadoopConfiguration().set(key, value);
        }
    }

    /** Copies every Spark option of the ADLS connector into the Hadoop configuration. */
    private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector storageConnector) {
        for (Option confOption : storageConnector.getSparkOptions()) {
            this.sparkSession.sparkContext().hadoopConfiguration().set(confOption.getName(), confOption.getValue());
        }
    }

    /**
     * @return the Spark session owned by this engine
     */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }
}

