/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
import com.amazon.deequ.profiles.ColumnProfilerRunner;
import com.amazon.deequ.profiles.ColumnProfiles;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StorageConnectorType;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.engine.HudiEngine;
import com.logicalclocks.hsfs.engine.Utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Strings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Process-wide singleton that wraps a Hive-enabled {@link SparkSession} and exposes the
 * read, write and profiling operations used for feature groups and training datasets.
 *
 * <p>Obtain the instance through {@link #getInstance()}; the constructor is private.
 */
public class SparkEngine {
    /** URI prefix rewritten by {@link #sparkPath(String)}. */
    private static final String S3_PREFIX = "s3://";
    /** URI prefix that replaces {@link #S3_PREFIX} in paths handed to Spark. */
    private static final String S3A_PREFIX = "s3a://";

    private static SparkEngine INSTANCE = null;
    private SparkSession sparkSession;
    private Utils utils = new Utils();
    private HudiEngine hudiEngine = new HudiEngine();

    /**
     * Returns the shared engine, creating it on first use.
     *
     * <p>The method is {@code synchronized}, so concurrent first calls create only one instance.
     *
     * @return the single {@code SparkEngine} instance
     */
    public static synchronized SparkEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new SparkEngine();
        }
        return INSTANCE;
    }

    /**
     * Gets or creates a Hive-enabled Spark session and sets the session-level options
     * {@code hive.exec.dynamic.partition}, {@code hive.exec.dynamic.partition.mode} and
     * {@code spark.sql.hive.convertMetastoreParquet}.
     */
    private SparkEngine() {
        this.sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();
        this.sparkSession.conf().set("hive.exec.dynamic.partition", "true");
        this.sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        this.sparkSession.conf().set("spark.sql.hive.convertMetastoreParquet", "false");
    }

    /**
     * Runs a SQL query on the wrapped Spark session.
     *
     * @param query the SQL text to execute
     * @return the query result as a dataframe
     */
    public Dataset<Row> sql(String query) {
        return this.sparkSession.sql(query);
    }

    /**
     * Runs a query through Spark's {@code jdbc} data source using the connector's Spark options.
     *
     * @param storageConnector connector supplying the JDBC options
     * @param query query passed to the data source as the {@code query} option
     * @return the query result as a dataframe
     * @throws FeatureStoreException if the connector cannot provide its Spark options
     */
    public Dataset<Row> jdbc(StorageConnector storageConnector, String query) throws FeatureStoreException {
        // Copy before adding the query so the map owned by the connector is never modified.
        Map<String, String> readOptions = new HashMap<>(storageConnector.getSparkOptions());
        readOptions.put("query", query);
        return this.sparkSession.read().format("jdbc").options(readOptions).load();
    }

    /**
     * Delegates to {@link HudiEngine} to register a temporary table for a feature group.
     *
     * @param featureGroup feature group to register
     * @param alias alias forwarded to the Hudi engine
     * @param leftFeaturegroupStartTimestamp start timestamp forwarded to the Hudi engine
     * @param leftFeaturegroupEndTimestamp end timestamp forwarded to the Hudi engine
     * @param readOptions read options forwarded to the Hudi engine
     */
    public void registerHudiTemporaryTable(FeatureGroup featureGroup, String alias, Long leftFeaturegroupStartTimestamp, Long leftFeaturegroupEndTimestamp, Map<String, String> readOptions) {
        this.hudiEngine.registerTemporaryTable(this.sparkSession, featureGroup, alias, leftFeaturegroupStartTimestamp, leftFeaturegroupEndTimestamp, readOptions);
    }

    /**
     * Applies connector-specific settings to the Spark session. Only connectors of type
     * {@link StorageConnectorType#S3} are configured; any other type is a no-op.
     *
     * @param storageConnector connector to configure the session for; must not be {@code null}
     */
    public void configureConnector(StorageConnector storageConnector) {
        if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
            this.configureS3Connector(storageConnector);
        }
    }

    /**
     * Rewrites a leading {@code s3://} to {@code s3a://}; any other path is returned unchanged.
     *
     * @param path path to convert; must not be {@code null}
     * @return the path with the {@code s3a://} prefix if it started with {@code s3://},
     *         otherwise the input path
     */
    public static String sparkPath(String path) {
        if (path.startsWith(S3_PREFIX)) {
            // Plain prefix swap; no regular expression is needed for a fixed leading token.
            return S3A_PREFIX + path.substring(S3_PREFIX.length());
        }
        return path;
    }

    /**
     * Copies the S3 credentials and server-side encryption settings of the connector into the
     * Spark session configuration. Each group is only set when its value is non-empty.
     *
     * @param storageConnector S3 connector to read the settings from
     */
    private void configureS3Connector(StorageConnector storageConnector) {
        if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())) {
            // The secret key is written together with the access key; only the access key is checked.
            this.sparkSession.conf().set("fs.s3a.access.key", storageConnector.getAccessKey());
            this.sparkSession.conf().set("fs.s3a.secret.key", storageConnector.getSecretKey());
        }
        if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
            this.sparkSession.conf().set("fs.s3a.server-side-encryption-algorithm", storageConnector.getServerEncryptionAlgorithm());
        }
        if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionKey())) {
            this.sparkSession.conf().set("fs.s3a.server-side-encryption.key", storageConnector.getServerEncryptionKey());
        }
    }

    /**
     * Writes a training dataset, either as a single dataset or as one dataset per split.
     *
     * <p>Without splits the data is saved under {@code location/name}. With splits the dataframe
     * is divided with {@code randomSplit} using the split percentages as weights (and the
     * training dataset seed, when present), and each part is saved under
     * {@code location/splitName}.
     *
     * @param trainingDataset metadata describing location, format, splits and seed
     * @param dataset dataframe to write
     * @param writeOptions options passed to the Spark writer
     * @param saveMode Spark save mode
     */
    public void write(TrainingDataset trainingDataset, Dataset<Row> dataset, Map<String, String> writeOptions, SaveMode saveMode) {
        if (trainingDataset.getStorageConnector() != null) {
            this.configureConnector(trainingDataset.getStorageConnector());
        }
        if (trainingDataset.getSplits() == null) {
            String path = new Path(trainingDataset.getLocation(), trainingDataset.getName()).toString();
            this.writeSingle(dataset, trainingDataset.getDataFormat(), writeOptions, saveMode, path);
            return;
        }

        List<Float> splitFactors = trainingDataset.getSplits().stream()
            .map(Split::getPercentage)
            .collect(Collectors.toList());
        double[] weights = splitFactors.stream().mapToDouble(Float::doubleValue).toArray();

        Dataset<Row>[] datasetSplits;
        if (trainingDataset.getSeed() != null) {
            datasetSplits = dataset.randomSplit(weights, trainingDataset.getSeed().longValue());
        } else {
            datasetSplits = dataset.randomSplit(weights);
        }
        this.writeSplits(datasetSplits, trainingDataset.getDataFormat(), writeOptions, saveMode, trainingDataset.getLocation(), trainingDataset.getSplits());
    }

    /**
     * Builds the writer options for a data format: format defaults first, then the caller's
     * options, which override defaults with the same key.
     *
     * @param providedOptions caller-supplied options; may be {@code null} or empty
     * @param dataFormat format being written; must not be {@code null}
     * @return a new mutable map with the merged options
     */
    public Map<String, String> getWriteOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> writeOptions = this.defaultFormatOptions(dataFormat);
        if (providedOptions != null && !providedOptions.isEmpty()) {
            writeOptions.putAll(providedOptions);
        }
        return writeOptions;
    }

    /**
     * Builds the reader options for a data format: format defaults first (CSV and TSV
     * additionally get {@code inferSchema=true}), then the caller's options, which override
     * defaults with the same key.
     *
     * @param providedOptions caller-supplied options; may be {@code null} or empty
     * @param dataFormat format being read; must not be {@code null}
     * @return a new mutable map with the merged options
     */
    public Map<String, String> getReadOptions(Map<String, String> providedOptions, DataFormat dataFormat) {
        Map<String, String> readOptions = this.defaultFormatOptions(dataFormat);
        if (dataFormat == DataFormat.CSV || dataFormat == DataFormat.TSV) {
            readOptions.put("inferSchema", "true");
        }
        if (providedOptions != null && !providedOptions.isEmpty()) {
            readOptions.putAll(providedOptions);
        }
        return readOptions;
    }

    /** Returns a new mutable map holding the options shared by reads and writes of a format. */
    private Map<String, String> defaultFormatOptions(DataFormat dataFormat) {
        Map<String, String> options = new HashMap<>();
        switch (dataFormat) {
            case CSV: {
                options.put("header", "true");
                options.put("delimiter", ",");
                break;
            }
            case TSV: {
                options.put("header", "true");
                options.put("delimiter", "\t");
                break;
            }
            case TFRECORDS:
            case TFRECORD: {
                options.put("recordType", "Example");
                break;
            }
            default: {
                // Remaining formats have no defaults.
                break;
            }
        }
        return options;
    }

    /**
     * Writes each split dataframe under {@code basePath/splitName}; the i-th dataframe is paired
     * with the i-th split.
     */
    private void writeSplits(Dataset<Row>[] datasets, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String basePath, List<Split> splits) {
        for (int i = 0; i < datasets.length; ++i) {
            String splitPath = new Path(basePath, splits.get(i).getName()).toString();
            this.writeSingle(datasets[i], dataFormat, writeOptions, saveMode, splitPath);
        }
    }

    /** Writes one dataframe to {@code path} (converted with {@link #sparkPath(String)}). */
    private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat, Map<String, String> writeOptions, SaveMode saveMode, String path) {
        dataset.write()
            .format(dataFormat.toString())
            .options(writeOptions)
            .mode(saveMode)
            .save(SparkEngine.sparkPath(path));
    }

    /**
     * Loads a dataframe from a path using the given format and reader options.
     *
     * @param dataFormat format of the stored data; its {@code toString()} is used as Spark format
     * @param readOptions options passed to the Spark reader
     * @param path location to load; converted with {@link #sparkPath(String)}
     * @return the loaded dataframe
     */
    public Dataset<Row> read(DataFormat dataFormat, Map<String, String> readOptions, String path) {
        return this.sparkSession.read()
            .format(dataFormat.toString())
            .options(readOptions)
            .load(SparkEngine.sparkPath(path));
    }

    /**
     * Builds the JDBC writer options for the online storage of a feature group: the connector's
     * Spark options, the {@code dbtable} option, then the caller's options, which override
     * entries with the same key.
     *
     * @param providedWriteOptions caller-supplied options; may be {@code null}
     * @param featureGroup feature group whose name is used for {@code dbtable}
     * @param storageConnector connector supplying the base options
     * @return a new mutable map with the merged options
     * @throws FeatureStoreException if the connector cannot provide its Spark options
     */
    public Map<String, String> getOnlineOptions(Map<String, String> providedWriteOptions, FeatureGroup featureGroup, StorageConnector storageConnector) throws FeatureStoreException {
        // Copy before merging so the map owned by the connector is never modified.
        Map<String, String> writeOptions = new HashMap<>(storageConnector.getSparkOptions());
        writeOptions.put("dbtable", this.utils.getFgName(featureGroup));
        if (providedWriteOptions != null) {
            writeOptions.putAll(providedWriteOptions);
        }
        return writeOptions;
    }

    /**
     * Writes a dataframe through Spark's {@code jdbc} data source.
     *
     * @param dataset dataframe to write
     * @param saveMode Spark save mode
     * @param writeOptions JDBC writer options
     */
    public void writeOnlineDataframe(Dataset<Row> dataset, SaveMode saveMode, Map<String, String> writeOptions) {
        dataset.write().format("jdbc").options(writeOptions).mode(saveMode).save();
    }

    /**
     * Writes a dataframe to the offline storage of a feature group. Feature groups whose time
     * travel format is {@link TimeTravelFormat#HUDI} are written through the Hudi engine; all
     * others are saved as a Hive table.
     *
     * @param featureGroup target feature group
     * @param dataset dataframe to write
     * @param saveMode Spark save mode
     * @param operation Hudi operation, forwarded to the Hudi engine on the Hudi path only
     * @param writeOptions writer options; may be {@code null} on the Hive path
     * @throws IOException declared by the Hudi write path
     * @throws FeatureStoreException declared by the Hudi write path
     */
    public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, SaveMode saveMode, HudiOperationType operation, Map<String, String> writeOptions) throws IOException, FeatureStoreException {
        if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, featureGroup, dataset, saveMode, operation, writeOptions);
        } else {
            this.writeSparkDataset(featureGroup, dataset, saveMode, writeOptions);
        }
    }

    /** Saves the dataframe as a partitioned Hive table named after the feature group. */
    private void writeSparkDataset(FeatureGroup featureGroup, Dataset<Row> dataset, SaveMode saveMode, Map<String, String> writeOptions) {
        // A null options map is replaced by an empty one before it reaches the writer.
        Map<String, String> effectiveOptions = writeOptions == null ? new HashMap<>() : writeOptions;
        dataset.write()
            .format("hive")
            .mode(saveMode)
            .options(effectiveOptions)
            .partitionBy(this.utils.getPartitionColumns(featureGroup))
            .saveAsTable(this.utils.getTableName(featureGroup));
    }

    /**
     * Profiles a dataframe with Deequ's column profiler and returns the profiles as JSON.
     *
     * @param df dataframe to profile
     * @param restrictToColumns columns to restrict profiling to; {@code null} or empty means no
     *        restriction is applied
     * @param correlation whether to enable correlation; {@code null} is treated as {@code true}
     * @param histogram whether to enable histograms; {@code null} is treated as {@code true}
     * @return the column profiles serialized by {@code ColumnProfiles.toJson}
     */
    public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram) {
        boolean withCorrelation = correlation == null || correlation;
        boolean withHistogram = histogram == null || histogram;

        ColumnProfilerRunBuilder runner = new ColumnProfilerRunner()
            .onData(df)
            .withCorrelation(withCorrelation)
            .withHistogram(withHistogram);
        if (restrictToColumns != null && !restrictToColumns.isEmpty()) {
            // NOTE(review): the return value is discarded, so this relies on the builder
            // recording the restriction on itself -- confirm against the Deequ version in use.
            runner.restrictToColumns(JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala().toSeq());
        }
        ColumnProfiles result = runner.run();
        return ColumnProfiles.toJson(result.profiles().values().toSeq());
    }

    /**
     * Profiles the given columns with correlation and histograms enabled.
     *
     * @param df dataframe to profile
     * @param restrictToColumns columns to restrict profiling to; may be {@code null} or empty
     * @return the column profiles as JSON
     */
    public String profile(Dataset<Row> df, List<String> restrictToColumns) {
        return this.profile(df, restrictToColumns, true, true);
    }

    /**
     * Profiles all columns with the given correlation and histogram settings.
     *
     * @param df dataframe to profile
     * @param correlation whether to enable correlation
     * @param histogram whether to enable histograms
     * @return the column profiles as JSON
     */
    public String profile(Dataset<Row> df, boolean correlation, boolean histogram) {
        return this.profile(df, null, correlation, histogram);
    }

    /**
     * Profiles all columns with correlation and histograms enabled.
     *
     * @param df dataframe to profile
     * @return the column profiles as JSON
     */
    public String profile(Dataset<Row> df) {
        return this.profile(df, null, true, true);
    }

    /**
     * Returns the Spark session wrapped by this engine.
     *
     * @return the underlying {@link SparkSession}
     */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }
}

