/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.util;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.DataSource;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.spark.engine.SparkEngine;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.logging.Logger;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class StorageConnectorUtils {
    Logger logger = Logger.getLogger(StorageConnectorUtils.class.getName());

    public Dataset<Row> read(StorageConnector.HopsFsConnector connector, DataSource dataSource, String dataFormat, Map<String, String> options) throws FeatureStoreException, IOException {
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, dataSource.getPath());
    }

    public Dataset<Row> read(StorageConnector.S3Connector connector, DataSource dataSource, String dataFormat, Map<String, String> options) throws FeatureStoreException, IOException {
        String path;
        connector.update();
        Map readOptions = connector.sparkOptions(dataSource);
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        if ((path = dataSource.getPath()) != null && !path.startsWith("s3://")) {
            path = connector.getPath(path);
            this.logger.info(String.format("Prepending default bucket specified on connector, final path: %s", path));
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, readOptions, path);
    }

    public Dataset<Row> read(StorageConnector.RedshiftConnector connector, DataSource dataSource, Map<String, String> options) throws FeatureStoreException, IOException {
        String query;
        connector.update();
        Map readOptions = connector.sparkOptions(dataSource);
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        if (!Strings.isNullOrEmpty((String)(query = dataSource.getQuery()))) {
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "jdbc", readOptions, null);
    }

    public Dataset<Row> read(StorageConnector.AdlsConnector connector, DataSource dataSource, String dataFormat, Map<String, String> options) throws FeatureStoreException, IOException {
        String path = dataSource.getPath();
        if (!(path == null || path.startsWith("abfss://") && path.startsWith("adl://"))) {
            path = connector.getPath(path);
            this.logger.info(String.format("Using default container specified on connector, final path: %s", path));
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    public Dataset<Row> read(StorageConnector.SnowflakeConnector connector, DataSource dataSource, Map<String, String> options) throws FeatureStoreException, IOException {
        String query;
        Map readOptions = connector.sparkOptions(dataSource);
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        if (!Strings.isNullOrEmpty((String)(query = dataSource.getQuery()))) {
            readOptions.remove("dbtable");
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "net.snowflake.spark.snowflake", readOptions, null);
    }

    public Dataset<Row> read(StorageConnector.JdbcConnector connector, DataSource dataSource, Map<String, String> options) throws FeatureStoreException, IOException {
        String query;
        connector.update();
        Map readOptions = connector.sparkOptions(dataSource);
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        if (!Strings.isNullOrEmpty((String)(query = dataSource.getQuery()))) {
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "jdbc", readOptions, null);
    }

    public Dataset<Row> read(StorageConnector.GcsConnector connector, DataSource dataSource, String dataFormat, Map<String, String> options) throws FeatureStoreException, IOException {
        String path = dataSource.getPath();
        if (path != null && !path.startsWith("gs://")) {
            path = connector.getPath(path);
            this.logger.info(String.format("Prepending default bucket specified on connector, final path: %s", path));
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, dataFormat, options, path);
    }

    public Dataset<Row> read(StorageConnector.BigqueryConnector connector, DataSource dataSource, Map<String, String> options) throws FeatureStoreException, IOException {
        Map readOptions = connector.sparkOptions(dataSource);
        String localKeyPath = SparkEngine.getInstance().addFile(connector.getKeyPath());
        byte[] fileContent = Files.readAllBytes(Paths.get(localKeyPath, new String[0]));
        options.put("credentials", Base64.getEncoder().encodeToString(fileContent));
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        String query = dataSource.getQuery();
        String path = dataSource.getPath();
        if (!Strings.isNullOrEmpty((String)query)) {
            path = query;
        } else if (!Strings.isNullOrEmpty((String)dataSource.getTable())) {
            path = dataSource.getTable();
        } else if (!Strings.isNullOrEmpty((String)connector.getQueryTable())) {
            path = connector.getQueryTable();
        } else if (Strings.isNullOrEmpty((String)path)) {
            throw new IllegalArgumentException("Either query should be provided or Query Project,Dataset and Table should be set");
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "bigquery", readOptions, path);
    }

    public Dataset<Row> read(StorageConnector.RdsConnector connector, DataSource dataSource, Map<String, String> options) throws FeatureStoreException, IOException {
        String query;
        connector.update();
        Map readOptions = connector.sparkOptions(dataSource);
        if (options != null && !options.isEmpty()) {
            readOptions.putAll(options);
        }
        if (!Strings.isNullOrEmpty((String)(query = dataSource.getQuery()))) {
            readOptions.put("query", query);
        }
        return SparkEngine.getInstance().read((StorageConnector)connector, "jdbc", readOptions, null);
    }

    public Dataset<Row> read(StorageConnector connector, DataSource dataSource, String dataFormat, Map<String, String> options) throws FeatureStoreException, IOException {
        if (connector instanceof StorageConnector.HopsFsConnector) {
            return this.read((StorageConnector.HopsFsConnector)connector, dataSource, dataFormat, options);
        }
        if (connector instanceof StorageConnector.S3Connector) {
            return this.read((StorageConnector.S3Connector)connector, dataSource, dataFormat, options);
        }
        if (connector instanceof StorageConnector.RedshiftConnector) {
            return this.read((StorageConnector.RedshiftConnector)connector, dataSource, options);
        }
        if (connector instanceof StorageConnector.AdlsConnector) {
            return this.read((StorageConnector.AdlsConnector)connector, dataSource, dataFormat, options);
        }
        if (connector instanceof StorageConnector.SnowflakeConnector) {
            return this.read((StorageConnector.SnowflakeConnector)connector, dataSource, options);
        }
        if (connector instanceof StorageConnector.JdbcConnector) {
            return this.read((StorageConnector.JdbcConnector)connector, dataSource, options);
        }
        if (connector instanceof StorageConnector.GcsConnector) {
            return this.read((StorageConnector.GcsConnector)connector, dataSource, dataFormat, options);
        }
        if (connector instanceof StorageConnector.BigqueryConnector) {
            return this.read((StorageConnector.BigqueryConnector)connector, dataSource, options);
        }
        if (connector instanceof StorageConnector.RdsConnector) {
            return this.read((StorageConnector.RdsConnector)connector, dataSource, options);
        }
        if (connector instanceof StorageConnector.KafkaConnector) {
            throw new NotSupportedException("Reading a Kafka Stream into a static Spark Dataframe is not supported.");
        }
        throw new FeatureStoreException("Unknown type of StorageConnector.");
    }

    public Dataset<Row> readStream(StorageConnector.KafkaConnector connector, String topic, boolean topicPattern, String messageFormat, String schema, Map<String, String> options, boolean includeMetadata) throws FeatureStoreException, IOException {
        if (!Arrays.asList("avro", "json", null).contains(messageFormat.toLowerCase())) {
            throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka.");
        }
        if (topicPattern) {
            options.put("subscribePattern", topic);
        } else {
            options.put("subscribe", topic);
        }
        return SparkEngine.getInstance().readStream((StorageConnector)connector, "kafka", messageFormat.toLowerCase(), schema, options, includeMetadata);
    }
}

