package com.logicalclocks.hsfs.spark.util;

import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.spark.engine.SparkEngine;
import com.logicalclocks.hsfs.util.Constants;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Logger;
import javax.ws.rs.NotSupportedException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* loaded from: input_file:com/logicalclocks/hsfs/spark/util/StorageConnectorUtils.class */
public class StorageConnectorUtils {
    Logger logger = Logger.getLogger(StorageConnectorUtils.class.getName());

    public Dataset<Row> read(StorageConnector.HopsFsConnector hopsFsConnector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        return SparkEngine.getInstance().read(hopsFsConnector, str, map, str2);
    }

    public Dataset<Row> read(StorageConnector.S3Connector s3Connector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        s3Connector.update();
        Map<String, String> sparkOptions = s3Connector.sparkOptions();
        if (map != null && !map.isEmpty()) {
            sparkOptions.putAll(map);
        }
        if (str2 != null && !str2.startsWith(Constants.S3_SCHEME)) {
            str2 = s3Connector.getPath(str2);
            this.logger.info(String.format("Prepending default bucket specified on connector, final path: %s", str2));
        }
        return SparkEngine.getInstance().read(s3Connector, str, sparkOptions, str2);
    }

    public Dataset<Row> read(StorageConnector.RedshiftConnector redshiftConnector, String str) throws FeatureStoreException, IOException {
        redshiftConnector.update();
        Map<String, String> sparkOptions = redshiftConnector.sparkOptions();
        if (!Strings.isNullOrEmpty(str)) {
            sparkOptions.put("query", str);
        }
        return SparkEngine.getInstance().read(redshiftConnector, Constants.JDBC_FORMAT, sparkOptions, null);
    }

    public Dataset<Row> read(StorageConnector.AdlsConnector adlsConnector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        if (str2 != null && (!str2.startsWith("abfss://") || !str2.startsWith("adl://"))) {
            str2 = adlsConnector.getPath(str2);
            this.logger.info(String.format("Using default container specified on connector, final path: %s", str2));
        }
        return SparkEngine.getInstance().read(adlsConnector, str, map, str2);
    }

    public Dataset<Row> read(StorageConnector.SnowflakeConnector snowflakeConnector, String str) throws FeatureStoreException, IOException {
        Map<String, String> sparkOptions = snowflakeConnector.sparkOptions();
        if (!Strings.isNullOrEmpty(str)) {
            sparkOptions.remove("dbtable");
            sparkOptions.put("query", str);
        }
        return SparkEngine.getInstance().read(snowflakeConnector, Constants.SNOWFLAKE_FORMAT, sparkOptions, null);
    }

    public Dataset<Row> read(StorageConnector.JdbcConnector jdbcConnector, String str) throws FeatureStoreException, IOException {
        jdbcConnector.update();
        Map<String, String> sparkOptions = jdbcConnector.sparkOptions();
        if (!Strings.isNullOrEmpty(str)) {
            sparkOptions.put("query", str);
        }
        return SparkEngine.getInstance().read(jdbcConnector.refetch(), Constants.JDBC_FORMAT, sparkOptions, null);
    }

    public Dataset<Row> read(StorageConnector.GcsConnector gcsConnector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        if (str2 != null && !str2.startsWith("gs://")) {
            str2 = gcsConnector.getPath(str2);
            this.logger.info(String.format("Prepending default bucket specified on connector, final path: %s", str2));
        }
        return SparkEngine.getInstance().read(gcsConnector, str, map, str2);
    }

    public Dataset<Row> read(StorageConnector.BigqueryConnector bigqueryConnector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        String str3;
        Map<String, String> sparkOptions = bigqueryConnector.sparkOptions();
        map.put(Constants.BIGQ_CREDENTIALS, Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(SparkEngine.getInstance().addFile(bigqueryConnector.getKeyPath()), new String[0]))));
        if (map != null && !map.isEmpty()) {
            sparkOptions.putAll(map);
        }
        if (!Strings.isNullOrEmpty(str)) {
            str3 = str;
        } else if (!Strings.isNullOrEmpty(bigqueryConnector.getQueryTable())) {
            str3 = bigqueryConnector.getQueryTable();
        } else {
            if (Strings.isNullOrEmpty(str2)) {
                throw new IllegalArgumentException("Either query should be provided or Query Project,Dataset and Table should be set");
            }
            str3 = str2;
        }
        return SparkEngine.getInstance().read(bigqueryConnector, Constants.BIGQUERY_FORMAT, sparkOptions, str3);
    }

    public Dataset<Row> read(StorageConnector storageConnector, String str, String str2, Map<String, String> map, String str3) throws FeatureStoreException, IOException {
        if (storageConnector instanceof StorageConnector.HopsFsConnector) {
            return read((StorageConnector.HopsFsConnector) storageConnector, str2, map, str3);
        }
        if (storageConnector instanceof StorageConnector.S3Connector) {
            return read((StorageConnector.S3Connector) storageConnector, str2, map, str3);
        }
        if (storageConnector instanceof StorageConnector.RedshiftConnector) {
            return read((StorageConnector.RedshiftConnector) storageConnector, str);
        }
        if (storageConnector instanceof StorageConnector.AdlsConnector) {
            return read((StorageConnector.AdlsConnector) storageConnector, str2, map, str3);
        }
        if (storageConnector instanceof StorageConnector.SnowflakeConnector) {
            return read((StorageConnector.SnowflakeConnector) storageConnector, str);
        }
        if (storageConnector instanceof StorageConnector.JdbcConnector) {
            return read((StorageConnector.JdbcConnector) storageConnector, str);
        }
        if (storageConnector instanceof StorageConnector.GcsConnector) {
            return read((StorageConnector.GcsConnector) storageConnector, str2, map, str3);
        }
        if (storageConnector instanceof StorageConnector.BigqueryConnector) {
            return read((StorageConnector.BigqueryConnector) storageConnector, str, map, str3);
        }
        if (storageConnector instanceof StorageConnector.KafkaConnector) {
            throw new NotSupportedException("Reading a Kafka Stream into a static Spark Dataframe is not supported.");
        }
        throw new FeatureStoreException("Unknown type of StorageConnector.");
    }

    public Dataset<Row> readStream(StorageConnector.KafkaConnector kafkaConnector, String str, boolean z, String str2, String str3, Map<String, String> map, boolean z2) throws FeatureStoreException, IOException {
        if (!Arrays.asList("avro", "json", null).contains(str2.toLowerCase())) {
            throw new IllegalArgumentException("Can only read JSON and AVRO encoded records from Kafka.");
        }
        if (z) {
            map.put("subscribePattern", str);
        } else {
            map.put("subscribe", str);
        }
        return SparkEngine.getInstance().readStream(kafkaConnector, "kafka", str2.toLowerCase(), str3, map, z2);
    }
}
