package com.logicalclocks.hsfs.spark.engine;

import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
import com.amazon.deequ.profiles.ColumnProfilerRunner;
import com.amazon.deequ.profiles.ColumnProfiles;
import com.damnhandy.uri.template.UriTemplate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.FeatureType;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDatasetFeature;
import com.logicalclocks.hsfs.constructor.FeatureGroupAlias;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import com.logicalclocks.hsfs.spark.TrainingDataset;
import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.hudi.HudiEngine;
import com.logicalclocks.hsfs.spark.util.StorageConnectorUtils;
import com.logicalclocks.hsfs.util.Constants;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import org.json.JSONObject;
import scala.collection.Iterator;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/logicalclocks/hsfs/spark/engine/SparkEngine.class */
public class SparkEngine {
    /** Shared engine instance; populated lazily by {@link #getInstance()} or replaced via {@link #setInstance(SparkEngine)}. */
    private static SparkEngine INSTANCE = null;
    /** Matches a type string of the form {@code array<...>} and captures the element type as group 1. */
    private static Pattern arrayPattern = Pattern.compile("^array<(.*)>$");
    /** Helper used to read data through a storage connector. */
    private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
    /** Feature group helper used here for table names and partition columns. */
    private FeatureGroupUtils utils = new FeatureGroupUtils();
    /** Engine handling reads from and writes to Hudi tables. */
    private HudiEngine hudiEngine = new HudiEngine();
    /** Kafka metadata API client. */
    private KafkaApi kafkaApi = new KafkaApi();
    /** Spark session obtained (or created) with Hive support enabled when this object is constructed. */
    private SparkSession sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate();

    /**
     * Returns the shared engine, constructing it on the first call.
     * The method is synchronized, so the lazy construction runs under the class lock.
     *
     * @return the shared {@code SparkEngine}
     */
    public static synchronized SparkEngine getInstance() {
        if (INSTANCE != null) {
            return INSTANCE;
        }
        INSTANCE = new SparkEngine();
        return INSTANCE;
    }

    /**
     * Replaces the shared engine instance.
     *
     * <p>Synchronized on the class, like {@link #getInstance()}, so that a replacement is
     * published under the same lock that guards the lazy initialisation.
     *
     * @param sparkEngine the instance subsequent {@link #getInstance()} calls should return;
     *                    passing {@code null} makes the next call create a fresh engine
     */
    public static synchronized void setInstance(SparkEngine sparkEngine) {
        INSTANCE = sparkEngine;
    }

    /**
     * Applies the session-level settings this engine relies on: dynamic, non-strict Hive
     * partitioning, no conversion of metastore Parquet tables, and a UTC session time zone.
     */
    private SparkEngine() {
        org.apache.spark.sql.RuntimeConfig sessionConf = this.sparkSession.conf();
        sessionConf.set("hive.exec.dynamic.partition", "true");
        sessionConf.set("hive.exec.dynamic.partition.mode", "nonstrict");
        sessionConf.set("spark.sql.hive.convertMetastoreParquet", "false");
        sessionConf.set("spark.sql.session.timeZone", "UTC");
    }

    /**
     * Checks that the Spark session carries the properties required to talk to Hopsworks.
     *
     * <p>Each property must be present. Where an expected value is given, the configured value
     * must equal it; a {@code null} expectation means only presence is checked.
     *
     * @throws FeatureStoreException if a property is missing or has an unexpected value
     */
    public void validateSparkConfiguration() throws FeatureStoreException {
        Map<String, String> expected = new HashMap<>();
        expected.put("spark.hadoop.hops.ssl.trustore.name", null);
        expected.put("spark.hadoop.hops.rpc.socket.factory.class.default", "io.hops.hadoop.shaded.org.apache.hadoop.net.HopsSSLSocketFactory");
        expected.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        expected.put("spark.hadoop.hops.ssl.hostname.verifier", "ALLOW_ALL");
        expected.put("spark.hadoop.hops.ssl.keystore.name", null);
        expected.put("spark.hadoop.fs.hopsfs.impl", "io.hops.hopsfs.client.HopsFileSystem");
        expected.put("spark.hadoop.hops.ssl.keystores.passwd.name", null);
        expected.put("spark.hadoop.hops.ipc.server.ssl.enabled", "true");
        expected.put("spark.sql.hive.metastore.jars", null);
        expected.put("spark.hadoop.client.rpc.ssl.enabled.protocol", "TLSv1.2");
        expected.put("spark.hadoop.hive.metastore.uris", null);

        for (Map.Entry<String, String> requirement : expected.entrySet()) {
            String property = requirement.getKey();
            String requiredValue = requirement.getValue();
            // Valid means: present, and either unconstrained or equal to the required value.
            boolean valid = this.sparkSession.conf().contains(property)
                && (requiredValue == null || this.sparkSession.conf().get(property, (String) null).equals(requiredValue));
            if (!valid) {
                throw new FeatureStoreException("Spark is misconfigured for communication with Hopsworks, missing or invalid property: " + property);
            }
        }
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.trustore.name} session property
     */
    public String getTrustStorePath() {
        final String property = "spark.hadoop.hops.ssl.trustore.name";
        return this.sparkSession.conf().get(property);
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.keystore.name} session property
     */
    public String getKeyStorePath() {
        final String property = "spark.hadoop.hops.ssl.keystore.name";
        return this.sparkSession.conf().get(property);
    }

    /**
     * @return the value of the {@code spark.hadoop.hops.ssl.keystores.passwd.name} session property
     */
    public String getCertKey() {
        final String property = "spark.hadoop.hops.ssl.keystores.passwd.name";
        return this.sparkSession.conf().get(property);
    }

    /**
     * Runs a SQL statement on the Spark session.
     *
     * <p>If the statement fails with a "Permission denied" error on a feature store warehouse
     * directory, the failure is rethrown as a {@link RuntimeException} with a message naming the
     * feature store; the original exception is attached as the cause. Any other failure is
     * rethrown unchanged.
     *
     * @param str the SQL statement to execute
     * @return the result of the statement
     */
    public Dataset<Row> sql(String str) {
        try {
            return this.sparkSession.sql(str);
        } catch (Exception e) {
            // getMessage() may legitimately be null; never let that mask the real failure.
            String message = e.getMessage();
            if (message != null && message.contains("Permission denied")) {
                Matcher matcher = Pattern.compile("inode=\"/apps/hive/warehouse/(.*)?_featurestore\\.db\"").matcher(message);
                if (matcher.find()) {
                    String group = matcher.group(1);
                    throw new RuntimeException(String.format("Cannot access feature store '%s'. It is possible to request access from data owners of '%s'.", group, group), e);
                }
            }
            throw e;
        }
    }

    /**
     * Reads an external feature group through its storage connector and registers the result
     * as a temporary view.
     *
     * @param externalFeatureGroup the feature group to read
     * @param str                  name of the temporary view to create or replace
     * @return the dataset that was read
     * @throws FeatureStoreException propagated from the storage connector read
     * @throws IOException           propagated from the storage connector read
     */
    public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup externalFeatureGroup, String str) throws FeatureStoreException, IOException {
        String format = null;
        if (externalFeatureGroup.getDataFormat() != null) {
            format = externalFeatureGroup.getDataFormat().toString();
        }
        Map<String, String> readOptions = getOnDemandOptions(externalFeatureGroup);

        Dataset<Row> frame = this.storageConnectorUtils.read(
            externalFeatureGroup.getStorageConnector(),
            externalFeatureGroup.getQuery(),
            format,
            readOptions,
            externalFeatureGroup.getStorageConnector().getPath(externalFeatureGroup.getPath()));

        // The collected result is discarded; presumably this only exists to touch the
        // location as a side effect - TODO confirm.
        if (!Strings.isNullOrEmpty(externalFeatureGroup.getLocation())) {
            this.sparkSession.sparkContext().textFile(externalFeatureGroup.getLocation(), 0).collect();
        }

        frame.createOrReplaceTempView(str);
        return frame;
    }

    /**
     * Separates label columns from the remaining columns of a dataset.
     *
     * @param dataset the dataset to split
     * @param list    names of the label columns; may be {@code null} or empty
     * @return a two-element list: the dataset without the label columns, followed by a dataset
     *         holding only the label columns aliased to their lower-case names. When no labels
     *         are given, the first element is the input dataset and the second is {@code null}.
     */
    public static List<Dataset<Row>> splitLabels(Dataset<Row> dataset, List<String> list) {
        List<Dataset<Row>> parts = new ArrayList<>();
        if (list == null || list.isEmpty()) {
            parts.add(dataset);
            parts.add(null);
            return parts;
        }

        Column[] labelColumns = list.stream()
            .map(label -> functions.col(label).alias(label.toLowerCase()))
            .toArray(Column[]::new);
        String[] labelNames = list.toArray(new String[0]);

        parts.add(dataset.drop(labelNames));
        parts.add(dataset.select(labelColumns));
        return parts;
    }

    /**
     * Converts the options of an external feature group into a name-to-value map.
     *
     * @param externalFeatureGroup the feature group whose options are read
     * @return a new empty map when the feature group has no options, otherwise a map built
     *         from each option's name and value
     */
    private Map<String, String> getOnDemandOptions(ExternalFeatureGroup externalFeatureGroup) {
        if (externalFeatureGroup.getOptions() == null) {
            return new HashMap<>();
        }
        return externalFeatureGroup.getOptions().stream()
            .collect(Collectors.toMap(option -> option.getName(), option -> option.getValue()));
    }

    /**
     * Loads a Hudi feature group and registers it as a temporary view under the alias name,
     * then reconciles the Hudi schema through the Hudi engine.
     *
     * @param featureGroupAlias feature group, alias and start/end timestamps to read
     * @param map               read options passed to the Hudi engine when building the options
     * @throws FeatureStoreException propagated from the Hudi engine
     */
    public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map<String, String> map) throws FeatureStoreException {
        Map<String, String> hudiReadOptions = this.hudiEngine.setupHudiReadOpts(
            featureGroupAlias.getLeftFeatureGroupStartTimestamp(),
            featureGroupAlias.getLeftFeatureGroupEndTimestamp(),
            map);

        Dataset<Row> table = this.sparkSession.read()
            .format(HudiEngine.HUDI_SPARK_FORMAT)
            .options(hudiReadOptions)
            .load(featureGroupAlias.getFeatureGroup().getLocation());
        table.createOrReplaceTempView(featureGroupAlias.getAlias());

        this.hudiEngine.reconcileHudiSchema(this.sparkSession, featureGroupAlias, hudiReadOptions);
    }

    /**
     * Materialises a training dataset to its storage location.
     *
     * <p>Without splits, the query result is written to {@code <location>/<name>}. With splits,
     * the data is split first and each split is written under the dataset location. If the
     * dataset asks for coalescing, every written dataset is coalesced to one partition.
     *
     * @param trainingDataset the training dataset definition
     * @param query           the query producing the data
     * @param map             read options, used when splitting
     * @param map2            write options
     * @param saveMode        Spark save mode
     * @return the datasets that were written: one element without splits, one per split otherwise
     * @throws FeatureStoreException propagated from reading or splitting
     * @throws IOException           propagated from reading or connector setup
     */
    public Dataset<Row>[] write(TrainingDataset trainingDataset, Query query, Map<String, String> map, Map<String, String> map2, SaveMode saveMode) throws FeatureStoreException, IOException {
        setupConnectorHadoopConf(trainingDataset.getStorageConnector());

        boolean hasSplits = trainingDataset.getSplits() != null && !trainingDataset.getSplits().isEmpty();
        if (hasSplits) {
            Dataset<Row>[] parts = splitDataset(trainingDataset, query, map);
            if (trainingDataset.getCoalesce().booleanValue()) {
                for (int idx = 0; idx < parts.length; idx++) {
                    parts[idx] = parts[idx].coalesce(1);
                }
            }
            writeSplits(parts, trainingDataset.getDataFormat(), map2, saveMode, trainingDataset.getLocation(), trainingDataset.getSplits());
            return parts;
        }

        Dataset<Row> whole = query.read();
        String target = new Path(trainingDataset.getLocation(), trainingDataset.getName()).toString();
        if (trainingDataset.getCoalesce().booleanValue()) {
            whole = whole.coalesce(1);
        }
        writeSingle(whole, trainingDataset.getDataFormat(), map2, saveMode, target);
        return new Dataset[]{whole};
    }

    /**
     * Splits the query result according to the training dataset's split definition.
     *
     * <p>The type of the first split decides the strategy: a time series split filters on the
     * left feature group's event time, anything else is a random split. For a time series
     * split, the event time feature is appended to the query if it is not already selected.
     *
     * @param trainingDataset the training dataset definition (must have at least one split)
     * @param query           the query producing the data
     * @param map             read options
     * @return one dataset per split
     * @throws FeatureStoreException propagated from reading or feature lookup
     * @throws IOException           propagated from reading
     */
    public Dataset<Row>[] splitDataset(TrainingDataset trainingDataset, Query query, Map<String, String> map) throws FeatureStoreException, IOException {
        boolean timeSeries = Split.SplitType.TIME_SERIES_SPLIT.equals(trainingDataset.getSplits().get(0).getSplitType());
        if (!timeSeries) {
            return randomSplit(trainingDataset, query, map);
        }

        String eventTime = query.getLeftFeatureGroup().getEventTime();
        boolean eventTimeSelected = query.getLeftFeatures().stream()
            .anyMatch(feature -> feature.getName().equals(eventTime));
        if (eventTimeSelected) {
            return timeSeriesSplit(trainingDataset, query, map, false);
        }

        // The event time is needed for filtering, so add it to the selection.
        query.appendFeature(query.getLeftFeatureGroup().getFeature(eventTime));
        return timeSeriesSplit(trainingDataset, query, map, true);
    }

    /**
     * Produces one dataset per split by filtering the query result on the left feature
     * group's event time, keeping rows with {@code start <= eventTime < end}.
     *
     * <p>NOTE(review): the {@code bool} parameter is not read anywhere in this method. The
     * caller passes {@code true} when it appended the event time feature to the query itself,
     * so it presumably was meant to drop that column from the result - confirm.
     *
     * @param trainingDataset supplies the list of splits
     * @param query           the query to read and whose left feature group defines the event time
     * @param map             read options
     * @param bool            currently unused (see note above)
     * @return an array with one dataset per split, in split order
     * @throws FeatureStoreException if the event time feature is not of type bigint, date or timestamp
     * @throws IOException           propagated from reading the query
     */
    private Dataset<Row>[] timeSeriesSplit(TrainingDataset trainingDataset, Query query, Map<String, String> map, Boolean bool) throws FeatureStoreException, IOException {
        Dataset<Row> read = query.read(false, map);
        List<Split> splits = trainingDataset.getSplits();
        Dataset<Row>[] datasetArr = new Dataset[splits.size()];
        // The same dataset is counted and filtered once per split below.
        read.persist();
        int i = 0;
        for (Split split : splits) {
            if (read.count() > 0) {
                String eventTime = query.getLeftFeatureGroup().getEventTime();
                String type = query.getLeftFeatureGroup().getFeature(eventTime).getType();
                if (FeatureType.BIGINT.name().equalsIgnoreCase(type)) {
                    String str = eventTime + "_hopsworks_tmp";
                    // Values with more than 10 decimal digits are divided by 1000
                    // (presumably milliseconds converted to seconds - confirm).
                    this.sparkSession.sqlContext().udf().register("checkEpochUDF", l -> {
                        return Long.toString(l.longValue()).length() > 10 ? Long.valueOf(Long.valueOf(l.longValue() / 1000).longValue()) : l;
                    }, DataTypes.LongType);
                    // `read` is reassigned, so it keeps the temporary column for later iterations;
                    // only the per-split result has the column dropped again.
                    read = read.withColumn(str, functions.callUDF("checkEpochUDF", new Column[]{read.col(eventTime)}));
                    // Split bounds come from getTime() and are divided by 1000 inside the filter expression.
                    datasetArr[i] = read.filter(String.format("%d/1000 <= `%s` and `%s` < %d/1000", Long.valueOf(split.getStartTime().getTime()), str, str, Long.valueOf(split.getEndTime().getTime()))).drop(str);
                } else {
                    if (!FeatureType.DATE.name().equalsIgnoreCase(type) && !FeatureType.TIMESTAMP.name().equalsIgnoreCase(type)) {
                        throw new FeatureStoreException("Invalid event time type");
                    }
                    // Date/timestamp event times are compared through unix_timestamp().
                    datasetArr[i] = read.filter(String.format("%d/1000 <= unix_timestamp(`%s`) and unix_timestamp(`%s`) < %d/1000", Long.valueOf(split.getStartTime().getTime()), eventTime, eventTime, Long.valueOf(split.getEndTime().getTime())));
                }
            } else {
                // Nothing to filter: every split is the (empty) dataset itself.
                datasetArr[i] = read;
            }
            i++;
        }
        return datasetArr;
    }

    /**
     * Randomly splits the query result using the split percentages as weights.
     *
     * @param trainingDataset supplies the splits and the optional seed
     * @param query           the query to read
     * @param map             read options
     * @return one dataset per split; the split is seeded when the training dataset has a seed
     * @throws FeatureStoreException propagated from reading the query
     * @throws IOException           propagated from reading the query
     */
    private Dataset<Row>[] randomSplit(TrainingDataset trainingDataset, Query query, Map<String, String> map) throws FeatureStoreException, IOException {
        Dataset<Row> data = query.read(false, map);
        double[] weights = trainingDataset.getSplits().stream()
            .map(Split::getPercentage)
            .mapToDouble(Number::doubleValue)
            .toArray();

        if (trainingDataset.getSeed() == null) {
            return data.randomSplit(weights);
        }
        return data.randomSplit(weights, trainingDataset.getSeed().longValue());
    }

    /**
     * Builds the write options for a data format: format defaults first, then the caller's
     * options on top, so caller-provided keys win.
     *
     * @param map        caller-provided options; may be {@code null}
     * @param dataFormat the target data format
     * @return a new map with the combined options
     */
    public Map<String, String> getWriteOptions(Map<String, String> map, DataFormat dataFormat) {
        Map<String, String> options = new HashMap<>();
        switch (dataFormat) {
            case CSV:
            case TSV:
                // Delimited text formats share the header flag and differ only in delimiter.
                options.put(Constants.HEADER, "true");
                options.put(Constants.DELIMITER, dataFormat == DataFormat.CSV ? UriTemplate.DEFAULT_SEPARATOR : "\t");
                break;
            case TFRECORDS:
            case TFRECORD:
                options.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example");
                break;
            default:
                break;
        }
        if (map != null) {
            options.putAll(map);
        }
        return options;
    }

    /**
     * Builds the read options for a data format: format defaults first, then the caller's
     * options on top, so caller-provided keys win.
     *
     * @param map        caller-provided options; may be {@code null}
     * @param dataFormat the source data format
     * @return a new map with the combined options
     */
    public Map<String, String> getReadOptions(Map<String, String> map, DataFormat dataFormat) {
        Map<String, String> options = new HashMap<>();
        switch (dataFormat) {
            case CSV:
            case TSV:
                // Delimited text formats share header and schema inference and differ only in delimiter.
                options.put(Constants.HEADER, "true");
                options.put(Constants.DELIMITER, dataFormat == DataFormat.CSV ? UriTemplate.DEFAULT_SEPARATOR : "\t");
                options.put(Constants.INFER_SCHEMA, "true");
                break;
            case TFRECORDS:
            case TFRECORD:
                options.put(Constants.TF_CONNECTOR_RECORD_TYPE, "Example");
                break;
            default:
                break;
        }
        if (map != null) {
            options.putAll(map);
        }
        return options;
    }

    /**
     * Writes each split dataset to {@code <base>/<split name>}; dataset {@code i} is paired
     * with split {@code i}.
     *
     * @param datasetArr the datasets to write
     * @param dataFormat output format
     * @param map        write options
     * @param saveMode   Spark save mode
     * @param str        base location
     * @param list       split definitions supplying the directory names
     */
    private void writeSplits(Dataset<Row>[] datasetArr, DataFormat dataFormat, Map<String, String> map, SaveMode saveMode, String str, List<Split> list) {
        int position = 0;
        while (position < datasetArr.length) {
            String splitLocation = new Path(str, list.get(position).getName()).toString();
            writeSingle(datasetArr[position], dataFormat, map, saveMode, splitLocation);
            position++;
        }
    }

    /**
     * Writes one dataset in the given format to the given location, after mapping the
     * location through {@link #sparkPath(String)}.
     *
     * @param dataset    the data to write
     * @param dataFormat output format; its {@code toString()} is used as the Spark format name
     * @param map        write options
     * @param saveMode   Spark save mode
     * @param str        target location
     */
    private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat, Map<String, String> map, SaveMode saveMode, String str) {
        String formatName = dataFormat.toString();
        dataset.write()
            .format(formatName)
            .options(map)
            .mode(saveMode)
            .save(sparkPath(str));
    }

    /**
     * Reads data through a storage connector.
     *
     * <p>When a path is given, a {@code **} glob is appended and the result is mapped through
     * {@link #sparkPath(String)}. The BigQuery format is the exception: it loads the path
     * exactly as given. Without a path, the reader's argument-less {@code load()} is used.
     *
     * @param storageConnector connector whose Hadoop configuration is applied first; may be {@code null}
     * @param str              Spark data source format
     * @param map              read options
     * @param str2             path to read; may be {@code null}
     * @return the loaded dataset
     * @throws FeatureStoreException declared for callers
     * @throws IOException           propagated from connector setup
     */
    public Dataset<Row> read(StorageConnector storageConnector, String str, Map<String, String> map, String str2) throws FeatureStoreException, IOException {
        setupConnectorHadoopConf(storageConnector);

        String globPath = null;
        if (str2 != null) {
            globPath = sparkPath(new Path(str2, "**").toString());
        }

        DataFrameReader reader = getInstance().getSparkSession().read().format(str).options(map);
        if (Strings.isNullOrEmpty(globPath)) {
            return reader.load();
        }
        if (str.equals(Constants.BIGQUERY_FORMAT)) {
            return reader.load(str2);
        }
        return reader.load(sparkPath(globPath));
    }

    /**
     * Writes a dataset to a Kafka topic as Avro-encoded key/value records, with a
     * {@code headers} column holding a single {@code version} header taken from the feature
     * group's subject.
     *
     * @param featureGroupBase the feature group being written
     * @param dataset          the rows to write
     * @param str              Kafka topic name
     * @param map              options passed to the Kafka writer
     * @throws FeatureStoreException propagated from schema handling
     * @throws IOException           propagated from schema handling
     */
    public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, Dataset<Row> dataset, String str, Map<String, String> map) throws FeatureStoreException, IOException {
        Dataset<Row> avroRows = onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset));

        byte[] versionBytes = String.valueOf(featureGroupBase.getSubject().getVersion()).getBytes(StandardCharsets.UTF_8);
        Column versionHeader = functions.struct(new Column[]{
            functions.lit("version").as("key"),
            functions.lit(versionBytes).as("value")});

        avroRows.withColumn("headers", functions.array(new Column[]{versionHeader}))
            .write()
            .format("kafka")
            .options(map)
            .option("topic", str)
            .save();
    }

    /**
     * Starts a streaming write of a dataset to the feature group's online Kafka topic as
     * Avro-encoded key/value records, with a {@code headers} column holding a single
     * {@code version} header taken from the feature group's subject.
     *
     * @param featureGroupBase the feature group being written
     * @param dataset          the streaming rows to write
     * @param str              query name, used to derive a checkpoint directory when {@code str3} is {@code null}
     * @param str2             streaming output mode
     * @param z                whether to wait for the query to terminate
     * @param l                timeout passed to {@code awaitTermination}; must be non-null when {@code z} is true
     * @param str3             explicit checkpoint location; may be {@code null}
     * @param map              options passed to the Kafka writer
     * @return the started streaming query
     * @throws FeatureStoreException   propagated from schema handling or checkpoint path resolution
     * @throws IOException             propagated from schema handling
     * @throws StreamingQueryException propagated from awaiting termination
     * @throws TimeoutException        propagated from starting the query
     */
    public <S> StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, Dataset<Row> dataset, String str, String str2, boolean z, Long l, String str3, Map<String, String> map) throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException {
        Dataset<Row> avroRows = onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset));

        byte[] versionBytes = String.valueOf(featureGroupBase.getSubject().getVersion()).getBytes(StandardCharsets.UTF_8);
        Column versionHeader = functions.struct(new Column[]{
            functions.lit("version").as("key"),
            functions.lit(versionBytes).as("value")});

        org.apache.spark.sql.streaming.DataStreamWriter<Row> writer = avroRows
            .withColumn("headers", functions.array(new Column[]{versionHeader}))
            .writeStream()
            .format("kafka")
            .outputMode(str2);

        String checkpointLocation = str3;
        if (checkpointLocation == null) {
            checkpointLocation = checkpointDirPath(str, featureGroupBase.getOnlineTopicName());
        }

        StreamingQuery running = writer
            .option("checkpointLocation", checkpointLocation)
            .options(map)
            .option("topic", featureGroupBase.getOnlineTopicName())
            .start();
        if (z) {
            running.awaitTermination(l.longValue());
        }
        return running;
    }

    /**
     * Selects the fields of the feature group's Avro schema from the dataset, Avro-encoding
     * every field listed among the feature group's complex features with that feature's own
     * Avro schema. Other fields are selected unchanged.
     *
     * @param featureGroupBase supplies the Avro schema, complex feature names and per-feature schemas
     * @param dataset          the dataset to project
     * @return the projected dataset, with columns in Avro schema field order
     * @throws FeatureStoreException propagated from schema lookups
     * @throws IOException           propagated from schema lookups
     */
    public Dataset<Row> encodeComplexFeatures(FeatureGroupBase featureGroupBase, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        List<Column> projection = new ArrayList<>();
        for (Schema.Field field : featureGroupBase.getDeserializedAvroSchema().getFields()) {
            String name = field.name();
            if (!featureGroupBase.getComplexFeatures().contains(name)) {
                projection.add(functions.col(name));
                continue;
            }
            Column encoded = org.apache.spark.sql.avro.functions.to_avro(functions.col(name), featureGroupBase.getFeatureAvroSchema(name));
            projection.add(encoded.alias(name));
        }
        return dataset.select(projection.toArray(new Column[0]));
    }

    /**
     * Converts a dataset into two Avro-encoded columns for Kafka: {@code key}, the
     * concatenation of the primary key columns cast to string, and {@code value}, a struct of
     * all fields of the feature group's Avro schema encoded with its encoded Avro schema.
     *
     * @param featureGroupBase supplies primary keys and Avro schemas
     * @param dataset          the dataset to convert
     * @return a dataset with the columns {@code key} and {@code value}
     * @throws FeatureStoreException propagated from schema lookups
     * @throws IOException           propagated from schema lookups
     */
    private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroupBase featureGroupBase, Dataset<Row> dataset) throws FeatureStoreException, IOException {
        Column[] keyParts = featureGroupBase.getPrimaryKeys().stream()
            .map(primaryKey -> functions.col(primaryKey).cast("string"))
            .toArray(Column[]::new);
        Column keyColumn = org.apache.spark.sql.avro.functions.to_avro(functions.concat(keyParts)).alias("key");

        Column[] valueParts = featureGroupBase.getDeserializedAvroSchema().getFields().stream()
            .map(field -> functions.col(field.name()))
            .toArray(Column[]::new);
        Column valueColumn = org.apache.spark.sql.avro.functions
            .to_avro(functions.struct(valueParts), featureGroupBase.getEncodedAvroSchema())
            .alias("value");

        return dataset.select(new Column[]{keyColumn, valueColumn});
    }

    /**
     * Writes a zero-row dataset, taken from the feature group's own table, through
     * {@link #writeOfflineDataframe} as an upsert with no write options.
     *
     * @param featureGroupBase the feature group to write to
     * @throws IOException           propagated from the offline write
     * @throws FeatureStoreException propagated from the offline write
     * @throws ParseException        propagated from the offline write
     */
    public void writeEmptyDataframe(FeatureGroupBase featureGroupBase) throws IOException, FeatureStoreException, ParseException {
        Dataset<Row> noRows = this.sparkSession.table(this.utils.getTableName(featureGroupBase)).limit(0);
        writeOfflineDataframe(featureGroupBase, noRows, HudiOperationType.UPSERT, new HashMap<>(), null);
    }

    /**
     * Writes a dataset to the offline store: through the Hudi engine when the feature group's
     * time travel format is HUDI, otherwise as a plain append to the feature group's table.
     *
     * @param featureGroupBase  the feature group to write to
     * @param dataset           the rows to write
     * @param hudiOperationType Hudi operation; only used for HUDI feature groups
     * @param map               write options
     * @param num               passed through to the Hudi engine; only used for HUDI feature groups
     * @throws IOException           propagated from the Hudi engine
     * @throws FeatureStoreException propagated from the Hudi engine
     * @throws ParseException        propagated from the Hudi engine
     */
    public void writeOfflineDataframe(FeatureGroupBase featureGroupBase, Dataset<Row> dataset, HudiOperationType hudiOperationType, Map<String, String> map, Integer num) throws IOException, FeatureStoreException, ParseException {
        if (featureGroupBase.getTimeTravelFormat() != TimeTravelFormat.HUDI) {
            writeSparkDataset(featureGroupBase, dataset, map);
            return;
        }
        this.hudiEngine.saveHudiFeatureGroup(this.sparkSession, featureGroupBase, dataset, hudiOperationType, map, num);
    }

    /**
     * Appends a dataset to the feature group's table using the Hive format, partitioned by
     * the feature group's partition columns.
     *
     * @param featureGroupBase supplies the table name and partition columns
     * @param dataset          the rows to append
     * @param map              write options; {@code null} is treated as no options
     */
    private void writeSparkDataset(FeatureGroupBase featureGroupBase, Dataset<Row> dataset, Map<String, String> map) {
        Map<String, String> writeOptions = map;
        if (writeOptions == null) {
            writeOptions = new HashMap<>();
        }
        dataset.write()
            .format(Constants.HIVE_FORMAT)
            .mode(SaveMode.Append)
            .options(writeOptions)
            .partitionBy(this.utils.getPartitionColumns(featureGroupBase))
            .saveAsTable(this.utils.getTableName(featureGroupBase));
    }

    /**
     * Runs the column profiler on a dataset and returns the profiles as JSON.
     *
     * @param dataset the data to profile
     * @param list    columns to restrict the profiling to; {@code null} or empty means no restriction
     * @param bool    whether to compute correlations; {@code null} means {@code true}
     * @param bool2   whether to compute histograms; {@code null} means {@code true}
     * @param bool3   whether to compute exact uniqueness; {@code null} means {@code true}
     * @return the JSON produced by {@code ColumnProfiles.toJson}
     */
    public String profile(Dataset<Row> dataset, List<String> list, Boolean bool, Boolean bool2, Boolean bool3) {
        boolean correlations = bool == null || bool.booleanValue();
        boolean histograms = bool2 == null || bool2.booleanValue();
        boolean exactUniqueness = bool3 == null || bool3.booleanValue();

        ColumnProfilerRunBuilder builder = new ColumnProfilerRunner()
            .onData(dataset)
            .withCorrelation(correlations, 100)
            .withHistogram(histograms, 20)
            .withExactUniqueness(exactUniqueness);

        boolean restricted = list != null && !list.isEmpty();
        if (restricted) {
            builder.restrictToColumns(((Iterator) JavaConverters.asScalaIteratorConverter(list.iterator()).asScala()).toSeq());
        }

        ColumnProfiles result = builder.run();
        return ColumnProfiles.toJson(result.profiles().values().toSeq(), result.numRecords());
    }

    /**
     * Profiles a dataset with exact uniqueness enabled.
     *
     * @param dataset the data to profile
     * @param list    columns to restrict the profiling to
     * @param bool    whether to compute correlations
     * @param bool2   whether to compute histograms
     * @return the profiles as JSON
     */
    public String profile(Dataset<Row> dataset, List<String> list, Boolean bool, Boolean bool2) {
        return profile(dataset, list, bool, bool2, Boolean.TRUE);
    }

    /**
     * Profiles the given columns of a dataset with correlations and histograms enabled.
     *
     * @param dataset the data to profile
     * @param list    columns to restrict the profiling to
     * @return the profiles as JSON
     */
    public String profile(Dataset<Row> dataset, List<String> list) {
        return profile(dataset, list, Boolean.TRUE, Boolean.TRUE);
    }

    /**
     * Profiles a dataset without a column restriction.
     *
     * @param dataset the data to profile
     * @param z       whether to compute correlations
     * @param z2      whether to compute histograms
     * @return the profiles as JSON
     */
    public String profile(Dataset<Row> dataset, boolean z, boolean z2) {
        List<String> noRestriction = null;
        return profile(dataset, noRestriction, Boolean.valueOf(z), Boolean.valueOf(z2));
    }

    /**
     * Profiles a dataset without a column restriction, with correlations and histograms enabled.
     *
     * @param dataset the data to profile
     * @return the profiles as JSON
     */
    public String profile(Dataset<Row> dataset) {
        List<String> noRestriction = null;
        return profile(dataset, noRestriction, Boolean.TRUE, Boolean.TRUE);
    }

    /**
     * Applies connector-specific Hadoop configuration for S3, ADLS and GCS connectors.
     * A {@code null} connector or any other connector type is a no-op.
     *
     * @param storageConnector the connector to configure for; may be {@code null}
     * @throws IOException propagated from the GCS setup
     */
    public void setupConnectorHadoopConf(StorageConnector storageConnector) throws IOException {
        if (storageConnector != null) {
            switch (storageConnector.getStorageConnectorType()) {
                case S3:
                    setupS3ConnectorHadoopConf((StorageConnector.S3Connector) storageConnector);
                    break;
                case ADLS:
                    setupAdlsConnectorHadoopConf((StorageConnector.AdlsConnector) storageConnector);
                    break;
                case GCS:
                    setupGcsConnectorHadoopConf((StorageConnector.GcsConnector) storageConnector);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Rewrites a path that starts with the S3 scheme so it uses the scheme Spark expects.
     *
     * @param str the path; may be {@code null}
     * @return the rewritten path, the input unchanged when it does not start with the S3
     *         scheme, or {@code null} for a {@code null} input
     */
    public static String sparkPath(String str) {
        if (str != null && str.startsWith(Constants.S3_SCHEME)) {
            return str.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME);
        }
        return str;
    }

    /**
     * Copies the credentials and encryption settings of an S3 connector into the Spark
     * context's Hadoop configuration. Null or empty connector values are skipped. When a
     * session token is present, the temporary credential provider is configured along with it.
     *
     * @param s3Connector the connector to read the settings from
     */
    private void setupS3ConnectorHadoopConf(StorageConnector.S3Connector s3Connector) {
        org.apache.hadoop.conf.Configuration hadoopConf = this.sparkSession.sparkContext().hadoopConfiguration();

        if (!Strings.isNullOrEmpty(s3Connector.getAccessKey())) {
            hadoopConf.set(Constants.S3_ACCESS_KEY_ENV, s3Connector.getAccessKey());
        }
        if (!Strings.isNullOrEmpty(s3Connector.getSecretKey())) {
            hadoopConf.set(Constants.S3_SECRET_KEY_ENV, s3Connector.getSecretKey());
        }
        if (!Strings.isNullOrEmpty(s3Connector.getServerEncryptionAlgorithm())) {
            hadoopConf.set("fs.s3a.server-side-encryption-algorithm", s3Connector.getServerEncryptionAlgorithm());
        }
        if (!Strings.isNullOrEmpty(s3Connector.getServerEncryptionKey())) {
            hadoopConf.set("fs.s3a.server-side-encryption-key", s3Connector.getServerEncryptionKey());
        }
        if (!Strings.isNullOrEmpty(s3Connector.getSessionToken())) {
            hadoopConf.set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
            hadoopConf.set(Constants.S3_SESSION_KEY_ENV, s3Connector.getSessionToken());
        }
    }

    /**
     * Copies every Spark option of an ADLS connector into the Spark context's Hadoop
     * configuration, using the option name as key.
     *
     * @param adlsConnector the connector whose options are applied
     */
    private void setupAdlsConnectorHadoopConf(StorageConnector.AdlsConnector adlsConnector) {
        org.apache.hadoop.conf.Configuration hadoopConf = this.sparkSession.sparkContext().hadoopConfiguration();
        for (Option sparkOption : adlsConnector.getSparkOptions()) {
            String key = sparkOption.getName();
            hadoopConf.set(key, sparkOption.getValue());
        }
    }

    /**
     * Streams a stream feature group into its Hudi table by delegating to the Hudi engine,
     * passing the Spark session and the Kafka configuration derived from the feature group
     * and the given options.
     *
     * @param streamFeatureGroup the stream feature group to ingest
     * @param map                options used when building the Kafka configuration
     * @throws Exception propagated from the Kafka configuration lookup or the Hudi engine
     */
    public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String, String> map) throws Exception {
        this.hudiEngine.streamToHoodieTable(this.sparkSession, streamFeatureGroup, getKafkaConfig(streamFeatureGroup, map));
    }

    /**
     * Derives the feature list from a dataset's schema.
     *
     * <p>Feature names are the lower-cased column names and the feature type is the Spark
     * type's catalog string. For the HUDI time travel format, byte and short columns are
     * reported as {@code int}, and Spark types outside the supported set are rejected.
     * A {@code description} entry in the column metadata becomes the feature description.
     *
     * @param dataset          the dataset whose schema is parsed
     * @param timeTravelFormat the feature group's time travel format
     * @return one feature per column, in schema order
     * @throws FeatureStoreException for a HUDI feature group with an unsupported Spark type
     */
    public List<Feature> parseFeatureGroupSchema(Dataset<Row> dataset, TimeTravelFormat timeTravelFormat) throws FeatureStoreException {
        final boolean hudi = timeTravelFormat == TimeTravelFormat.HUDI;
        List<Feature> features = new ArrayList<>();

        for (StructField column : dataset.schema().fields()) {
            DataType sparkType = column.dataType();
            String featureType;
            if (hudi && (sparkType instanceof ByteType || sparkType instanceof ShortType)) {
                featureType = "int";
            } else {
                boolean supported = !hudi
                    || sparkType instanceof BooleanType
                    || sparkType instanceof IntegerType
                    || sparkType instanceof LongType
                    || sparkType instanceof FloatType
                    || sparkType instanceof DoubleType
                    || sparkType instanceof DecimalType
                    || sparkType instanceof TimestampType
                    || sparkType instanceof DateType
                    || sparkType instanceof StringType
                    || sparkType instanceof ArrayType
                    || sparkType instanceof StructType
                    || sparkType instanceof BinaryType;
                if (!supported) {
                    throw new FeatureStoreException("Feature '" + column.name().toLowerCase() + "': spark type " + sparkType.catalogString() + " not supported.");
                }
                featureType = sparkType.catalogString();
            }

            Feature parsed = new Feature(column.name().toLowerCase(), featureType, false, false);
            if (column.metadata().contains("description")) {
                parsed.setDescription(column.metadata().getString("description"));
            }
            features.add(parsed);
        }
        return features;
    }

    /**
     * Returns the dataset with every column aliased to its lower-case name.
     *
     * @param dataset the dataset to rename
     * @return a dataset with the same columns under lower-cased names
     */
    public Dataset<Row> sanitizeFeatureNames(Dataset<Row> dataset) {
        Column[] lowerCased = Arrays.stream(dataset.columns())
            .map(name -> functions.col(name).alias(name.toLowerCase()))
            .toArray(Column[]::new);
        return dataset.select(lowerCased);
    }

    /**
     * Lower-cases all column names and, for non-streaming datasets, rebuilds the dataset
     * with every field marked nullable. Streaming datasets are returned after renaming only.
     *
     * @param dataset the dataset to normalise
     * @return the normalised dataset
     */
    public Dataset<Row> convertToDefaultDataframe(Dataset<Row> dataset) {
        Column[] lowerCased = Arrays.stream(dataset.columns())
            .map(name -> functions.col(name).alias(name.toLowerCase()))
            .toArray(Column[]::new);
        Dataset<Row> renamed = dataset.select(lowerCased);
        if (dataset.isStreaming()) {
            return renamed;
        }

        // Same fields, names, types and metadata, but all nullable.
        StructField[] nullableFields = Arrays.stream(renamed.schema().fields())
            .map(field -> new StructField(field.name(), field.dataType(), true, field.metadata()))
            .toArray(StructField[]::new);
        return renamed.sparkSession().createDataFrame(renamed.rdd(), new StructType(nullableFields));
    }

    /**
     * Casts each listed column of the dataset to the Spark type that corresponds to the
     * feature's declared type.
     *
     * @param dataset the dataset to cast
     * @param list    the features naming the columns and their target types
     * @return the dataset with the listed columns cast
     * @throws FeatureStoreException if a declared type cannot be converted to a Spark type
     */
    public Dataset<Row> castColumnType(Dataset<Row> dataset, List<TrainingDatasetFeature> list) throws FeatureStoreException {
        Dataset<Row> result = dataset;
        for (TrainingDatasetFeature feature : list) {
            String columnName = feature.getName();
            Column source = result.col(columnName);
            DataType targetType = convertColumnType(feature.getType());
            result = result.withColumn(columnName, source.cast(targetType));
        }
        return result;
    }

    /**
     * Converts a type string into a Spark data type.
     *
     * <p>{@code array<T>} is converted recursively on {@code T}. A string containing
     * {@code struct<label:string,index:int>} becomes the matching two-field struct.
     * Everything else goes through {@link #convertBasicType(String)}.
     *
     * @param str the type string
     * @return the corresponding Spark data type
     * @throws FeatureStoreException if the type cannot be converted
     */
    private DataType convertColumnType(String str) throws FeatureStoreException {
        Matcher arrayMatch = arrayPattern.matcher(str);
        if (arrayMatch.matches()) {
            return DataTypes.createArrayType(convertColumnType(arrayMatch.group(1)));
        }
        if (str.contains("struct<label:string,index:int>")) {
            StructField labelField = new StructField("label", DataTypes.StringType, true, Metadata.empty());
            StructField indexField = new StructField("index", DataTypes.IntegerType, true, Metadata.empty());
            return DataTypes.createStructType(new StructField[]{labelField, indexField});
        }
        return convertBasicType(str);
    }

    /**
     * Looks up the Spark data type for a basic (non-nested) type name.
     *
     * @param str the type name, matched exactly against the supported names
     * @return the corresponding Spark data type
     * @throws FeatureStoreException if the name is not one of the supported types
     */
    private DataType convertBasicType(String str) throws FeatureStoreException {
        Map<String, DataType> sparkTypes = Maps.newHashMap();
        sparkTypes.put("string", DataTypes.StringType);
        sparkTypes.put("bigint", DataTypes.LongType);
        sparkTypes.put("int", DataTypes.IntegerType);
        sparkTypes.put("smallint", DataTypes.ShortType);
        sparkTypes.put("tinyint", DataTypes.ByteType);
        sparkTypes.put("float", DataTypes.FloatType);
        sparkTypes.put("double", DataTypes.DoubleType);
        sparkTypes.put("timestamp", DataTypes.TimestampType);
        sparkTypes.put("boolean", DataTypes.BooleanType);
        sparkTypes.put("date", DataTypes.DateType);
        sparkTypes.put(HttpHeaders.Values.BINARY, DataTypes.BinaryType);
        sparkTypes.put("decimal", DataTypes.createDecimalType());

        DataType resolved = sparkTypes.get(str);
        if (resolved == null) {
            throw new FeatureStoreException(String.format("Pyarrow type %s cannot be converted to a spark type.", str));
        }
        return resolved;
    }

    /**
     * Registers a file with the Spark context and returns the local path under which Spark
     * makes it available.
     *
     * <p>Paths without an explicit {@code file://} or {@code hdfs://} scheme are treated as
     * HDFS paths and get the {@code hdfs://} prefix. A path that already carries the
     * {@code hdfs://} scheme is used as is rather than being prefixed a second time.
     *
     * @param str the file location
     * @return the path returned by {@code SparkFiles.get} for the file's name
     */
    public String addFile(String str) {
        String location = str;
        if (!location.startsWith("file://") && !location.startsWith("hdfs://")) {
            location = "hdfs://" + location;
        }
        this.sparkSession.sparkContext().addFile(location);
        return SparkFiles.get(new Path(location).getName());
    }

    /**
     * Opens a streaming read through a storage connector. Only Kafka connectors are supported.
     *
     * @param storageConnector connector supplying the Spark options
     * @param str              Spark streaming source format
     * @param str2             message format, passed to the Kafka reader
     * @param str3             message schema, passed to the Kafka reader
     * @param map              additional reader options, applied after the connector's options
     * @param z                whether to include the Kafka metadata columns in the result
     * @return the streaming dataset
     * @throws FeatureStoreException if the connector is not a Kafka connector
     * @throws IOException           declared for callers
     */
    public Dataset<Row> readStream(StorageConnector storageConnector, String str, String str2, String str3, Map<String, String> map, boolean z) throws FeatureStoreException, IOException {
        DataStreamReader reader = this.sparkSession.readStream()
            .format(str)
            .options(storageConnector.sparkOptions())
            .options(map);
        if (!(storageConnector instanceof StorageConnector.KafkaConnector)) {
            throw new FeatureStoreException("Connector does not support reading data into stream.");
        }
        return readStreamKafka(reader, str2, str3, z);
    }

    /**
     * Loads a Kafka stream and, when a schema is supplied, decodes the {@code value} column.
     *
     * <ul>
     *   <li>{@code str} is {@code "avro"} and {@code str2} is non-empty: the schema is parsed once
     *       up front (the parsed result is discarded) and {@code value} is decoded with
     *       {@code from_avro}.</li>
     *   <li>{@code str} is {@code "json"} and {@code str2} is non-empty: {@code value} is cast to
     *       string and decoded with {@code from_json}.</li>
     *   <li>Otherwise nothing is decoded.</li>
     * </ul>
     *
     * @param dataStreamReader fully configured reader to load from
     * @param str message format; must not be {@code null}
     * @param str2 schema text for the chosen format, may be {@code null} or empty
     * @param z when {@code true} the Kafka columns (key, topic, partition, offset, timestamp,
     *     timestampType) are kept next to the decoded fields; when {@code false} only the decoded
     *     fields are returned (or {@code key} and {@code value} if nothing was decoded)
     * @return the streaming dataset
     * @throws SchemaParseException declared for the Avro schema parse
     * @throws FeatureStoreException declared by the signature
     */
    private Dataset<Row> readStreamKafka(DataStreamReader dataStreamReader, String str, String str2, boolean z) throws SchemaParseException, FeatureStoreException {
        Column[] kafkaColumnsWithPayload = {
            functions.col("key"),
            functions.col("topic"),
            functions.col("partition"),
            functions.col("offset"),
            functions.col("timestamp"),
            functions.col("timestampType"),
            functions.col("value.*")
        };
        boolean schemaProvided = !Strings.isNullOrEmpty(str2);

        if (str.equals("avro") && schemaProvided) {
            // Parse purely for validation before the stream is loaded.
            new Schema.Parser().parse(str2);
            Dataset<Row> raw = dataStreamReader.load();
            Dataset<Row> decoded = raw.withColumn("value",
                org.apache.spark.sql.avro.functions.from_avro(raw.col("value"), str2));
            if (z) {
                return decoded.select(kafkaColumnsWithPayload);
            }
            return decoded.select(functions.col("value.*"));
        }

        if (str.equals("json") && schemaProvided) {
            Dataset<Row> raw = dataStreamReader.load();
            Dataset<Row> decoded = raw.withColumn("value",
                functions.from_json(raw.col("value").cast("string"), str2, new HashMap<String, String>()));
            if (z) {
                return decoded.select(kafkaColumnsWithPayload);
            }
            return decoded.select(functions.col("value.*"));
        }

        // No usable schema: hand back the stream undecoded.
        if (z) {
            return dataStreamReader.load();
        }
        return dataStreamReader.load().select("key", "value");
    }

    /**
     * Views an untyped reference as a Spark dataset of rows.
     *
     * @param obj object expected to be a {@code Dataset}
     * @return {@code obj} cast to {@code Dataset<Row>}
     * @throws ClassCastException if {@code obj} is not a {@code Dataset}
     */
    @SuppressWarnings("unchecked") // the element type cannot be checked at runtime
    public Dataset<Row> objectToDataset(Object obj) {
        Dataset<Row> dataset = (Dataset<Row>) obj;
        return dataset;
    }

    /**
     * Writes the settings for a GCS connector into the Spark context's Hadoop configuration.
     *
     * <p>Steps, in order: set the GCS filesystem and account-enable properties; distribute the
     * connector's key file via {@link #addFile(String)} and read it as UTF-8 JSON; copy the
     * {@code client_email}, {@code private_key_id} and {@code private_key} fields into the
     * configuration; finally either set the three encryption properties (when the connector has an
     * algorithm) or unset them (when it does not).
     *
     * @param gcsConnector connector providing the key file path and optional encryption settings
     * @throws IOException if the key file cannot be read
     */
    private void setupGcsConnectorHadoopConf(StorageConnector.GcsConnector gcsConnector) throws IOException {
        this.sparkSession.sparkContext().hadoopConfiguration()
            .set(Constants.PROPERTY_GCS_FS_KEY, Constants.PROPERTY_GCS_FS_VALUE);
        this.sparkSession.sparkContext().hadoopConfiguration()
            .set(Constants.PROPERTY_GCS_ACCOUNT_ENABLE, "true");

        // Load the key file from wherever Spark placed the distributed copy.
        String localKeyFile = addFile(gcsConnector.getKeyPath());
        List<String> keyFileLines = Files.readAllLines(Paths.get(localKeyFile), StandardCharsets.UTF_8);
        JSONObject keyFileJson = new JSONObject(String.join("\n", keyFileLines));

        // Hadoop property -> JSON field it is filled from; applied in this order.
        String[][] credentialFields = {
            {Constants.PROPERTY_GCS_ACCOUNT_EMAIL, "client_email"},
            {Constants.PROPERTY_GCS_ACCOUNT_KEY_ID, "private_key_id"},
            {Constants.PROPERTY_GCS_ACCOUNT_KEY, "private_key"}
        };
        for (String[] field : credentialFields) {
            this.sparkSession.sparkContext().hadoopConfiguration()
                .set(field[0], keyFileJson.getString(field[1]));
        }

        boolean encryptionConfigured = !Strings.isNullOrEmpty(gcsConnector.getAlgorithm());
        if (encryptionConfigured) {
            this.sparkSession.sparkContext().hadoopConfiguration()
                .set(Constants.PROPERTY_ALGORITHM, gcsConnector.getAlgorithm());
            this.sparkSession.sparkContext().hadoopConfiguration()
                .set(Constants.PROPERTY_ENCRYPTION_KEY, gcsConnector.getEncryptionKey());
            this.sparkSession.sparkContext().hadoopConfiguration()
                .set(Constants.PROPERTY_ENCRYPTION_HASH, gcsConnector.getEncryptionKeyHash());
        } else {
            // Clear anything left in the shared configuration so it is not picked up here.
            this.sparkSession.sparkContext().hadoopConfiguration().unset(Constants.PROPERTY_ALGORITHM);
            this.sparkSession.sparkContext().hadoopConfiguration().unset(Constants.PROPERTY_ENCRYPTION_KEY);
            this.sparkSession.sparkContext().hadoopConfiguration().unset(Constants.PROPERTY_ENCRYPTION_HASH);
        }
    }

    /**
     * Builds a dataframe with no rows that has the same schema as the given one.
     *
     * @param s an existing dataset; it is cast to {@code Dataset} and only its schema is read
     * @param <S> static type the caller uses for datasets
     * @return a new, empty dataframe with the schema of {@code s}
     */
    @SuppressWarnings("unchecked") // S is a Dataset at runtime; the casts cannot be checked
    public <S> S createEmptyDataFrame(S s) {
        Dataset<Row> template = (Dataset<Row>) s;
        List<Row> noRows = new ArrayList<>();
        Dataset<Row> emptyFrame = this.sparkSession.sqlContext().createDataFrame(noRows, template.schema());
        return (S) emptyFrame;
    }

    /**
     * Builds the checkpoint directory path for a streaming query inside the current project.
     *
     * <p>When no query name is given, one is generated as
     * {@code str2 + <online topic name> + "_" + <current local time as yyyyMMddHHmmss>}.
     * The result has the form {@code /Projects/<project name>/Resources/<query name>-checkpoint}.
     *
     * @param featureGroupBase feature group whose online topic name is used for a generated name
     * @param str query name; may be {@code null} or empty to request a generated one
     * @param str2 prefix placed in front of a generated name
     * @return the checkpoint path
     * @throws FeatureStoreException declared by the signature
     * @throws IOException declared by the signature
     */
    public String constructCheckpointPath(FeatureGroupBase featureGroupBase, String str, String str2) throws FeatureStoreException, IOException {
        String queryName = str;
        if (Strings.isNullOrEmpty(queryName)) {
            queryName = new StringBuilder()
                .append(str2)
                .append(featureGroupBase.getOnlineTopicName())
                .append("_")
                .append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")))
                .toString();
        }
        String resourcesDir = "/Projects/" + HopsworksClient.getInstance().getProject().getProjectName() + "/Resources/";
        return resourcesDir + queryName + "-checkpoint";
    }

    /**
     * Assembles the Kafka client options used to reach the brokers of a feature group's feature store.
     *
     * <p>Caller-supplied options are copied first; the connection settings written afterwards
     * (bootstrap servers, SSL protocol, trust/key store locations and passwords, endpoint
     * identification algorithm) therefore overwrite any caller entry with the same key.
     *
     * @param featureGroupBase feature group whose feature store is used to look up broker endpoints
     * @param map extra options to include, may be {@code null}
     * @return a new mutable map holding the merged configuration
     * @throws FeatureStoreException declared by the signature
     * @throws IOException declared by the signature
     */
    public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroupBase, Map<String, String> map) throws FeatureStoreException, IOException {
        Map<String, String> kafkaConfig = new HashMap<>();
        if (map != null) {
            kafkaConfig.putAll(map);
        }
        HopsworksHttpClient httpClient = HopsworksClient.getInstance().getHopsworksHttpClient();

        // Strip the listener prefix and join with a plain comma. The separator is spelled out here
        // rather than borrowed from a URI-template constant that merely happens to have the same value.
        String bootstrapServers = this.kafkaApi.getBrokerEndpoints(featureGroupBase.getFeatureStore()).stream()
            .map(endpoint -> endpoint.replace("INTERNAL://", ""))
            .collect(Collectors.joining(","));
        kafkaConfig.put(Constants.KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);

        kafkaConfig.put(Constants.KAFKA_SECURITY_PROTOCOL, "SSL");
        kafkaConfig.put(Constants.KAFKA_SSL_TRUSTSTORE_LOCATION, httpClient.getTrustStorePath());
        kafkaConfig.put(Constants.KAFKA_SSL_TRUSTSTORE_PASSWORD, httpClient.getCertKey());
        kafkaConfig.put(Constants.KAFKA_SSL_KEYSTORE_LOCATION, httpClient.getKeyStorePath());
        kafkaConfig.put(Constants.KAFKA_SSL_KEYSTORE_PASSWORD, httpClient.getCertKey());
        kafkaConfig.put(Constants.KAFKA_SSL_KEY_PASSWORD, httpClient.getCertKey());
        // NOTE(review): an empty algorithm presumably turns off broker hostname verification;
        // confirm against the Kafka client documentation before changing it.
        kafkaConfig.put(Constants.KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, "");
        return kafkaConfig;
    }

    /**
     * Builds the checkpoint directory path for a streaming insert inside the current project.
     *
     * <p>When no query name is given, {@code "insert_stream_" + str2} is used. The result has the
     * form {@code /Projects/<project name>/Resources/<query name>-checkpoint}.
     *
     * @param str query name; may be {@code null} or empty to fall back to the default name
     * @param str2 suffix appended to {@code insert_stream_} for the default name
     * @return the checkpoint path
     * @throws FeatureStoreException declared by the signature
     */
    public String checkpointDirPath(String str, String str2) throws FeatureStoreException {
        String queryName = Strings.isNullOrEmpty(str) ? "insert_stream_" + str2 : str;
        String resourcesDir = "/Projects/" + HopsworksClient.getInstance().getProject().getProjectName() + "/Resources/";
        return resourcesDir + queryName + "-checkpoint";
    }

    /**
     * Exposes the Spark session held by this engine.
     *
     * @return the current value of the {@code sparkSession} field
     */
    public SparkSession getSparkSession() {
        return sparkSession;
    }

    /**
     * Synthetic hook that rebuilds a serialized lambda of this class from its {@link SerializedLambda} form.
     *
     * <p>NOTE(review): this looks like decompiler output for a compiler-generated method
     * ({@code boolean z = -1} and {@code switch (z)} are not valid hand-written Java); treat it as a
     * description of the compiled behavior rather than as source to maintain.
     *
     * <p>Exactly one lambda is recognised: the implementation method named
     * {@code lambda$timeSeriesSplit$52417cc8$1}, exposed as a Spark {@code UDF1}.
     *
     * @param serializedLambda serialized description of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // Dispatch on the hash first, then confirm with equals to rule out hash collisions.
        switch (implMethodName.hashCode()) {
            case -277378701:
                if (implMethodName.equals("lambda$timeSeriesSplit$52417cc8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Every part of the recorded signature must match before the lambda is recreated.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/api/java/UDF1") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/logicalclocks/hsfs/spark/engine/SparkEngine") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Object;")) {
                    // Values whose decimal text is longer than 10 characters are divided by 1000;
                    // shorter values pass through unchanged.
                    // NOTE(review): presumably converts epoch milliseconds to seconds - confirm in timeSeriesSplit.
                    return l -> {
                        return Long.toString(l.longValue()).length() > 10 ? Long.valueOf(Long.valueOf(l.longValue() / 1000).longValue()) : l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
