/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.util;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.spark.engine.SparkEngine;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Dispatches read requests for the different {@link StorageConnector} flavours to the
 * {@link SparkEngine} singleton, preparing the connector and the Spark read options first.
 *
 * <p>The explicit {@code (StorageConnector)} upcasts on the engine calls are kept on purpose so
 * that overload resolution on {@code SparkEngine} stays exactly as it was.
 */
public class StorageConnectorUtils {

    /**
     * Reads data through a HopsFS connector.
     *
     * @param connector  the HopsFS connector
     * @param dataFormat format name handed to the Spark engine
     * @param options    read options handed to the Spark engine unchanged
     * @param path       location to read from
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the Spark engine
     * @throws IOException           propagated from the Spark engine
     */
    public Dataset<Row> read(StorageConnector.HopsFsConnector connector, String dataFormat, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    /**
     * Reads data through an S3 connector, calling {@code connector.update()} first.
     *
     * @param connector  the S3 connector
     * @param dataFormat format name handed to the Spark engine
     * @param options    read options handed to the Spark engine unchanged
     * @param path       location to read from
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the connector or the Spark engine
     * @throws IOException           propagated from the connector or the Spark engine
     */
    public Dataset<Row> read(StorageConnector.S3Connector connector, String dataFormat, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        connector.update();
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    /**
     * Reads from Redshift using the {@code jdbc} format, calling {@code connector.update()} first.
     *
     * @param connector the Redshift connector; its {@code sparkOptions()} are the base options
     * @param query     optional query; when non-empty it is set as the {@code query} option
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the connector or the Spark engine
     * @throws IOException           propagated from the connector or the Spark engine
     */
    public Dataset<Row> read(StorageConnector.RedshiftConnector connector, String query) throws FeatureStoreException, IOException {
        connector.update();
        Map<String, String> readOptions = connector.sparkOptions();
        if (!Strings.isNullOrEmpty(query)) {
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "jdbc", readOptions, null);
    }

    /**
     * Reads data through an ADLS connector.
     *
     * @param connector  the ADLS connector
     * @param dataFormat format name handed to the Spark engine
     * @param options    read options handed to the Spark engine unchanged
     * @param path       location to read from
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the Spark engine
     * @throws IOException           propagated from the Spark engine
     */
    public Dataset<Row> read(StorageConnector.AdlsConnector connector, String dataFormat, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    /**
     * Reads from Snowflake using the {@code net.snowflake.spark.snowflake} format.
     *
     * @param connector the Snowflake connector; its {@code sparkOptions()} are the base options
     * @param query     optional query; when non-empty it replaces the {@code dbtable} option
     *                  with a {@code query} option
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the Spark engine
     * @throws IOException           propagated from the Spark engine
     */
    public Dataset<Row> read(StorageConnector.SnowflakeConnector connector, String query) throws FeatureStoreException, IOException {
        Map<String, String> readOptions = connector.sparkOptions();
        if (!Strings.isNullOrEmpty(query)) {
            // An explicit query takes the place of the table option.
            readOptions.remove("dbtable");
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "net.snowflake.spark.snowflake", readOptions, null);
    }

    /**
     * Reads through a generic JDBC connector using the {@code jdbc} format, calling
     * {@code connector.update()} first.
     *
     * @param connector the JDBC connector; its {@code sparkOptions()} are the base options
     * @param query     optional query; when non-empty it is set as the {@code query} option
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the connector or the Spark engine
     * @throws IOException           propagated from the connector or the Spark engine
     */
    public Dataset<Row> read(StorageConnector.JdbcConnector connector, String query) throws FeatureStoreException, IOException {
        connector.update();
        Map<String, String> readOptions = connector.sparkOptions();
        if (!Strings.isNullOrEmpty(query)) {
            readOptions.put("query", query);
        }
        // NOTE(review): the engine receives connector.refetch() rather than the connector that was
        // just updated; the other readers pass the connector itself - confirm this is intended.
        return SparkEngine.getInstance().read(connector.refetch(), "jdbc", readOptions, null);
    }

    /**
     * Reads data through a GCS connector after asking the Spark engine to set up the connector's
     * Hadoop configuration.
     *
     * @param connector  the GCS connector
     * @param dataFormat format name handed to the Spark engine
     * @param options    read options handed to the Spark engine unchanged
     * @param path       location to read from
     * @return the dataset produced by the Spark engine
     * @throws FeatureStoreException propagated from the Spark engine
     * @throws IOException           propagated from the Spark engine
     */
    public Dataset<Row> read(StorageConnector.GcsConnector connector, String dataFormat, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        SparkEngine.getInstance().setupConnectorHadoopConf((StorageConnector)connector);
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    /**
     * Reads from BigQuery using the {@code bigquery} format.
     *
     * <p>The connector's key file is registered with the Spark engine, read from the returned
     * local path and passed Base64-encoded as the {@code credentials} option. The read target is
     * chosen in this order: {@code query}, the connector's query table, then {@code path}.
     *
     * @param connector the BigQuery connector; its {@code sparkOptions()} are the base options
     * @param query     optional query; takes precedence over every other read target
     * @param options   optional extra options merged over the connector options; may be
     *                  {@code null} and is not modified
     * @param path      fallback read target when neither a query nor a query table is available
     * @return the dataset produced by the Spark engine
     * @throws IllegalArgumentException if no query, query table or path is available
     * @throws FeatureStoreException    propagated from the Spark engine
     * @throws IOException              if the key file cannot be read, or propagated from the engine
     */
    public Dataset<Row> read(StorageConnector.BigqueryConnector connector, String query, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        Map<String, String> readOptions = connector.sparkOptions();
        String localKeyPath = SparkEngine.getInstance().addFile(connector.getKeyPath());
        byte[] fileContent = Files.readAllBytes(Paths.get(localKeyPath));
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        // Set the credentials on the merged options (after the merge, so the connector's key still
        // wins) instead of writing into the caller's map, which also failed on a null map.
        readOptions.put("credentials", Base64.getEncoder().encodeToString(fileContent));
        if (!Strings.isNullOrEmpty(query)) {
            path = query;
        } else if (!Strings.isNullOrEmpty(connector.getQueryTable())) {
            path = connector.getQueryTable();
        } else if (Strings.isNullOrEmpty(path)) {
            throw new IllegalArgumentException("Either query should be provided or Query Project,Dataset and Table should be set");
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "bigquery", readOptions, path);
    }

    /**
     * Routes a read request to the overload matching the connector's concrete type.
     *
     * <p>Query-based connectors (Redshift, Snowflake, JDBC) ignore {@code dataFormat},
     * {@code options} and {@code path}; file-based connectors (HopsFS, S3, ADLS, GCS) ignore
     * {@code query}; BigQuery ignores {@code dataFormat}.
     *
     * @param connector  the connector to read from
     * @param query      query for query-based connectors
     * @param dataFormat format name for file-based connectors
     * @param options    read options for file-based connectors and BigQuery
     * @param path       location for file-based connectors and BigQuery
     * @return the dataset produced by the matching overload
     * @throws NotSupportedException if the connector is a Kafka connector
     * @throws FeatureStoreException if the connector type is not recognised, or propagated from
     *                               the matching overload
     * @throws IOException           propagated from the matching overload
     */
    public Dataset<Row> read(StorageConnector connector, String query, String dataFormat, Map<String, String> options, String path) throws FeatureStoreException, IOException {
        if (connector instanceof StorageConnector.HopsFsConnector) {
            return this.read((StorageConnector.HopsFsConnector)connector, dataFormat, options, path);
        }
        if (connector instanceof StorageConnector.S3Connector) {
            return this.read((StorageConnector.S3Connector)connector, dataFormat, options, path);
        }
        if (connector instanceof StorageConnector.RedshiftConnector) {
            return this.read((StorageConnector.RedshiftConnector)connector, query);
        }
        if (connector instanceof StorageConnector.AdlsConnector) {
            return this.read((StorageConnector.AdlsConnector)connector, dataFormat, options, path);
        }
        if (connector instanceof StorageConnector.SnowflakeConnector) {
            return this.read((StorageConnector.SnowflakeConnector)connector, query);
        }
        if (connector instanceof StorageConnector.JdbcConnector) {
            return this.read((StorageConnector.JdbcConnector)connector, query);
        }
        if (connector instanceof StorageConnector.GcsConnector) {
            return this.read((StorageConnector.GcsConnector)connector, dataFormat, options, path);
        }
        if (connector instanceof StorageConnector.BigqueryConnector) {
            return this.read((StorageConnector.BigqueryConnector)connector, query, options, path);
        }
        if (connector instanceof StorageConnector.KafkaConnector) {
            throw new NotSupportedException("Reading a Kafka Stream into a static Spark Dataframe is not supported.");
        }
        throw new FeatureStoreException("Unknown type of StorageConnector.");
    }

    /**
     * Opens a streaming read from Kafka.
     *
     * <p>The connector's truststore and keystore files are each registered with the Spark engine
     * and the connector is updated with the paths the engine returns. The topic is added to
     * {@code options} as {@code subscribePattern} or {@code subscribe}.
     *
     * @param connector       the Kafka connector; its SSL store locations are rewritten
     * @param topic           topic name, or a topic pattern when {@code topicPattern} is set
     * @param topicPattern    whether {@code topic} is a pattern rather than a literal name
     * @param messageFormat   {@code avro} or {@code json} (case-insensitive); {@code null} passes
     *                        validation and is forwarded as {@code null}
     * @param schema          schema handed to the Spark engine
     * @param options         read options; must be non-null and mutable, the subscription key is
     *                        added to it
     * @param includeMetadata flag handed to the Spark engine
     * @return the streaming dataset produced by the Spark engine
     * @throws IllegalArgumentException if {@code messageFormat} is not an accepted format
     * @throws FeatureStoreException    propagated from the Spark engine
     * @throws IOException              propagated from the Spark engine
     */
    public Dataset<Row> readStream(StorageConnector.KafkaConnector connector, String topic, boolean topicPattern, String messageFormat, String schema, Map<String, String> options, boolean includeMetadata) throws FeatureStoreException, IOException {
        // The allow-list contains null, so normalise null-safely instead of dereferencing it.
        // NOTE(review): how the engine treats a null message format is not visible here - confirm.
        String normalizedFormat = messageFormat == null ? null : messageFormat.toLowerCase();
        if (!Arrays.asList("avro", "json", null).contains(normalizedFormat)) {
            throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka.");
        }
        connector.setSslTruststoreLocation(SparkEngine.getInstance().addFile(connector.getSslTruststoreLocation()));
        // The keystore must come from its own location, not from the truststore path.
        connector.setSslKeystoreLocation(SparkEngine.getInstance().addFile(connector.getSslKeystoreLocation()));
        if (topicPattern) {
            options.put("subscribePattern", topic);
        } else {
            options.put("subscribe", topic);
        }
        return SparkEngine.getInstance().readStream((StorageConnector)connector, "kafka", normalizedFormat, schema, options, includeMetadata);
    }
}

