/*
 * Decompiled with CFR 0.152.
 */
package io.hops.util.featurestore;

import com.google.common.base.Strings;
import io.hops.util.Constants;
import io.hops.util.FeaturestoreRestClient;
import io.hops.util.Hops;
import io.hops.util.exceptions.CannotWriteImageDataFrameException;
import io.hops.util.exceptions.DataframeIsEmpty;
import io.hops.util.exceptions.FeaturegroupDoesNotExistError;
import io.hops.util.exceptions.FeaturestoreNotFound;
import io.hops.util.exceptions.HiveNotEnabled;
import io.hops.util.exceptions.InferTFRecordSchemaError;
import io.hops.util.exceptions.InvalidPrimaryKeyForFeaturegroup;
import io.hops.util.exceptions.OnlineFeaturestoreNotEnabled;
import io.hops.util.exceptions.OnlineFeaturestorePasswordNotFound;
import io.hops.util.exceptions.OnlineFeaturestoreUserNotFound;
import io.hops.util.exceptions.SparkDataTypeNotRecognizedError;
import io.hops.util.exceptions.StorageConnectorDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetFormatNotSupportedError;
import io.hops.util.featurestore.dtos.app.FeaturestoreMetadataDTO;
import io.hops.util.featurestore.dtos.app.SQLJoinDTO;
import io.hops.util.featurestore.dtos.feature.FeatureDTO;
import io.hops.util.featurestore.dtos.featuregroup.FeaturegroupDTO;
import io.hops.util.featurestore.dtos.featuregroup.FeaturegroupType;
import io.hops.util.featurestore.dtos.featuregroup.OnDemandFeaturegroupDTO;
import io.hops.util.featurestore.dtos.settings.FeaturestoreClientSettingsDTO;
import io.hops.util.featurestore.dtos.stats.StatisticsDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.ClusterAnalysisDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.ClusterDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.DatapointDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsMetricValueDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsMetricValuesDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.CorrelationValueDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.FeatureCorrelationDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.FeatureCorrelationMatrixDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.FeatureDistributionDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.FeatureDistributionsDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.HistogramBinDTO;
import io.hops.util.featurestore.dtos.storageconnector.FeaturestoreJdbcConnectorDTO;
import io.hops.util.featurestore.dtos.storageconnector.FeaturestoreStorageConnectorDTO;
import io.hops.util.featurestore.dtos.trainingdataset.HopsfsTrainingDatasetDTO;
import io.hops.util.featurestore.dtos.trainingdataset.TrainingDatasetDTO;
import io.hops.util.featurestore.dtos.trainingdataset.TrainingDatasetType;
import io.hops.util.featurestore.ops.read_ops.FeaturestoreReadFeaturegroup;
import io.hops.util.featurestore.ops.read_ops.FeaturestoreReadMetadata;
import io.hops.util.featurestore.ops.write_ops.FeaturestoreUpdateMetadataCache;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.DenseMatrix;
import org.apache.spark.ml.stat.Correlation;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.NumericType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class FeaturestoreHelper {
    // java.util.logging logger named after this class.
    private static final Logger LOG = Logger.getLogger(FeaturestoreHelper.class.getName());
    // JAXB contexts, one per DTO family as indicated by the field names.
    // NOTE(review): their initialisation is not in the visible part of the file - confirm where they are assigned.
    private static JAXBContext featuregroupJAXBContext;
    private static JAXBContext descriptiveStatsJAXBContext;
    private static JAXBContext featureCorrelationJAXBContext;
    private static JAXBContext featureHistogramsJAXBContext;
    private static JAXBContext clusterAnalysisJAXBContext;
    private static JAXBContext featureJAXBContext;
    private static JAXBContext featurestoreMetadataJAXBContext;
    private static JAXBContext trainingDatasetJAXBContext;
    private static JAXBContext jdbcConnectorJAXBContext;
    // JAXB marshallers; by name each one pairs with one of the contexts above.
    private static Marshaller descriptiveStatsMarshaller;
    private static Marshaller featureCorrelationMarshaller;
    private static Marshaller featureHistogramsMarshaller;
    private static Marshaller clusteranalysisMarshaller;
    private static Marshaller featureMarshaller;
    private static Marshaller featurestoreMetadataMarshaller;
    private static Marshaller trainingDatasetMarshaller;
    private static Marshaller featuregroupMarshaller;
    private static Marshaller jdbcConnectorMarshaller;
    // Cached featurestore metadata; presumably the value handed out by getFeaturestoreMetadataCache() - confirm.
    private static FeaturestoreMetadataDTO featurestoreMetadataCache;
    // NOTE(review): presumably used to validate/match featurestore names; the pattern itself is assigned elsewhere - confirm.
    private static Pattern featurestoreRegex;

    /**
     * Private no-op constructor: the members visible in this class are all static,
     * so the class is not meant to be instantiated from outside.
     */
    private FeaturestoreHelper() {
    }

    /**
     * Derives the name of the current project's featurestore: the project name in
     * lower case followed by the suffix {@code _featurestore}.
     *
     * <p>The lower-casing is done with {@link java.util.Locale#ROOT} so that the result does
     * not depend on the JVM's default locale (e.g. under a Turkish locale a plain
     * {@code toLowerCase()} maps {@code 'I'} to a dotless {@code 'ı'}, which would yield a
     * different database name).
     *
     * @return the featurestore name of the project reported by {@code Hops.getProjectName()}
     */
    public static String getProjectFeaturestore() {
        String projectName = Hops.getProjectName();
        return projectName.toLowerCase(java.util.Locale.ROOT) + "_featurestore";
    }

    /**
     * Issues a {@code use <featurestore>} SQL statement through {@code logAndRunSQL}.
     *
     * @param sparkSession the spark session handed on to {@code logAndRunSQL}
     * @param featurestore the featurestore to select; when {@code null} the project's own
     *                     featurestore (see {@link #getProjectFeaturestore()}) is used
     */
    public static void useFeaturestore(SparkSession sparkSession, String featurestore) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        String targetFeaturestore = featurestore != null
            ? featurestore
            : FeaturestoreHelper.getProjectFeaturestore();
        // The statement is always run with online=false.
        FeaturestoreHelper.logAndRunSQL(sparkSession, "use " + targetFeaturestore, false, targetFeaturestore);
    }

    /**
     * Builds the table name of a feature group: {@code <name>_<version>}.
     *
     * @param featuregroupName name of the feature group
     * @param version          version of the feature group
     * @return the name and the version joined by an underscore
     */
    public static String getTableName(String featuregroupName, int version) {
        StringBuilder tableName = new StringBuilder();
        tableName.append(featuregroupName);
        tableName.append('_');
        tableName.append(version);
        return tableName.toString();
    }

    /**
     * Writes a dataframe to the table of a feature group over JDBC, using the online
     * featurestore JDBC connector looked up from freshly read featurestore metadata.
     *
     * @param sparkDf             the dataframe to write
     * @param featuregroup        name of the feature group
     * @param featurestore        featurestore the feature group belongs to
     * @param featuregroupVersion version of the feature group
     * @param mode                write mode, passed unchanged to {@code writeJdbcDataframe}
     */
    public static void insertIntoOnlineFeaturegroup(Dataset<Row> sparkDf, String featuregroup, String featurestore, int featuregroupVersion, String mode) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException {
        FeaturestoreJdbcConnectorDTO onlineConnector = FeaturestoreHelper.doGetOnlineFeaturestoreJdbcConnector(
            featurestore,
            new FeaturestoreReadMetadata().setFeaturestore(featurestore).read());
        FeaturestoreHelper.writeJdbcDataframe(
            sparkDf,
            onlineConnector,
            FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion),
            mode);
    }

    /**
     * Appends a dataframe to the Hive table of a feature group.
     *
     * <p>Selects the featurestore, enables non-strict Hive dynamic partitioning on an
     * {@link SQLContext} created from the session's spark context, and then inserts the
     * dataframe into the table {@code <featuregroup>_<version>} with format {@code hive}
     * and mode {@code append}.
     *
     * @param sparkDf             the dataframe to insert
     * @param sparkSession        the spark session
     * @param featuregroup        name of the feature group
     * @param featurestore        featurestore to select ({@code null} is resolved by {@code useFeaturestore})
     * @param featuregroupVersion version of the feature group
     */
    public static void insertIntoOfflineFeaturegroup(Dataset<Row> sparkDf, SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String targetTable = FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion);

        SQLContext hiveSqlContext = new SQLContext(sparkSession.sparkContext());
        hiveSqlContext.setConf("hive.exec.dynamic.partition", "true");
        hiveSqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");

        sparkDf.write()
            .format("hive")
            .mode("append")
            .insertInto(targetTable);
    }

    /**
     * Saves a dataframe with the {@code org.apache.hudi} data source.
     *
     * @param sparkDf             the dataframe to write
     * @param sparkSession        the spark session (used to select the featurestore)
     * @param featuregroup        feature group name, used for the default base path
     * @param featurestore        featurestore to select before writing
     * @param featuregroupVersion feature group version, used for the default base path
     * @param hudiArgs            writer options; every entry is set on the writer
     * @param hudiBasePath        path to save to; when null or empty the path from
     *                            {@link #getDefaultHudiBasePath(String, int)} is used
     * @param mode                save mode passed to the writer
     */
    public static void writeHudiDataset(Dataset<Row> sparkDf, SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion, Map<String, String> hudiArgs, String hudiBasePath, String mode) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);

        DataFrameWriter hudiWriter = sparkDf.write().format("org.apache.hudi");
        for (Map.Entry<String, String> hudiOption : hudiArgs.entrySet()) {
            hudiWriter = hudiWriter.option(hudiOption.getKey(), hudiOption.getValue());
        }
        hudiWriter = hudiWriter.mode(mode);

        String savePath = Strings.isNullOrEmpty(hudiBasePath)
            ? FeaturestoreHelper.getDefaultHudiBasePath(featuregroup, featuregroupVersion)
            : hudiBasePath;
        hudiWriter.save(savePath);
    }

    /**
     * Builds the default Hudi base path:
     * {@code hdfs:///Projects/<project name>/Resources/<tableName>_<version>}.
     *
     * @param tableName name part of the table
     * @param version   version part of the table
     * @return the default base path for the table
     */
    public static String getDefaultHudiBasePath(String tableName, int version) {
        StringBuilder basePath = new StringBuilder("hdfs:///Projects/");
        basePath.append(Hops.getProjectName());
        basePath.append("/").append("Resources").append("/");
        basePath.append(FeaturestoreHelper.getTableName(tableName, version));
        return basePath.toString();
    }

    /**
     * Adds the Hudi Hive-sync options to the supplied argument map.
     *
     * <p>Sets {@code hive_sync.enable}, {@code hive_sync.table}, {@code hive_sync.jdbcurl}
     * (built from the storage connector found under the project featurestore name in the
     * cached featurestore metadata) and {@code hive_sync.database}.
     *
     * @param hudiArgs  the map to extend; it is modified in place
     * @param tableName the Hive table to sync to
     * @return the same {@code hudiArgs} instance
     * @throws StorageConnectorDoesNotExistError declared for the storage connector lookup
     */
    public static Map<String, String> setupHudiHiveArgs(Map<String, String> hudiArgs, String tableName) throws StorageConnectorDoesNotExistError {
        hudiArgs.put("hoodie.datasource.hive_sync.enable", "true");
        hudiArgs.put("hoodie.datasource.hive_sync.table", tableName);

        FeaturestoreMetadataDTO cachedMetadata = FeaturestoreHelper.getFeaturestoreMetadataCache();
        FeaturestoreJdbcConnectorDTO hiveConnector = (FeaturestoreJdbcConnectorDTO)FeaturestoreHelper.findStorageConnector(
            cachedMetadata.getStorageConnectors(),
            Hops.getProjectFeaturestore().read());
        String hiveJdbcUrl = FeaturestoreHelper.getJDBCUrlFromConnector(hiveConnector, new HashMap<String, String>());

        hudiArgs.put("hoodie.datasource.hive_sync.jdbcurl", hiveJdbcUrl);
        hudiArgs.put("hoodie.datasource.hive_sync.database", Hops.getProjectFeaturestore().read());
        return hudiArgs;
    }

    /**
     * Builds a JDBC URL from a connector by appending {@code name=value;} pairs for the
     * connector's comma-separated argument names to its connection string.
     *
     * <p>For each argument name the value is taken from {@code jdbcArguments} when that map
     * contains the name. Otherwise the names {@code sslTrustStore}, {@code trustStorePassword},
     * {@code sslKeyStore} and {@code keyStorePassword} (matched case-insensitively) are filled
     * in from {@code Hops}; any other name without a supplied value is skipped.
     *
     * <p>A connector whose argument string is {@code null} yields the bare connection string
     * instead of failing with a {@link NullPointerException}.
     *
     * <p>Note: the result may embed store passwords; callers should avoid logging it.
     *
     * @param jdbcConnector the connector providing the connection string and argument names
     * @param jdbcArguments caller-supplied argument values, may be {@code null}
     * @return the connection string followed by the resolved arguments
     */
    public static String getJDBCUrlFromConnector(FeaturestoreJdbcConnectorDTO jdbcConnector, Map<String, String> jdbcArguments) {
        String connectionString = jdbcConnector.getConnectionString();
        String connectorArguments = jdbcConnector.getArguments();
        if (connectorArguments == null) {
            // Nothing to append for a connector that declares no arguments.
            return connectionString;
        }
        StringBuilder appended = new StringBuilder();
        for (String argument : connectorArguments.split(",")) {
            if (jdbcArguments != null && jdbcArguments.containsKey(argument)) {
                // A caller-supplied value always wins over the built-in defaults.
                appended.append(argument).append("=").append(jdbcArguments.get(argument)).append(";");
            } else if (argument.equalsIgnoreCase("sslTrustStore")) {
                appended.append("sslTrustStore").append("=").append(Hops.getTrustStore()).append(";");
            } else if (argument.equalsIgnoreCase("trustStorePassword")) {
                appended.append("trustStorePassword").append("=").append(Hops.getKeystorePwd()).append(";");
            } else if (argument.equalsIgnoreCase("sslKeyStore")) {
                appended.append("sslKeyStore").append("=").append(Hops.getKeyStore()).append(";");
            } else if (argument.equalsIgnoreCase("keyStorePassword")) {
                appended.append("keyStorePassword").append("=").append(Hops.getKeystorePwd()).append(";");
            }
        }
        // Return the connection string untouched when no argument was resolved.
        return appended.length() == 0 ? connectionString : connectionString + appended;
    }

    /**
     * Collects the feature groups that contain a given feature.
     *
     * <p>A feature group matches when one of its features has exactly the given name, or when
     * {@code <featuregroup>_<version>.<feature name>} equals the given name. Each feature group
     * is added at most once, in the order of the input list.
     *
     * @param featuregroups the feature groups to search
     * @param feature       plain or table-qualified feature name
     * @return the matching feature groups, possibly empty
     */
    private static List<FeaturegroupDTO> findFeaturegroupThatContainsFeature(List<FeaturegroupDTO> featuregroups, String feature) {
        List<FeaturegroupDTO> containing = new ArrayList<>();
        for (FeaturegroupDTO candidate : featuregroups) {
            boolean hasFeature = candidate.getFeatures().stream().anyMatch(candidateFeature -> {
                String qualifiedName = FeaturestoreHelper.getTableName(candidate.getName(), candidate.getVersion())
                    + "." + candidateFeature.getName();
                return candidateFeature.getName().equals(feature) || qualifiedName.equals(feature);
            });
            if (hasFeature) {
                containing.add(candidate);
            }
        }
        return containing;
    }

    /**
     * Resolves every feature to the single feature group that contains it and returns the
     * distinct set of those feature groups.
     *
     * <p>The result is gathered through a {@link HashSet}, so its order is not related to the
     * order of {@code features}.
     *
     * @param featuregroups the feature groups to search
     * @param features      the feature names to resolve
     * @param featurestore  featurestore name, handed to {@code findFeature}
     * @return the distinct feature groups containing the features
     * @throws IllegalArgumentException raised by {@code findFeature} when a feature is found
     *                                  in none or in more than one feature group
     */
    public static List<FeaturegroupDTO> findFeaturegroupsThatContainsFeatures(List<FeaturegroupDTO> featuregroups, List<String> features, String featurestore) {
        Set<FeaturegroupDTO> distinctFeaturegroups = new HashSet<>();
        for (String featureName : features) {
            distinctFeaturegroups.add(FeaturestoreHelper.findFeature(featureName, featurestore, featuregroups));
        }
        return new ArrayList<>(distinctFeaturegroups);
    }

    /**
     * Reads a whole feature group table with {@code SELECT *} and passes the result through
     * {@code addProvenanceMetadataToDataFrame} together with a feature-to-featuregroup
     * mapping for all features of the feature group.
     *
     * @param sparkSession        the spark session
     * @param featuregroup        name of the feature group
     * @param featurestore        featurestore to query
     * @param featuregroupVersion version of the feature group
     * @param online              forwarded to {@code logAndRunSQL}
     * @return the dataframe returned by {@code addProvenanceMetadataToDataFrame}
     */
    public static Dataset<Row> getCachedFeaturegroup(SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion, Boolean online) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled, FeaturegroupDoesNotExistError {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        Dataset<Row> tableDf = FeaturestoreHelper.logAndRunSQL(
            sparkSession,
            "SELECT * FROM " + FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion),
            online,
            featurestore);

        // Look the feature group up in the cached metadata to learn its feature names.
        FeaturegroupDTO cachedFeaturegroup = FeaturestoreHelper.findFeaturegroup(
            FeaturestoreHelper.getFeaturestoreMetadataCache().getFeaturegroups(), featuregroup, featuregroupVersion);
        List<FeaturegroupDTO> sourceFeaturegroups = new ArrayList<>();
        sourceFeaturegroups.add(cachedFeaturegroup);
        List<String> featureNames = cachedFeaturegroup.getFeatures().stream()
            .map(FeatureDTO::getName)
            .collect(Collectors.toList());
        Map<String, String> provenanceMapping =
            FeaturestoreHelper.getFeatureToFeaturegroupMapping(sourceFeaturegroups, featureNames, featurestore);

        // Reset the Spark job group to empty id/description.
        sparkSession.sparkContext().setJobGroup("", "", true);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(tableDf, provenanceMapping);
    }

    /**
     * Runs the query of an on-demand feature group over JDBC and returns the result with
     * provenance metadata added.
     *
     * <p>The query is wrapped as the sub-select {@code (<query>) fs_q}; the {@code fs_q.}
     * prefix is stripped from the resulting column names.
     *
     * @param sparkSession            the spark session
     * @param onDemandFeaturegroupDTO the on-demand feature group providing query and features
     * @param connectionString        JDBC url to read from
     * @param featurestore            featurestore name used for the provenance mapping
     * @return the dataframe returned by {@code addProvenanceMetadataToDataFrame}
     */
    public static Dataset<Row> getOnDemandFeaturegroup(SparkSession sparkSession, OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, String connectionString, String featurestore) {
        Dataset<Row> queryDf = sparkSession.read()
            .format("jdbc")
            .option("url", connectionString)
            .option("dbtable", "(" + onDemandFeaturegroupDTO.getQuery() + ") fs_q")
            .load();
        List<String> columnNames = Arrays.stream(queryDf.schema().fieldNames())
            .map(columnName -> columnName.replace("fs_q.", ""))
            .collect(Collectors.toList());
        Dataset<Row> renamedDf = queryDf.toDF(FeaturestoreHelper.convertListToSeq(columnNames));

        // Reset the Spark job group to empty id/description.
        sparkSession.sparkContext().setJobGroup("", "", true);

        List<FeaturegroupDTO> sourceFeaturegroups = new ArrayList<>();
        sourceFeaturegroups.add(onDemandFeaturegroupDTO);
        List<String> featureNames = onDemandFeaturegroupDTO.getFeatures().stream()
            .map(FeatureDTO::getName)
            .collect(Collectors.toList());
        Map<String, String> provenanceMapping =
            FeaturestoreHelper.getFeatureToFeaturegroupMapping(sourceFeaturegroups, featureNames, featurestore);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(renamedDf, provenanceMapping);
    }

    /**
     * Runs {@code SHOW PARTITIONS} on the table of a feature group.
     *
     * @param sparkSession        the spark session
     * @param featuregroup        name of the feature group
     * @param featurestore        featurestore to select and query
     * @param featuregroupVersion version of the feature group
     * @param online              forwarded to {@code logAndRunSQL}
     * @return the result of the statement as returned by {@code logAndRunSQL}
     */
    public static Dataset<Row> getFeaturegroupPartitions(SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion, Boolean online) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        return FeaturestoreHelper.logAndRunSQL(
            sparkSession,
            "SHOW PARTITIONS " + FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion),
            online,
            featurestore);
    }

    /**
     * Reads a training dataset into a Spark dataframe.
     *
     * <p>Supported formats are {@code csv}, {@code tsv}, {@code parquet}, {@code avro},
     * {@code orc}, {@code image} and {@code tfrecords}. {@code csv}/{@code tsv} are read with a
     * header and a {@code ","} / tab delimiter, {@code tfrecords} with record type
     * {@code Example}. For HopsFS training datasets the location is first resolved by
     * {@link #resolveTrainingDatasetReadPath(String, String, TrainingDatasetType)}; for all
     * other types {@code path} is read as given.
     *
     * @param sparkSession        the spark session used for reading
     * @param dataFormat          the format of the training dataset
     * @param path                the path of the training dataset
     * @param trainingDatasetType the type of the training dataset
     * @return the training dataset as a dataframe
     * @throws TrainingDatasetFormatNotSupportedError when {@code dataFormat} is {@code null}
     *         or not one of the supported formats
     * @throws IOException                            when the file system check fails
     * @throws TrainingDatasetDoesNotExistError       when a HopsFS training dataset is found
     *         neither at {@code path} nor at {@code path + "." + dataFormat}
     */
    public static Dataset<Row> getTrainingDataset(SparkSession sparkSession, String dataFormat, String path, TrainingDatasetType trainingDatasetType) throws TrainingDatasetFormatNotSupportedError, IOException, TrainingDatasetDoesNotExistError {
        // switch on a null String throws NullPointerException; report it as an unsupported format instead.
        if (dataFormat != null) {
            switch (dataFormat) {
                case "csv": {
                    String location = FeaturestoreHelper.resolveTrainingDatasetReadPath(path, ".csv", trainingDatasetType);
                    return sparkSession.read().format(dataFormat).option("header", "true").option("delimiter", ",").load(location);
                }
                case "tsv": {
                    String location = FeaturestoreHelper.resolveTrainingDatasetReadPath(path, ".tsv", trainingDatasetType);
                    return sparkSession.read().format(dataFormat).option("header", "true").option("delimiter", "\t").load(location);
                }
                case "parquet": {
                    String location = FeaturestoreHelper.resolveTrainingDatasetReadPath(path, ".parquet", trainingDatasetType);
                    return sparkSession.read().parquet(location);
                }
                case "avro":
                case "orc":
                case "image": {
                    // These formats need no reader options and use "." + format as file suffix.
                    String location = FeaturestoreHelper.resolveTrainingDatasetReadPath(path, "." + dataFormat, trainingDatasetType);
                    return sparkSession.read().format(dataFormat).load(location);
                }
                case "tfrecords": {
                    String location = FeaturestoreHelper.resolveTrainingDatasetReadPath(path, ".tfrecords", trainingDatasetType);
                    return sparkSession.read().format(dataFormat).option("recordType", "Example").load(location);
                }
                default:
                    break;
            }
        }
        throw new TrainingDatasetFormatNotSupportedError("The provided data format: " + dataFormat + " is not supported in the Java/Scala API, the supported data formats are: " + "csv,tsv,tfrecords,parquet,avro,orc,image,");
    }

    /**
     * Resolves the location to read a training dataset from.
     *
     * <p>For HopsFS training datasets this returns {@code path} when it exists, otherwise
     * {@code path + fileSuffix} when that exists. For every other training dataset type
     * {@code path} is returned without any file system check.
     *
     * @param path                the configured path of the training dataset
     * @param fileSuffix          the suffix (including the dot) of the single-file variant
     * @param trainingDatasetType the type of the training dataset
     * @return the location that should be handed to the Spark reader
     * @throws IOException                      when the file system cannot be queried
     * @throws TrainingDatasetDoesNotExistError when neither candidate location exists
     */
    private static String resolveTrainingDatasetReadPath(String path, String fileSuffix, TrainingDatasetType trainingDatasetType) throws IOException, TrainingDatasetDoesNotExistError {
        if (trainingDatasetType != TrainingDatasetType.HOPSFS_TRAINING_DATASET) {
            return path;
        }
        Configuration hdfsConf = new Configuration();
        Path folderPath = new Path(path);
        if (folderPath.getFileSystem(hdfsConf).exists(folderPath)) {
            return path;
        }
        String singleFile = path + fileSuffix;
        Path singleFilePath = new Path(singleFile);
        if (singleFilePath.getFileSystem(hdfsConf).exists(singleFilePath)) {
            return singleFile;
        }
        throw new TrainingDatasetDoesNotExistError("Could not find any training dataset in folder : " + path + " or in file: " + path + fileSuffix);
    }

    /**
     * Finds the single feature group that contains the given feature.
     *
     * @param feature          plain or table-qualified name of the feature
     * @param featurestore     featurestore name, only used in error messages
     * @param featuregroupDTOS the feature groups to search
     * @return the one feature group containing the feature
     * @throws IllegalArgumentException when the feature is found in no feature group, or in
     *         more than one (the message then lists the feature groups that matched)
     */
    private static FeaturegroupDTO findFeature(String feature, String featurestore, List<FeaturegroupDTO> featuregroupDTOS) {
        List<FeaturegroupDTO> matchedFeaturegroups = FeaturestoreHelper.findFeaturegroupThatContainsFeature(featuregroupDTOS, feature);
        if (matchedFeaturegroups.isEmpty()) {
            throw new IllegalArgumentException("Could not find the feature with name: " + feature + " in any of the featuregroups of the featurestore: " + featurestore);
        }
        if (matchedFeaturegroups.size() > 1) {
            // List only the feature groups that actually matched, not every feature group searched.
            String featuregroupsStr = matchedFeaturegroups.stream()
                .map(fg -> FeaturestoreHelper.getTableName(fg.getName(), fg.getVersion()))
                .collect(Collectors.joining(", "));
            throw new IllegalArgumentException("Found the feature with name: " + feature + " in more than one of the featuregroups of the featurestore " + featurestore + " please specify featuregroup that you want to get the feature from. The matched featuregroups are: " + featuregroupsStr);
        }
        return matchedFeaturegroups.get(0);
    }

    /**
     * Reads a single feature, searching the supplied feature groups for the one that
     * contains it.
     *
     * <p>When the matching feature group is an on-demand feature group it is first registered
     * as a temporary table (with {@code jdbcArguments} keyed by its table name). The feature
     * is then selected from {@code <featuregroup>_<version>} and the result is passed through
     * {@code addProvenanceMetadataToDataFrame}.
     *
     * @param sparkSession     the spark session
     * @param feature          name of the feature to read (placed verbatim in the SELECT)
     * @param featurestore     featurestore to query
     * @param featuregroupDTOS the feature groups to search for the feature
     * @param jdbcArguments    JDBC arguments used when the feature group is on-demand
     * @param online           forwarded to {@code logAndRunSQL}
     * @return the dataframe returned by {@code addProvenanceMetadataToDataFrame}
     */
    public static Dataset<Row> getFeature(SparkSession sparkSession, String feature, String featurestore, List<FeaturegroupDTO> featuregroupDTOS, Map<String, String> jdbcArguments, Boolean online) throws FeaturegroupDoesNotExistError, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        FeaturegroupDTO owner = FeaturestoreHelper.findFeature(feature, featurestore, featuregroupDTOS);
        String ownerTable = FeaturestoreHelper.getTableName(owner.getName(), owner.getVersion());

        if (owner.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
            List<FeaturegroupDTO> toRegister = new ArrayList<>(Collections.singletonList(owner));
            Map<String, Map<String, String>> jdbcArgumentsByTable = new HashMap<>();
            jdbcArgumentsByTable.put(ownerTable, jdbcArguments);
            FeaturestoreHelper.registerOnDemandFeaturegroupsAsTempTables(toRegister, featurestore, jdbcArgumentsByTable);
        }

        List<FeaturegroupDTO> sourceFeaturegroups = new ArrayList<>(Collections.singletonList(owner));
        List<String> requestedFeatures = new ArrayList<>(Collections.singletonList(feature));
        Map<String, String> provenanceMapping =
            FeaturestoreHelper.getFeatureToFeaturegroupMapping(sourceFeaturegroups, requestedFeatures, featurestore);

        Dataset<Row> featureDf = FeaturestoreHelper.logAndRunSQL(
            sparkSession, "SELECT " + feature + " FROM " + ownerTable, online, featurestore);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(featureDf, provenanceMapping);
    }

    /**
     * Reads a single feature from an explicitly named feature group.
     *
     * <p>The feature group is looked up by name and version; when it is an on-demand feature
     * group it is first registered as a temporary table (with {@code jdbcArguments} keyed by
     * its table name). Afterwards the featurestore is selected, the feature is read from
     * {@code <featuregroup>_<featuregroupVersion>} and the result is passed through
     * {@code addProvenanceMetadataToDataFrame}.
     *
     * @param sparkSession        the spark session
     * @param feature             name of the feature to read (placed verbatim in the SELECT)
     * @param featurestore        featurestore to query
     * @param featuregroup        name of the feature group holding the feature
     * @param featuregroupVersion version of the feature group
     * @param featuregroupDTOs    the feature groups in which the feature group is looked up
     * @param jdbcArguments       JDBC arguments used when the feature group is on-demand
     * @param online              forwarded to {@code logAndRunSQL}
     * @return the dataframe returned by {@code addProvenanceMetadataToDataFrame}
     */
    public static Dataset<Row> getFeature(SparkSession sparkSession, String feature, String featurestore, String featuregroup, int featuregroupVersion, List<FeaturegroupDTO> featuregroupDTOs, Map<String, String> jdbcArguments, Boolean online) throws FeaturegroupDoesNotExistError, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturegroupDTO owner = FeaturestoreHelper.findFeaturegroup(featuregroupDTOs, featuregroup, featuregroupVersion);

        if (owner.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
            List<FeaturegroupDTO> toRegister = new ArrayList<>(Collections.singletonList(owner));
            Map<String, Map<String, String>> jdbcArgumentsByTable = new HashMap<>();
            jdbcArgumentsByTable.put(FeaturestoreHelper.getTableName(owner.getName(), owner.getVersion()), jdbcArguments);
            FeaturestoreHelper.registerOnDemandFeaturegroupsAsTempTables(toRegister, featurestore, jdbcArgumentsByTable);
        }

        // The featurestore is selected only after a possible temp-table registration.
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);

        List<FeaturegroupDTO> sourceFeaturegroups = new ArrayList<>(Collections.singletonList(owner));
        List<String> requestedFeatures = new ArrayList<>(Collections.singletonList(feature));
        Map<String, String> provenanceMapping =
            FeaturestoreHelper.getFeatureToFeaturegroupMapping(sourceFeaturegroups, requestedFeatures, featurestore);

        Dataset<Row> featureDf = FeaturestoreHelper.logAndRunSQL(
            sparkSession,
            "SELECT " + feature + " FROM " + FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion),
            online,
            featurestore);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(featureDf, provenanceMapping);
    }

    /**
     * Builds the join clause for a list of feature groups on one join key.
     *
     * <p>The produced string has the shape
     * {@code JOIN t1 JOIN t2 ... ON t0.`key`=t1.`key` AND t0.`key`=t2.`key`}, where
     * {@code tN} is the table name of the N-th feature group. The first feature group is not
     * part of the {@code JOIN} list and is the left side of every comparison. With fewer
     * than two feature groups only {@code "ON "} is produced.
     *
     * @param featuregroupDTOS the feature groups to join
     * @param joinKey          the column to join on
     * @return a {@link SQLJoinDTO} created from the join string and the feature groups
     */
    private static SQLJoinDTO getJoinStr(List<FeaturegroupDTO> featuregroupDTOS, String joinKey) {
        StringBuilder joinSql = new StringBuilder();
        int total = featuregroupDTOS.size();

        for (int idx = 1; idx < total; idx++) {
            FeaturegroupDTO joined = featuregroupDTOS.get(idx);
            joinSql.append("JOIN ")
                .append(FeaturestoreHelper.getTableName(joined.getName(), joined.getVersion()))
                .append(" ");
        }

        joinSql.append("ON ");
        for (int idx = 1; idx < total; idx++) {
            FeaturegroupDTO first = featuregroupDTOS.get(0);
            FeaturegroupDTO other = featuregroupDTOS.get(idx);
            joinSql.append(FeaturestoreHelper.getTableName(first.getName(), first.getVersion()))
                .append(".`").append(joinKey).append("`=")
                .append(FeaturestoreHelper.getTableName(other.getName(), other.getVersion()))
                .append(".`").append(joinKey).append("`");
            // Every condition except the last one is followed by AND.
            if (idx < total - 1) {
                joinSql.append(" AND ");
            }
        }
        return new SQLJoinDTO(joinSql.toString(), featuregroupDTOS);
    }

    /**
     * Picks, among the candidate columns, the one that is flagged as primary in the largest number
     * of features across the given feature groups (names compared case-insensitively).
     *
     * <p>Ties are resolved in favour of the earliest candidate; if no candidate is primary anywhere
     * the first candidate is returned.
     *
     * @param commonCols candidate column names
     * @param featuregroupDTOS feature groups whose features are inspected
     * @return the candidate with the highest primary count
     */
    private static String getColumnThatIsPrimary(String[] commonCols, List<FeaturegroupDTO> featuregroupDTOS) {
        int bestIndex = 0;
        int bestCount = 0;
        for (int candidate = 0; candidate < commonCols.length; ++candidate) {
            String columnName = commonCols[candidate];
            int timesPrimary = 0;
            for (FeaturegroupDTO featuregroup : featuregroupDTOS) {
                for (FeatureDTO feature : featuregroup.getFeatures()) {
                    if (feature.getName().equalsIgnoreCase(columnName) && feature.getPrimary().booleanValue()) {
                        ++timesPrimary;
                    }
                }
            }
            // Strictly greater keeps the earliest candidate on ties.
            if (timesPrimary > bestCount) {
                bestCount = timesPrimary;
                bestIndex = candidate;
            }
        }
        return commonCols[bestIndex];
    }

    /**
     * Determines the column to join the given feature groups on.
     *
     * <p>The feature names shared by every feature group are computed; among those, the column that
     * is most often flagged as primary is chosen (see {@code getColumnThatIsPrimary}).
     *
     * @param featuregroupDTOS the feature groups that are to be joined
     * @return the name of the column to join on
     * @throws IllegalArgumentException if the feature groups have no feature name in common
     */
    public static String getJoinColumn(List<FeaturegroupDTO> featuregroupDTOS) {
        List<Set<String>> featureSets = new ArrayList<Set<String>>();
        Set<String> commonFeatures = new HashSet<String>();
        for (FeaturegroupDTO featuregroupDTO : featuregroupDTOS) {
            List<String> columnNames = featuregroupDTO.getFeatures().stream().map(FeatureDTO::getName).collect(Collectors.toList());
            commonFeatures.addAll(columnNames);
            featureSets.add(new HashSet<String>(columnNames));
        }
        // Start from the union and intersect with every group's feature names.
        for (Set<String> featureSet : featureSets) {
            commonFeatures.retainAll(featureSet);
        }
        if (commonFeatures.isEmpty()) {
            List<String> featuregroupNames = featuregroupDTOS.stream().map(FeaturegroupDTO::getName).collect(Collectors.toList());
            String searched = StringUtils.join(featuregroupNames, ", ");
            throw new IllegalArgumentException("Could not find any common columns in featuregroups to join on, searched through the following featuregroups: " + searched);
        }
        return FeaturestoreHelper.getColumnThatIsPrimary(commonFeatures.toArray(new String[commonFeatures.size()]), featuregroupDTOS);
    }

    /**
     * Creates one {@link FeaturegroupDTO} per map entry, populated with only the name (key) and
     * version (value).
     *
     * @param featuregroupsAndVersions feature group names mapped to their versions
     * @return the DTOs, in the iteration order of the map
     */
    private static List<FeaturegroupDTO> convertFeaturegroupAndVersionToDTOs(Map<String, Integer> featuregroupsAndVersions) {
        ArrayList<FeaturegroupDTO> converted = new ArrayList<FeaturegroupDTO>();
        featuregroupsAndVersions.forEach((name, version) -> {
            FeaturegroupDTO dto = new FeaturegroupDTO();
            dto.setName(name);
            dto.setVersion(version);
            converted.add(dto);
        });
        return converted;
    }

    /**
     * Reads a list of features from the explicitly named feature groups.
     *
     * <p>A single feature group is queried directly; several feature groups are joined on
     * {@code joinKey} using the fragment produced by {@code getJoinStr}. On-demand feature groups
     * among the selection are registered as temporary tables before the query is run, and the
     * result is annotated with provenance metadata.
     *
     * @param sparkSession the spark session used to run the query
     * @param features names of the features to select
     * @param featurestore the feature store to query
     * @param featuregroupsAndVersions feature group names mapped to the versions to read
     * @param joinKey column to join on when more than one feature group is involved
     * @param featuregroupsMetadata metadata used to look up the named feature groups
     * @param jdbcArguments JDBC arguments forwarded when registering on-demand feature groups
     * @param online whether to run against the online feature store; null means offline
     * @return the selected features with provenance metadata attached to each column
     */
    public static Dataset<Row> getFeatures(SparkSession sparkSession, List<String> features, String featurestore, Map<String, Integer> featuregroupsAndVersions, String joinKey, List<FeaturegroupDTO> featuregroupsMetadata, Map<String, Map<String, String>> jdbcArguments, Boolean online) throws FeaturegroupDoesNotExistError, StorageConnectorDoesNotExistError, HiveNotEnabled, OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String featuresStr = StringUtils.join(features, (String)", ");
        ArrayList<String> featuregroupStrings = new ArrayList<String>();
        ArrayList<FeaturegroupDTO> featuregroupDTOS = new ArrayList<FeaturegroupDTO>();
        for (Map.Entry<String, Integer> entry : featuregroupsAndVersions.entrySet()) {
            featuregroupStrings.add(FeaturestoreHelper.getTableName(entry.getKey(), entry.getValue()));
            featuregroupDTOS.add(FeaturestoreHelper.findFeaturegroup(featuregroupsMetadata, entry.getKey(), entry.getValue()));
        }
        String featuregroupStr = StringUtils.join(featuregroupStrings, (String)", ");
        List<FeaturegroupDTO> onDemandFeaturegroups = featuregroupDTOS.stream().filter(fg -> fg.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP).collect(Collectors.toList());
        FeaturestoreHelper.registerOnDemandFeaturegroupsAsTempTables(onDemandFeaturegroups, featurestore, jdbcArguments);

        String fromClause;
        List<FeaturegroupDTO> queriedFeaturegroups;
        if (featuregroupsAndVersions.size() == 1) {
            // Single feature group: no join needed.
            fromClause = featuregroupStr;
            queriedFeaturegroups = featuregroupDTOS;
        } else {
            SQLJoinDTO sqlJoinDTO = FeaturestoreHelper.getJoinStr(featuregroupDTOS, joinKey);
            queriedFeaturegroups = sqlJoinDTO.getFeaturegroupDTOS();
            FeaturegroupDTO first = queriedFeaturegroups.get(0);
            fromClause = FeaturestoreHelper.getTableName(first.getName(), first.getVersion()) + " " + sqlJoinDTO.getJoinStr();
        }
        Map<String, String> featureToFeaturegroupMapping = FeaturestoreHelper.getFeatureToFeaturegroupMapping(queriedFeaturegroups, features, featurestore);
        String sqlStr = "SELECT " + featuresStr + " FROM " + fromClause;
        Dataset<Row> result = FeaturestoreHelper.logAndRunSQL(sparkSession, sqlStr, online, featurestore);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(result, featureToFeaturegroupMapping);
    }

    /**
     * Attaches provenance metadata to every mapped column of the dataframe.
     *
     * <p>Each mapping value is split at its last underscore: the part before it is stored under
     * the metadata key {@code featuregroup}, the part after it under {@code version}. An existing
     * {@code comment} metadata entry on the column is carried over.
     *
     * @param sparkDf the dataframe whose columns are annotated
     * @param featureToFeaturegroupMapping column name mapped to the table name it was read from
     * @return the dataframe with the annotated columns
     * @throws IllegalArgumentException if a mapped column is not present in the dataframe schema
     */
    private static Dataset<Row> addProvenanceMetadataToDataFrame(Dataset<Row> sparkDf, Map<String, String> featureToFeaturegroupMapping) {
        Dataset<Row> annotated = sparkDf;
        for (Map.Entry<String, String> mapping : featureToFeaturegroupMapping.entrySet()) {
            String featureName = mapping.getKey();
            String tableName = mapping.getValue();
            Metadata existing = FeaturestoreHelper.getColumnMetadata(annotated.schema(), featureName);
            int versionSeparator = tableName.lastIndexOf("_");
            MetadataBuilder provenance = new MetadataBuilder();
            provenance = provenance.putString("featuregroup", tableName.substring(0, versionSeparator));
            provenance = provenance.putString("version", tableName.substring(versionSeparator + 1));
            if (existing.contains("comment")) {
                provenance = provenance.putString("comment", existing.getString("comment"));
            }
            Column withMetadata = functions.col((String)featureName).as("", provenance.build());
            annotated = annotated.withColumn(featureName, withMetadata);
        }
        return annotated;
    }

    /**
     * Looks up the metadata of the first schema field whose name matches {@code columnName},
     * ignoring case.
     *
     * @param sparkSchema the schema to search
     * @param columnName the column to look for
     * @return the metadata of the matching field
     * @throws IllegalArgumentException if no field matches
     */
    private static Metadata getColumnMetadata(StructType sparkSchema, String columnName) {
        for (StructField candidate : sparkSchema.fields()) {
            if (candidate.name().equalsIgnoreCase(columnName)) {
                return candidate.metadata();
            }
        }
        throw new IllegalArgumentException("Feature not found: " + columnName);
    }

    /**
     * Maps each requested feature (by its short name) to the table name of the feature group that
     * {@code findFeature} resolves it to.
     *
     * @param featuregroups the feature groups to search
     * @param features the requested feature names, optionally prefixed
     * @param featurestore the feature store the lookup is performed for
     * @return short feature name mapped to table name
     */
    private static Map<String, String> getFeatureToFeaturegroupMapping(List<FeaturegroupDTO> featuregroups, List<String> features, String featurestore) {
        HashMap<String, String> featureToTable = new HashMap<String, String>();
        for (String requested : features) {
            FeaturegroupDTO owner = FeaturestoreHelper.findFeature(requested, featurestore, featuregroups);
            String shortName = FeaturestoreHelper.getFeatureShortName(requested);
            String tableName = FeaturestoreHelper.getTableName(owner.getName(), owner.getVersion());
            featureToTable.put(shortName, tableName);
        }
        return featureToTable;
    }

    /**
     * Strips a prefix from a dotted feature name.
     *
     * <p>For a name containing a dot, the second dot-separated segment is returned (for example
     * {@code "fg.feature"} yields {@code "feature"}); a name without a dot is returned unchanged.
     *
     * @param featureName the possibly prefixed feature name
     * @return the feature name without its prefix
     */
    private static String getFeatureShortName(String featureName) {
        if (!featureName.contains(".")) {
            return featureName;
        }
        String[] segments = featureName.split("\\.");
        return segments[1];
    }

    /**
     * Reads a list of features from the given feature groups.
     *
     * <p>A single feature group is queried directly; several feature groups are joined on
     * {@code joinKey} using the fragment produced by {@code getJoinStr}. On-demand feature groups
     * are registered as temporary tables before the query is run, and the result is annotated
     * with provenance metadata.
     *
     * @param sparkSession the spark session used to run the query
     * @param features names of the features to select
     * @param featurestore the feature store to query
     * @param featuregroupsMetadata the feature groups to read from
     * @param joinKey column to join on when more than one feature group is involved
     * @param jdbcArguments JDBC arguments forwarded when registering on-demand feature groups
     * @param online whether to run against the online feature store; null means offline
     * @return the selected features with provenance metadata attached to each column
     */
    public static Dataset<Row> getFeatures(SparkSession sparkSession, List<String> features, String featurestore, List<FeaturegroupDTO> featuregroupsMetadata, String joinKey, Map<String, Map<String, String>> jdbcArguments, Boolean online) throws FeaturegroupDoesNotExistError, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String selectList = StringUtils.join(features, (String)", ");

        List<String> tableNames = new ArrayList<String>();
        for (FeaturegroupDTO fg : featuregroupsMetadata) {
            tableNames.add(FeaturestoreHelper.getTableName(fg.getName(), fg.getVersion()));
        }
        List<FeaturegroupDTO> onDemand = new ArrayList<FeaturegroupDTO>();
        for (FeaturegroupDTO fg : featuregroupsMetadata) {
            if (fg.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
                onDemand.add(fg);
            }
        }
        FeaturestoreHelper.registerOnDemandFeaturegroupsAsTempTables(onDemand, featurestore, jdbcArguments);
        String allTables = StringUtils.join(tableNames, (String)", ");

        String query;
        List<FeaturegroupDTO> queriedFeaturegroups;
        if (featuregroupsMetadata.size() == 1) {
            query = "SELECT " + selectList + " FROM " + allTables;
            queriedFeaturegroups = featuregroupsMetadata;
        } else {
            SQLJoinDTO join = FeaturestoreHelper.getJoinStr(featuregroupsMetadata, joinKey);
            FeaturegroupDTO first = join.getFeaturegroupDTOS().get(0);
            query = "SELECT " + selectList + " FROM " + FeaturestoreHelper.getTableName(first.getName(), first.getVersion()) + " " + join.getJoinStr();
            queriedFeaturegroups = join.getFeaturegroupDTOS();
        }
        Map<String, String> provenance = FeaturestoreHelper.getFeatureToFeaturegroupMapping(queriedFeaturegroups, features, featurestore);
        Dataset<Row> queried = FeaturestoreHelper.logAndRunSQL(sparkSession, query, online, featurestore);
        return FeaturestoreHelper.addProvenanceMetadataToDataFrame(queried, provenance);
    }

    /**
     * Runs a caller-supplied SQL query against the feature store.
     *
     * <p>Selects the feature store via {@code useFeaturestore} and then delegates to
     * {@code logAndRunSQL}, which logs the query and runs it against the offline or online store
     * depending on {@code online}.
     *
     * @param sparkSession the spark session used to run the query
     * @param query the SQL query, used as-is
     * @param featurestore the feature store to query
     * @param online whether to query the online feature store; null is treated as offline by
     *               {@code logAndRunSQL}
     * @return the query result
     */
    public static Dataset<Row> queryFeaturestore(SparkSession sparkSession, String query, String featurestore, Boolean online) throws OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        return FeaturestoreHelper.logAndRunSQL(sparkSession, query, online, featurestore);
    }

    /**
     * Logs a SQL statement and runs it against the offline or the online feature store.
     *
     * <p>When {@code online} is null or false the statement is run through
     * {@code sparkSession.sql}. Otherwise the feature store metadata is read once, online support
     * is verified for both the cluster settings and the feature store, and the statement is run
     * as a JDBC sub-query through the online feature store connector.
     *
     * @param sparkSession the spark session used to run the query
     * @param sqlStr the SQL statement to run
     * @param online whether to use the online feature store; null means offline
     * @param featurestore the feature store to query
     * @return the query result
     * @throws OnlineFeaturestoreNotEnabled if online access is requested but not enabled
     */
    private static Dataset<Row> logAndRunSQL(SparkSession sparkSession, String sqlStr, Boolean online, String featurestore) throws JAXBException, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, OnlineFeaturestorePasswordNotFound, OnlineFeaturestoreNotEnabled {
        if (online == null || !online.booleanValue()) {
            LOG.log(Level.INFO, "Running sql: " + sqlStr + " against offline feature store");
            return sparkSession.sql(sqlStr);
        }
        LOG.log(Level.INFO, "Running sql: " + sqlStr + " against online feature store");
        FeaturestoreMetadataDTO featurestoreMetadataDTO = new FeaturestoreReadMetadata().setFeaturestore(featurestore).read();
        boolean onlineEnabled = featurestoreMetadataDTO.getSettings().getOnlineFeaturestoreEnabled().booleanValue()
            && featurestoreMetadataDTO.getFeaturestore().getOnlineEnabled().booleanValue();
        if (!onlineEnabled) {
            throw new OnlineFeaturestoreNotEnabled("Online feature store is not enabled for this cluster or project. Talk with an administrator to enable it.");
        }
        // Reuse the metadata read above instead of fetching it a second time.
        FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO = FeaturestoreHelper.doGetOnlineFeaturestoreJdbcConnector(featurestore, featurestoreMetadataDTO);
        return FeaturestoreHelper.readJdbcDataFrame(sparkSession, featurestoreJdbcConnectorDTO, "(" + sqlStr + ") tmp");
    }

    /**
     * Keeps only the feature groups whose name is a key of the map and whose version equals the
     * version mapped to that name.
     *
     * @param featuregroupsAndVersions requested feature group names mapped to versions
     * @param featuregroupsMetadata the feature groups to filter
     * @return a new list with the matching feature groups, in their original order
     */
    public static List<FeaturegroupDTO> filterFeaturegroupsBasedOnMap(Map<String, Integer> featuregroupsAndVersions, List<FeaturegroupDTO> featuregroupsMetadata) {
        List<FeaturegroupDTO> matching = new ArrayList<FeaturegroupDTO>();
        for (FeaturegroupDTO candidate : featuregroupsMetadata) {
            Integer requestedVersion = featuregroupsAndVersions.get(candidate.getName());
            if (requestedVersion == null) {
                continue;
            }
            if (candidate.getVersion().equals(requestedVersion)) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Returns the JDBC connector of the online feature store.
     *
     * <p>The connector carried by the supplied metadata is preferred; only when the metadata is
     * null or holds no connector is the connector fetched through the REST client.
     *
     * @param featurestore the feature store name, resolved through {@code featurestoreGetOrDefault}
     * @param featurestoreMetadataDTO previously read metadata, may be null
     * @return the online feature store JDBC connector
     * @throws IllegalArgumentException if the resolved feature store name is null or empty
     */
    public static FeaturestoreJdbcConnectorDTO doGetOnlineFeaturestoreJdbcConnector(String featurestore, FeaturestoreMetadataDTO featurestoreMetadataDTO) throws JAXBException, FeaturestoreNotFound {
        String resolvedFeaturestore = FeaturestoreHelper.featurestoreGetOrDefault(featurestore);
        if (Strings.isNullOrEmpty((String)resolvedFeaturestore)) {
            throw new IllegalArgumentException("Featurestore Parameter Cannot be Null.");
        }
        boolean connectorInMetadata = featurestoreMetadataDTO != null && featurestoreMetadataDTO.getOnlineFeaturestoreConnector() != null;
        if (!connectorInMetadata) {
            return FeaturestoreRestClient.getOnlineFeaturestoreJdbcConnectorRest(resolvedFeaturestore);
        }
        return featurestoreMetadataDTO.getOnlineFeaturestoreConnector();
    }

    /**
     * Converts a spark schema field into a {@link FeatureDTO}.
     *
     * <p>The description is taken from the {@code description} metadata entry if present,
     * otherwise from the field comment, and falls back to {@code "-"} when empty. The online type
     * is only resolved when {@code online} is true: an explicit entry in {@code onlineTypes} wins,
     * otherwise the type is derived with {@code convertHiveTypeToMySQL}.
     *
     * @param field the spark schema field
     * @param primaryKey names of the primary key columns, may be null
     * @param partitionBy names of the partition columns, may be null
     * @param online whether an online type is needed; null is treated as false
     * @param onlineTypes explicit online types per feature name, may be null
     * @return the feature DTO describing the field
     * @throws IllegalArgumentException if an online type is needed and cannot be derived
     */
    private static FeatureDTO convertFieldToFeature(StructField field, List<String> primaryKey, List<String> partitionBy, Boolean online, Map<String, String> onlineTypes) {
        String featureName = field.name();
        String featureType = field.dataType().catalogString();
        Boolean primary = primaryKey != null && primaryKey.contains(featureName);
        Boolean partition = partitionBy != null && partitionBy.contains(featureName);

        String featureDesc = "";
        Metadata metadata = field.metadata();
        if (metadata != null) {
            if (metadata.contains("description")) {
                featureDesc = metadata.getString("description");
            } else if (field.getComment().isDefined()) {
                featureDesc = (String)field.getComment().get();
            }
        }
        if (featureDesc.isEmpty()) {
            featureDesc = "-";
        }

        String onlineType = "";
        // A null flag means "not online", matching how logAndRunSQL interprets it.
        if (Boolean.TRUE.equals(online)) {
            onlineType = onlineTypes != null && onlineTypes.containsKey(featureName) ? onlineTypes.get(featureName) : FeaturestoreHelper.convertHiveTypeToMySQL(featureType);
        }
        return new FeatureDTO(featureName, featureType, featureDesc, primary, partition, onlineType);
    }

    /**
     * Translates a Hive data type name into the MySQL type used for the online feature store.
     *
     * <p>Known Hive names are matched case-insensitively against a fixed table; any other name is
     * returned unchanged if its upper-cased form is listed in {@code Constants.MYSQL_DATA_TYPES}.
     *
     * @param hiveDataType the Hive type name
     * @return the corresponding MySQL type
     * @throws IllegalArgumentException if no conversion is known
     */
    private static String convertHiveTypeToMySQL(String hiveDataType) {
        // Checked in order; first case-insensitive match wins.
        String[][] conversions = {
            {"SMALLINT", "SMALLINT(5)"},
            {"INT", "INT(11)"},
            {"BIGINT", "BIGINT(20)"},
            {"INTERVAL", "DATE"},
            {"STRING", "VARCHAR(1000)"},
            {"VARCHAR", "VARCHAR(1000)"},
            {"BOOLEAN", "TINYINT(1)"},
            {"BINARY", "BLOB"},
        };
        for (String[] conversion : conversions) {
            if (hiveDataType.equalsIgnoreCase(conversion[0])) {
                return conversion[1];
            }
        }
        if (!Constants.MYSQL_DATA_TYPES.contains(hiveDataType.toUpperCase())) {
            throw new IllegalArgumentException("Conversion of data type: " + hiveDataType + " to a valid MySQL datatype failed. Please explicitly provide the type through the argument 'onlineTypes'");
        }
        return hiveDataType;
    }

    /**
     * Converts every field of a spark schema into a {@link FeatureDTO}, preserving field order.
     *
     * @param sparkSchema the schema to convert
     * @param primaryKey names of the primary key columns
     * @param partitionBy names of the partition columns
     * @param online whether online types should be resolved
     * @param onlineTypes explicit online types per feature name
     * @return one feature DTO per schema field
     */
    public static List<FeatureDTO> parseSparkFeaturesSchema(StructType sparkSchema, List<String> primaryKey, List<String> partitionBy, Boolean online, Map<String, String> onlineTypes) {
        ArrayList<FeatureDTO> parsed = new ArrayList<FeatureDTO>();
        for (StructField schemaField : sparkSchema.fields()) {
            FeatureDTO feature = FeaturestoreHelper.convertFieldToFeature(schemaField, primaryKey, partitionBy, online, onlineTypes);
            parsed.add(feature);
        }
        return parsed;
    }

    /**
     * Returns the default primary key of a dataframe: a single-element list holding the name of
     * its first column.
     *
     * @param featuregroupDf the dataframe to derive the key from
     * @return a mutable list containing the first column name
     */
    public static List<String> getDefaultPrimaryKey(Dataset<Row> featuregroupDf) {
        Tuple2<String, String> firstColumn = featuregroupDf.dtypes()[0];
        ArrayList<String> defaultKey = new ArrayList<String>();
        defaultKey.add((String)firstColumn._1);
        return defaultKey;
    }

    /**
     * Verifies that every primary key column exists in the dataframe (case-insensitively).
     *
     * @param featuregroupDf the dataframe whose columns are checked
     * @param primaryKey the primary key column names to validate
     * @return {@code true} when every primary key column exists
     * @throws InvalidPrimaryKeyForFeaturegroup if a primary key column is not among the dataframe
     *         columns; the message lists the available (lower-cased) column names
     */
    public static Boolean validatePrimaryKey(Dataset<Row> featuregroupDf, List<String> primaryKey) throws InvalidPrimaryKeyForFeaturegroup {
        Tuple2<String, String>[] dtypes = featuregroupDf.dtypes();
        ArrayList<String> columns = new ArrayList<String>(dtypes.length);
        for (Tuple2<String, String> dtype : dtypes) {
            columns.add(((String)dtype._1).toLowerCase());
        }
        for (String pk : primaryKey) {
            if (columns.contains(pk.toLowerCase())) continue;
            throw new InvalidPrimaryKeyForFeaturegroup("Invalid primary Key: " + pk + ", the specified primary key does not exist among the available columns: " + String.join(", ", columns));
        }
        return true;
    }

    /**
     * Validates the name, column names and description of a feature group or training dataset.
     *
     * <p>The entity name may be at most 256 characters and each column name at most 767
     * characters; both must match {@code featurestoreRegex}. At least one column is required and
     * the description may be at most 2000 characters long.
     *
     * @param name name of the feature group or training dataset
     * @param dtypes (column name, type) pairs of the dataframe
     * @param description description of the feature group or training dataset
     * @param featurestoreClientSettingsDTO settings whose regex is quoted in error messages
     * @throws IllegalArgumentException if any of the checks fails
     */
    public static void validateMetadata(String name, Tuple2<String, String>[] dtypes, String description, FeaturestoreClientSettingsDTO featurestoreClientSettingsDTO) {
        boolean nameAccepted = name.length() <= 256 && featurestoreRegex.matcher(name).matches();
        if (!nameAccepted) {
            throw new IllegalArgumentException("Name of feature group/training dataset cannot be empty, cannot contain upper case characters, cannot exceed 256 characters, cannot contain hyphens ('-') and must match the regular expression: " + featurestoreClientSettingsDTO.getFeaturestoreRegex() + ", the provided name: " + name + " is not valid");
        }
        if (dtypes.length == 0) {
            throw new IllegalArgumentException("Cannot create a feature group from an empty spark dataframe");
        }
        for (Tuple2<String, String> dtype : dtypes) {
            String columnName = (String)dtype._1;
            if (columnName.length() > 767 || !featurestoreRegex.matcher((CharSequence)columnName).matches()) {
                throw new IllegalArgumentException("Name of feature column cannot be empty, cannot exceed 767 characters, cannot contains hyphens ('-'), and must match the regular expression: " + featurestoreClientSettingsDTO.getFeaturestoreRegex() + ", the provided feature name: " + columnName + " is not valid");
            }
        }
        int descriptionLength = description.length();
        if (descriptionLength > 2000) {
            throw new IllegalArgumentException("Feature group/Training dataset description should not exceed the maximum length of 2000 characters, the provided description has length:" + descriptionLength);
        }
    }

    /**
     * Serializes each feature DTO with {@code featureMarshaller} and collects the resulting JSON
     * objects into an array, preserving order.
     *
     * @param featureDTOS the features to serialize
     * @return a JSON array with one entry per feature
     * @throws JAXBException if marshalling a feature fails
     */
    public static JSONArray convertFeatureDTOsToJsonObjects(List<FeatureDTO> featureDTOS) throws JAXBException {
        JSONArray serialized = new JSONArray();
        for (FeatureDTO feature : featureDTOS) {
            JSONObject featureJson = FeaturestoreHelper.dtoToJson(featureMarshaller, feature);
            serialized.put((Object)featureJson);
        }
        return serialized;
    }

    /**
     * Marshals an object to a string with the given marshaller and parses that string as a JSON
     * object.
     *
     * @param marshaller the marshaller used to serialize the object
     * @param object the object to serialize, may be null
     * @return the JSON representation, or {@code null} if {@code object} is null
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject dtoToJson(Marshaller marshaller, Object object) throws JAXBException {
        if (object != null) {
            StringWriter buffer = new StringWriter();
            marshaller.marshal(object, (Writer)buffer);
            String serialized = buffer.toString();
            return new JSONObject(serialized);
        }
        return null;
    }

    /**
     * Creates an unmarshaller from the context and sets the properties
     * {@code eclipselink.json.include-root} to false and {@code eclipselink.media-type} to
     * {@code application/json}.
     *
     * @param jaxbContext the context to create the unmarshaller from
     * @return the configured unmarshaller
     * @throws JAXBException if the unmarshaller cannot be created or configured
     */
    public static Unmarshaller getUnmarshaller(JAXBContext jaxbContext) throws JAXBException {
        Unmarshaller jsonUnmarshaller = jaxbContext.createUnmarshaller();
        Object includeRoot = Boolean.FALSE;
        Object mediaType = "application/json";
        jsonUnmarshaller.setProperty("eclipselink.json.include-root", includeRoot);
        jsonUnmarshaller.setProperty("eclipselink.media-type", mediaType);
        return jsonUnmarshaller;
    }

    /**
     * Serializes a feature group DTO to JSON using {@code featuregroupMarshaller}.
     *
     * @param featuregroupDTO the DTO to serialize; a null DTO yields null (see {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertFeaturegroupDTOToJsonObject(FeaturegroupDTO featuregroupDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(featuregroupMarshaller, featuregroupDTO);
    }

    /**
     * Serializes a training dataset DTO to JSON using {@code trainingDatasetMarshaller}.
     *
     * @param trainingDatasetDTO the DTO to serialize; a null DTO yields null (see {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertTrainingDatasetDTOToJsonObject(TrainingDatasetDTO trainingDatasetDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(trainingDatasetMarshaller, trainingDatasetDTO);
    }

    /**
     * Serializes a feature correlation matrix DTO to JSON using {@code featureCorrelationMarshaller}.
     *
     * @param featureCorrelationMatrixDTO the DTO to serialize; a null DTO yields null (see
     *        {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertFeatureCorrelationMatrixDTOToJsonObject(FeatureCorrelationMatrixDTO featureCorrelationMatrixDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(featureCorrelationMarshaller, featureCorrelationMatrixDTO);
    }

    /**
     * Serializes a descriptive statistics DTO to JSON using {@code descriptiveStatsMarshaller}.
     *
     * @param descriptiveStatsDTO the DTO to serialize; a null DTO yields null (see {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertDescriptiveStatsDTOToJsonObject(DescriptiveStatsDTO descriptiveStatsDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(descriptiveStatsMarshaller, descriptiveStatsDTO);
    }

    /**
     * Serializes a feature distributions DTO to JSON using {@code featureHistogramsMarshaller}.
     *
     * @param featureDistributionsDTO the DTO to serialize; a null DTO yields null (see
     *        {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertFeatureDistributionsDTOToJsonObject(FeatureDistributionsDTO featureDistributionsDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(featureHistogramsMarshaller, featureDistributionsDTO);
    }

    /**
     * Serializes a cluster analysis DTO to JSON using {@code clusteranalysisMarshaller}.
     *
     * @param clusterAnalysisDTO the DTO to serialize; a null DTO yields null (see {@code dtoToJson})
     * @return the JSON representation of the DTO
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertClusterAnalysisDTOToJsonObject(ClusterAnalysisDTO clusterAnalysisDTO) throws JAXBException {
        return FeaturestoreHelper.dtoToJson(clusteranalysisMarshaller, clusterAnalysisDTO);
    }

    /**
     * Unmarshals a JSON object into a {@link FeaturestoreJdbcConnectorDTO} using an unmarshaller
     * created from {@code jdbcConnectorJAXBContext}.
     *
     * @param jdbcConnectorJson the JSON to parse
     * @return the parsed connector DTO
     * @throws JAXBException if the unmarshaller cannot be created or unmarshalling fails
     */
    public static FeaturestoreJdbcConnectorDTO parseJdbcConnectorJson(JSONObject jdbcConnectorJson) throws JAXBException {
        Unmarshaller jsonUnmarshaller = FeaturestoreHelper.getUnmarshaller(jdbcConnectorJAXBContext);
        StringReader jsonReader = new StringReader(jdbcConnectorJson.toString());
        StreamSource jsonSource = new StreamSource(jsonReader);
        return (FeaturestoreJdbcConnectorDTO)jsonUnmarshaller.unmarshal((Source)jsonSource, FeaturestoreJdbcConnectorDTO.class).getValue();
    }

    /**
     * Unmarshals a JSON object into a {@link FeaturestoreMetadataDTO} using an unmarshaller
     * created from {@code featurestoreMetadataJAXBContext}.
     *
     * @param featurestoreMetadata the JSON to parse
     * @return the parsed metadata DTO
     * @throws JAXBException if the unmarshaller cannot be created or unmarshalling fails
     */
    public static FeaturestoreMetadataDTO parseFeaturestoreMetadataJson(JSONObject featurestoreMetadata) throws JAXBException {
        Unmarshaller jsonUnmarshaller = FeaturestoreHelper.getUnmarshaller(featurestoreMetadataJAXBContext);
        StringReader jsonReader = new StringReader(featurestoreMetadata.toString());
        StreamSource jsonSource = new StreamSource(jsonReader);
        return (FeaturestoreMetadataDTO)jsonUnmarshaller.unmarshal((Source)jsonSource, FeaturestoreMetadataDTO.class).getValue();
    }

    /**
     * Unmarshals a JSON object into a {@link TrainingDatasetDTO} using an unmarshaller created
     * from {@code trainingDatasetJAXBContext}.
     *
     * @param trainingDatasetJson the JSON to parse
     * @return the parsed training dataset DTO
     * @throws JAXBException if the unmarshaller cannot be created or unmarshalling fails
     */
    public static TrainingDatasetDTO parseTrainingDatasetJson(JSONObject trainingDatasetJson) throws JAXBException {
        Unmarshaller jsonUnmarshaller = FeaturestoreHelper.getUnmarshaller(trainingDatasetJAXBContext);
        StringReader jsonReader = new StringReader(trainingDatasetJson.toString());
        StreamSource jsonSource = new StreamSource(jsonReader);
        return (TrainingDatasetDTO)jsonUnmarshaller.unmarshal((Source)jsonSource, TrainingDatasetDTO.class).getValue();
    }

    /**
     * Projects a dataframe onto its numeric columns, i.e. the fields whose data type is a
     * {@code NumericType}, keeping their schema order.
     *
     * @param sparkDf the dataframe to filter
     * @return a dataframe containing only the numeric columns
     */
    private static Dataset<Row> filterSparkDfNumeric(Dataset<Row> sparkDf) {
        Column[] numericColumns = Arrays.stream(sparkDf.schema().fields())
            .filter(field -> field.dataType() instanceof NumericType)
            .map(field -> functions.col((String)field.name()))
            .toArray(Column[]::new);
        return sparkDf.select(numericColumns);
    }

    /**
     * Computes descriptive statistics for a dataframe from the output of {@code describe()}.
     *
     * <p>Each row of the {@code describe()} result is read as JSON; its {@code summary} entry
     * names the metric and every other key is a column name. For each column the metric values
     * are parsed as floats; values that cannot be parsed, or that are NaN or infinite, are
     * recorded as null.
     *
     * @param sparkDf the dataframe to describe
     * @return the per-feature metric values
     */
    private static DescriptiveStatsDTO computeDescriptiveStatistics(Dataset<Row> sparkDf) {
        String[] rawStatsArray = (String[])sparkDf.describe(new String[0]).toJSON().collect();
        List<JSONObject> rawStatsObjects = new ArrayList<JSONObject>(rawStatsArray.length);
        Set<String> columnNames = new HashSet<String>();
        for (String rawStat : rawStatsArray) {
            JSONObject rawStatObj = new JSONObject(rawStat);
            rawStatsObjects.add(rawStatObj);
            columnNames.addAll(rawStatObj.keySet());
        }
        ArrayList<DescriptiveStatsMetricValuesDTO> descriptiveStatsMetricValuesDTOList = new ArrayList<DescriptiveStatsMetricValuesDTO>();
        for (String colName : columnNames) {
            // "summary" holds the metric name, it is not a feature column.
            if (colName.equals("summary")) continue;
            ArrayList<DescriptiveStatsMetricValueDTO> descriptiveStatsMetricValueDTOList = new ArrayList<DescriptiveStatsMetricValueDTO>();
            for (JSONObject rawStatObj : rawStatsObjects) {
                if (!rawStatObj.has(colName)) continue;
                Float value;
                try {
                    value = Float.valueOf(Float.parseFloat(rawStatObj.getString(colName)));
                    if (value.isNaN() || value.isInfinite()) {
                        value = null;
                    }
                }
                catch (NullPointerException | NumberFormatException e) {
                    // Non-numeric metric value: record it as missing.
                    value = null;
                }
                DescriptiveStatsMetricValueDTO descriptiveStatsMetricValueDTO = new DescriptiveStatsMetricValueDTO();
                descriptiveStatsMetricValueDTO.setValue(value);
                descriptiveStatsMetricValueDTO.setMetricName(rawStatObj.getString("summary"));
                descriptiveStatsMetricValueDTOList.add(descriptiveStatsMetricValueDTO);
            }
            DescriptiveStatsMetricValuesDTO descriptiveStatsMetricValuesDTO = new DescriptiveStatsMetricValuesDTO();
            descriptiveStatsMetricValuesDTO.setFeatureName(colName);
            descriptiveStatsMetricValuesDTO.setMetricValues(descriptiveStatsMetricValueDTOList);
            descriptiveStatsMetricValuesDTOList.add(descriptiveStatsMetricValuesDTO);
        }
        DescriptiveStatsDTO descriptiveStatsDTO = new DescriptiveStatsDTO();
        descriptiveStatsDTO.setDescriptiveStats(descriptiveStatsMetricValuesDTOList);
        return descriptiveStatsDTO;
    }

    /**
     * Runs a k-means cluster analysis on the dataframe and reduces a sample of the clustered
     * points to two dimensions with PCA.
     *
     * <p>The columns are assembled into a vector column, clustered with k-means (seed 1, at most
     * 20 iterations), and the clustered rows are sampled with replacement using the fraction
     * {@code 50 / count} when there are 50 rows or more. Each remaining row becomes a datapoint
     * with its two PCA dimensions and a cluster assignment; NaN or infinite dimensions are
     * replaced by 0.
     *
     * @param sparkDf the dataframe to analyse
     * @param clusters the number of k-means clusters
     * @return the cluster assignments and the two-dimensional datapoints
     */
    private static ClusterAnalysisDTO computeClusterAnalysis(Dataset<Row> sparkDf, int clusters) {
        final String inputCol = "featurestore_feature_clustering_analysis_input_col";
        final String outputCol = "featurestore_feature_clustering_analysis_output_col";
        final String pcaCol = "featurestore_feature_clustering_analysis_pca_col";

        Dataset<Row> sparkDf1 = FeaturestoreHelper.assembleColumnsIntoVector(sparkDf, inputCol);
        KMeans kmeans = new KMeans();
        kmeans.setK(clusters);
        kmeans.setSeed(1L);
        kmeans.setMaxIter(20);
        kmeans.setFeaturesCol(inputCol);
        kmeans.setPredictionCol(outputCol);
        KMeansModel kMeansModel = kmeans.fit(sparkDf1.select(inputCol, new String[0]));
        Dataset<Row> sparkDf2 = kMeansModel.transform(sparkDf1);
        Column[] cols = new Column[]{functions.col((String)inputCol), functions.col((String)outputCol)};
        Dataset<Row> sparkDf3 = sparkDf2.select(cols);
        Long count = sparkDf3.count();
        // Keep small datasets as they are, otherwise down-sample to roughly 50 rows.
        Dataset<Row> sparkDf4 = count < 50L ? sparkDf3 : sparkDf3.sample(true, (double)(50.0f / (float)count.longValue()));
        PCA pca = new PCA();
        pca.setK(2);
        pca.setInputCol(inputCol);
        pca.setOutputCol(pcaCol);
        PCAModel pcaModel = pca.fit(sparkDf4);
        cols[0] = functions.col((String)pcaCol);
        cols[1] = functions.col((String)outputCol);
        Dataset<Row> sparkDf5 = pcaModel.transform(sparkDf4).select(cols);
        Dataset<Row> sparkDf6 = sparkDf5.withColumnRenamed(pcaCol, "features");
        Dataset<Row> sparkDf7 = sparkDf6.withColumnRenamed(outputCol, "clusters");
        String[] jsonStrResult = (String[])sparkDf7.toJSON().collect();
        ClusterAnalysisDTO clusterAnalysisDTO = new ClusterAnalysisDTO();
        ArrayList<ClusterDTO> clusterDTOs = new ArrayList<ClusterDTO>();
        ArrayList<DatapointDTO> datapointDTOs = new ArrayList<DatapointDTO>();
        for (int i = 0; i < jsonStrResult.length; ++i) {
            JSONObject jsonObj = new JSONObject(jsonStrResult[i]);
            JSONObject featuresObj = jsonObj.getJSONObject("features");
            int cluster = jsonObj.getInt("clusters");
            JSONArray dimensions = featuresObj.getJSONArray("values");
            DatapointDTO datapointDTO = new DatapointDTO();
            datapointDTO.setDatapointName(Integer.toString(i));
            try {
                Float firstDimension = Float.valueOf((float)dimensions.getDouble(0));
                Float secondDimension = Float.valueOf((float)dimensions.getDouble(1));
                if (firstDimension.isInfinite() || firstDimension.isNaN()) {
                    firstDimension = Float.valueOf(0.0f);
                }
                // Both dimensions get the same sanitisation: non-finite values become 0.
                if (secondDimension.isInfinite() || secondDimension.isNaN()) {
                    secondDimension = Float.valueOf(0.0f);
                }
                datapointDTO.setFirstDimension(firstDimension);
                datapointDTO.setSecondDimension(secondDimension);
            }
            catch (ClassCastException e) {
                datapointDTO.setFirstDimension(Float.valueOf(0.0f));
                datapointDTO.setSecondDimension(Float.valueOf(0.0f));
            }
            ClusterDTO clusterDTO = new ClusterDTO();
            clusterDTO.setCluster(cluster);
            clusterDTO.setDatapointName(Integer.toString(i));
            datapointDTOs.add(datapointDTO);
            clusterDTOs.add(clusterDTO);
        }
        clusterAnalysisDTO.setClusters(clusterDTOs);
        clusterAnalysisDTO.setDataPoints(datapointDTOs);
        return clusterAnalysisDTO;
    }

    /**
     * Computes the pairwise correlation matrix of the dataframe's columns.
     *
     * <p>The dataframe must have between 2 and 50 columns. The columns are assembled into a vector
     * column and passed to {@code Correlation.corr}; NaN or infinite correlations are reported
     * as 0.
     *
     * @param sparkDf the dataframe to correlate
     * @param correlationMethod the correlation method name passed to {@code Correlation.corr}
     * @return for every feature, its correlation with every feature
     * @throws IllegalArgumentException if the dataframe has fewer than 2 or more than 50 columns
     */
    private static FeatureCorrelationMatrixDTO computeCorrMatrix(Dataset<Row> sparkDf, String correlationMethod) {
        int numberOfColumns = sparkDf.dtypes().length;
        if (numberOfColumns < 2) {
            String reason = numberOfColumns == 0
                ? "The provided spark dataframe does not contain any numeric columns.\nCannot compute feature correlation on categorical columns."
                : "The provided spark dataframe only contains one numeric column.\nCannot compute feature correlation on just one column.";
            throw new IllegalArgumentException(reason + " \n The numeric datatypes are:" + StringUtils.join(Constants.NUMERIC_SPARK_TYPES, (String)",") + " \n The number of numeric datatypes in the provided dataframe is: " + numberOfColumns + "(" + Arrays.toString(sparkDf.dtypes()) + ")");
        }
        if (numberOfColumns > 50) {
            throw new IllegalArgumentException("The provided spark dataframe have " + numberOfColumns + " columns, which exceeds the maximum number of columns: " + 50 + ". This is due to scalability reasons (number of correlatons grows quadratically with the number of columns");
        }
        Dataset<Row> assembled = FeaturestoreHelper.assembleColumnsIntoVector(sparkDf, "featurestore_feature_correlation_analysis_input_col");
        Row matrixRow = (Row)Correlation.corr(assembled, (String)"featurestore_feature_correlation_analysis_input_col", (String)correlationMethod).head();
        DenseMatrix matrix = (DenseMatrix)matrixRow.get(0);
        int matrixColumns = matrix.numCols();
        int matrixRows = matrix.numRows();
        StructField[] schemaFields = assembled.schema().fields();

        ArrayList<FeatureCorrelationDTO> perFeature = new ArrayList<FeatureCorrelationDTO>();
        for (int outer = 0; outer < matrixColumns; ++outer) {
            FeatureCorrelationDTO featureCorrelation = new FeatureCorrelationDTO();
            featureCorrelation.setFeatureName(schemaFields[outer].name());
            ArrayList<CorrelationValueDTO> correlations = new ArrayList<CorrelationValueDTO>();
            for (int inner = 0; inner < matrixRows; ++inner) {
                CorrelationValueDTO correlationValue = new CorrelationValueDTO();
                correlationValue.setFeatureName(schemaFields[inner].name());
                try {
                    float raw = (float)matrix.apply(outer, inner);
                    boolean finite = !Float.isNaN(raw) && !Float.isInfinite(raw);
                    correlationValue.setCorrelation(Float.valueOf(finite ? raw : 0.0f));
                }
                catch (ClassCastException e) {
                    correlationValue.setCorrelation(Float.valueOf(0.0f));
                }
                correlations.add(correlationValue);
            }
            featureCorrelation.setCorrelationValues(correlations);
            perFeature.add(featureCorrelation);
        }
        FeatureCorrelationMatrixDTO correlationMatrixDTO = new FeatureCorrelationMatrixDTO();
        correlationMatrixDTO.setFeatureCorrelations(perFeature);
        return correlationMatrixDTO;
    }

    /**
     * Appends a vector column that combines every column of the dataframe.
     *
     * @param sparkDf dataframe whose columns are all used as assembler inputs
     * @param colName name of the output vector column
     * @return the dataframe returned by {@code VectorAssembler.transform}
     */
    private static Dataset<Row> assembleColumnsIntoVector(Dataset<Row> sparkDf, String colName) {
        // dtypes() is evaluated once; the original re-evaluated it twice per loop iteration.
        Tuple2<String, String>[] columnTypes = sparkDf.dtypes();
        String[] inputColumns = new String[columnTypes.length];
        for (int i = 0; i < columnTypes.length; ++i) {
            inputColumns[i] = columnTypes[i]._1;
        }
        VectorAssembler vectorAssembler = new VectorAssembler();
        vectorAssembler.setInputCols(inputColumns);
        vectorAssembler.setOutputCol(colName);
        return vectorAssembler.transform(sparkDf);
    }

    /**
     * Builds a histogram with {@code numBins} buckets for every column of the dataframe.
     *
     * <p>Supported column types are integer, decimal, double, float, long and short. Each
     * bucket is labelled with the bucket edge at index {@code j + 1} of the edges array
     * returned by {@code histogram}.
     *
     * @param sparkDf dataframe to analyze
     * @param numBins number of buckets passed to {@code histogram}
     * @return DTO with one frequency distribution per column
     * @throws SparkDataTypeNotRecognizedError if a column has none of the supported types
     */
    private static FeatureDistributionsDTO computeFeatureHistograms(Dataset<Row> sparkDf, int numBins) throws SparkDataTypeNotRecognizedError {
        ArrayList<FeatureDistributionDTO> distributions = new ArrayList<FeatureDistributionDTO>();
        StructField[] schemaFields = sparkDf.schema().fields();
        for (int col = 0; col < schemaFields.length; ++col) {
            StructField field = schemaFields[col];
            String columnName = field.name();
            Tuple2 histogram;
            if (field.dataType() instanceof IntegerType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getInt(0)).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else if (field.dataType() instanceof DecimalType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getDecimal(0).doubleValue()).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else if (field.dataType() instanceof DoubleType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getDouble(0)).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else if (field.dataType() instanceof FloatType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getFloat(0)).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else if (field.dataType() instanceof LongType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getLong(0)).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else if (field.dataType() instanceof ShortType) {
                histogram = sparkDf.select(columnName, new String[0]).toJavaRDD().map((Function & Serializable)x -> x.getShort(0)).mapToDouble((DoubleFunction & Serializable)x -> x).histogram(numBins);
            } else {
                throw new SparkDataTypeNotRecognizedError("Could not parse the spark datatypes to compute feature histograms");
            }

            double[] edges = (double[])histogram._1;
            long[] counts = (long[])histogram._2;
            ArrayList<HistogramBinDTO> binDtos = new ArrayList<HistogramBinDTO>();
            for (int bucket = 0; bucket < counts.length; ++bucket) {
                HistogramBinDTO binDto = new HistogramBinDTO();
                binDto.setFrequency((int)counts[bucket]);
                // The label is the edge following the bucket's start index.
                binDto.setBin(Double.toString(edges[bucket + 1]));
                binDtos.add(binDto);
            }

            FeatureDistributionDTO distribution = new FeatureDistributionDTO();
            distribution.setFeatureName(columnName);
            distribution.setFrequencyDistribution(binDtos);
            distributions.add(distribution);
        }
        FeatureDistributionsDTO result = new FeatureDistributionsDTO();
        result.setFeatureDistributions(distributions);
        return result;
    }

    /**
     * Computes the requested statistics for a dataframe (or, when no dataframe is given, for
     * the cached feature group identified by name, featurestore and version).
     *
     * <p>Each statistics step is best-effort: a failure is logged as a warning (including the
     * stack trace) and the corresponding part of the result stays {@code null}. The Spark job
     * group is reset after every step, whether the step succeeded or failed.
     *
     * @param name name used in log messages and job descriptions, and to load the feature group
     *             when {@code sparkDf} is null
     * @param sparkSession spark session used for job groups and for loading the feature group
     * @param sparkDf dataframe to analyze; when null the cached feature group is loaded instead
     * @param featurestore featurestore used when loading the feature group
     * @param version feature group version used when loading the feature group
     * @param descriptiveStatistics whether to compute descriptive statistics
     * @param featureCorrelation whether to compute the feature correlation matrix
     * @param featureHistograms whether to compute feature histograms
     * @param clusterAnalysis whether to compute the cluster analysis
     * @param statColumns when non-null and non-empty, only these columns are analyzed
     * @param numBins number of histogram bins
     * @param numClusters number of clusters for the cluster analysis
     * @param correlationMethod correlation method for the correlation matrix
     * @return the computed statistics; parts that were skipped or failed are null
     * @throws DataframeIsEmpty if the (possibly column-restricted) dataframe has no rows
     */
    public static StatisticsDTO computeDataFrameStats(String name, SparkSession sparkSession, Dataset<Row> sparkDf, String featurestore, int version, Boolean descriptiveStatistics, Boolean featureCorrelation, Boolean featureHistograms, Boolean clusterAnalysis, List<String> statColumns, int numBins, int numClusters, String correlationMethod) throws DataframeIsEmpty, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestorePasswordNotFound, OnlineFeaturestoreNotEnabled, FeaturegroupDoesNotExistError {
        if (sparkDf == null) {
            sparkDf = FeaturestoreHelper.getCachedFeaturegroup(sparkSession, name, featurestore, version, false);
        }
        if (statColumns != null && !statColumns.isEmpty()) {
            List<Column> statSparkColumns = statColumns.stream().map(sc -> functions.col((String)sc)).collect(Collectors.toList());
            Column[] sparkColumnsArr = new Column[statSparkColumns.size()];
            statSparkColumns.toArray(sparkColumnsArr);
            sparkDf = sparkDf.select(sparkColumnsArr);
        }
        if (sparkDf.rdd().isEmpty()) {
            throw new DataframeIsEmpty("The provided dataframe is empty, cannot compute feature statistics on an empty dataframe");
        }
        ClusterAnalysisDTO clusterAnalysisDTO = null;
        DescriptiveStatsDTO descriptiveStatsDTO = null;
        FeatureCorrelationMatrixDTO featureCorrelationMatrixDTO = null;
        FeatureDistributionsDTO featureDistributionsDTO = null;
        if (descriptiveStatistics.booleanValue()) {
            try {
                LOG.log(Level.INFO, "computing descriptive statistics for: " + name);
                sparkSession.sparkContext().setJobGroup("Descriptive Statistics Computation", "Analyzing Dataframe Statistics for : " + name, true);
                // NOTE(review): the numeric filter result is not used by this step (the unfiltered
                // dataframe is analyzed); the call is kept only to preserve existing behavior.
                FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                descriptiveStatsDTO = FeaturestoreHelper.computeDescriptiveStatistics(sparkDf);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute descriptive statistics for: " + name + ", set the optional argument descriptive_statistics=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                FeaturestoreHelper.resetSparkJobGroupQuietly(sparkSession);
            }
        }
        if (featureCorrelation.booleanValue()) {
            try {
                LOG.log(Level.INFO, "computing feature correlation for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Correlation Computation", "Analyzing Feature Correlations for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                featureCorrelationMatrixDTO = FeaturestoreHelper.computeCorrMatrix(numericSparkDf, correlationMethod);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute feature correlation for: " + name + ", set the optional argument feature_correlation=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                FeaturestoreHelper.resetSparkJobGroupQuietly(sparkSession);
            }
        }
        if (featureHistograms.booleanValue()) {
            try {
                LOG.log(Level.INFO, "computing feature histograms for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Histogram Computation", "Analyzing Feature Distributions for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                featureDistributionsDTO = FeaturestoreHelper.computeFeatureHistograms(numericSparkDf, numBins);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute feature histograms for: " + name + ", set the optional argument feature_histograms=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                FeaturestoreHelper.resetSparkJobGroupQuietly(sparkSession);
            }
        }
        if (clusterAnalysis.booleanValue()) {
            try {
                LOG.log(Level.INFO, "computing cluster analysis for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Cluster Analysis", "Analyzing Feature Clusters for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                clusterAnalysisDTO = FeaturestoreHelper.computeClusterAnalysis(numericSparkDf, numClusters);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute cluster analysis for: " + name + ", set the optional argument cluster_analysis=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                FeaturestoreHelper.resetSparkJobGroupQuietly(sparkSession);
            }
        }
        return new StatisticsDTO(descriptiveStatsDTO, clusterAnalysisDTO, featureCorrelationMatrixDTO, featureDistributionsDTO);
    }

    /**
     * Clears the Spark job group after a statistics step. A null session is ignored so that
     * cleanup never turns a logged, best-effort failure into a thrown exception.
     */
    private static void resetSparkJobGroupQuietly(SparkSession sparkSession) {
        if (sparkSession != null) {
            sparkSession.sparkContext().setJobGroup("", "", true);
        }
    }

    /**
     * Writes a dataframe to the given path in the requested training dataset format.
     *
     * <p>The Spark job group is set for the duration of the write and is always reset
     * afterwards, including when the write fails or the format is rejected.
     *
     * @param sparkSession spark session whose context carries the job group
     * @param sparkDf dataframe to write
     * @param path destination path
     * @param dataFormat one of csv, tsv, parquet, avro, orc, tfrecords
     * @param writeMode save mode passed to the dataframe writer
     * @throws CannotWriteImageDataFrameException if {@code dataFormat} is "image"
     * @throws TrainingDatasetFormatNotSupportedError if {@code dataFormat} is not recognized
     */
    public static void writeTrainingDataset(SparkSession sparkSession, Dataset<Row> sparkDf, String path, String dataFormat, String writeMode) throws TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException {
        sparkSession.sparkContext().setJobGroup("Materializing dataframe as training dataset", "Saving training dataset in path: " + path + ", in format: " + dataFormat, true);
        try {
            switch (dataFormat) {
                case "csv": {
                    sparkDf.write().option("delimiter", ",").mode(writeMode).option("header", "true").csv(path);
                    break;
                }
                case "tsv": {
                    sparkDf.write().option("delimiter", "\t").mode(writeMode).option("header", "true").csv(path);
                    break;
                }
                case "parquet": {
                    sparkDf.write().format(dataFormat).mode(writeMode).parquet(path);
                    break;
                }
                case "avro":
                case "orc": {
                    sparkDf.write().format(dataFormat).mode(writeMode).save(path);
                    break;
                }
                case "image": {
                    throw new CannotWriteImageDataFrameException("Image Dataframes can only be read, not written");
                }
                case "tfrecords": {
                    sparkDf.write().format(dataFormat).option("recordType", "Example").mode(writeMode).save(path);
                    break;
                }
                default: {
                    throw new TrainingDatasetFormatNotSupportedError("The provided data format: " + dataFormat + " is not supported in the Java/Scala API, the supported data formats are: " + "csv,tsv,tfrecords,parquet,avro,orc,image,");
                }
            }
        }
        finally {
            // Reset on every exit path so a failed write does not leave later jobs mislabelled.
            sparkSession.sparkContext().setJobGroup("", "", true);
        }
    }

    /**
     * Finds the training dataset with the given name and version.
     *
     * @param trainingDatasetDTOList training datasets to search
     * @param trainingDatasetName exact name to match
     * @param trainingDatasetVersion version to match
     * @return the first matching training dataset
     * @throws TrainingDatasetDoesNotExistError if nothing matches; the message lists all available names
     */
    public static TrainingDatasetDTO findTrainingDataset(List<TrainingDatasetDTO> trainingDatasetDTOList, String trainingDatasetName, int trainingDatasetVersion) throws TrainingDatasetDoesNotExistError {
        List<TrainingDatasetDTO> candidates = trainingDatasetDTOList.stream()
            .filter(dto -> dto.getName().equals(trainingDatasetName) && dto.getVersion() == trainingDatasetVersion)
            .collect(Collectors.toList());
        if (!candidates.isEmpty()) {
            return candidates.get(0);
        }
        List<String> availableNames = trainingDatasetDTOList.stream()
            .map(TrainingDatasetDTO::getName)
            .collect(Collectors.toList());
        throw new TrainingDatasetDoesNotExistError("Could not find the requested training dataset with name: " + trainingDatasetName + " , and version: " + trainingDatasetVersion + " , among the list of available training datasets in the featurestore: " + StringUtils.join(availableNames, (String)","));
    }

    /**
     * Finds the storage connector with the given name.
     *
     * @param storageConnectorsList storage connectors to search
     * @param storageConnectorName exact name to match
     * @return the first matching storage connector
     * @throws StorageConnectorDoesNotExistError if nothing matches; the message lists all available names
     */
    public static FeaturestoreStorageConnectorDTO findStorageConnector(List<FeaturestoreStorageConnectorDTO> storageConnectorsList, String storageConnectorName) throws StorageConnectorDoesNotExistError {
        List<FeaturestoreStorageConnectorDTO> candidates = storageConnectorsList.stream()
            .filter(connector -> connector.getName().equals(storageConnectorName))
            .collect(Collectors.toList());
        if (!candidates.isEmpty()) {
            return candidates.get(0);
        }
        List<String> availableNames = storageConnectorsList.stream()
            .map(FeaturestoreStorageConnectorDTO::getName)
            .collect(Collectors.toList());
        throw new StorageConnectorDoesNotExistError("Could not find the requested storage connector with name: " + storageConnectorName + ", among the list of available storage connectors in the featurestore: " + StringUtils.join(availableNames, (String)","));
    }

    /**
     * Finds the feature group with the given name and version.
     *
     * @param featuregroupDTOList feature groups to search
     * @param featuregroupName exact name to match
     * @param featuregroupVersion version to match
     * @return the first matching feature group
     * @throws FeaturegroupDoesNotExistError if nothing matches; the message lists all available names
     */
    public static FeaturegroupDTO findFeaturegroup(List<FeaturegroupDTO> featuregroupDTOList, String featuregroupName, int featuregroupVersion) throws FeaturegroupDoesNotExistError {
        List<FeaturegroupDTO> candidates = featuregroupDTOList.stream()
            .filter(dto -> dto.getName().equals(featuregroupName) && dto.getVersion() == featuregroupVersion)
            .collect(Collectors.toList());
        if (!candidates.isEmpty()) {
            return candidates.get(0);
        }
        List<String> availableNames = featuregroupDTOList.stream()
            .map(FeaturegroupDTO::getName)
            .collect(Collectors.toList());
        throw new FeaturegroupDoesNotExistError("Could not find the requested feature group with name: " + featuregroupName + " , and version: " + featuregroupVersion + " , among the list of available feature groups in the featurestore: " + StringUtils.join(availableNames, (String)","));
    }

    /**
     * Writes the TFRecord schema JSON to the given path, overwriting any existing file.
     *
     * <p>The JSON is encoded as UTF-8. ({@code DataOutputStream.writeBytes} keeps only the low
     * eight bits of each character and would corrupt non-ASCII content; ASCII output is
     * unchanged.) Failures while creating or writing the file are logged and not propagated.
     *
     * @param hdfsPath destination path of the schema file
     * @param tfRecordSchemaJson JSON text to write
     * @throws IOException if the file system for the path cannot be obtained
     */
    public static void writeTfRecordSchemaJson(String hdfsPath, String tfRecordSchemaJson) throws IOException {
        Configuration hdfsConf = new Configuration();
        Path filePath = new Path(hdfsPath);
        FileSystem hdfs = filePath.getFileSystem(hdfsConf);
        try (FSDataOutputStream outputStream = hdfs.create(filePath, true);){
            outputStream.write(tfRecordSchemaJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not save tf record schema json to HDFS", e);
        }
    }

    /**
     * Infers a TFRecord schema description from the dataframe's schema.
     *
     * <p>Every column maps to a JSON object with a {@code "feature"} entry ({@code "fixed_len"}
     * for scalar columns, {@code "var_len"} for array columns) and a {@code "type"} entry
     * ({@code "int"}, {@code "float"} or {@code "string"}). For array columns the type is taken
     * from the array's element type. Only the schema is inspected; no rows are read.
     *
     * @param sparkDf dataframe whose schema is inspected
     * @return JSON object keyed by column name
     * @throws InferTFRecordSchemaError if a column (or array element) type is not one of
     *         IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, BinaryType,
     *         or if an array column contains nested arrays
     */
    public static JSONObject getDataframeTfRecordSchemaJson(Dataset<Row> sparkDf) throws InferTFRecordSchemaError {
        JSONObject tfRecordJsonSchema = new JSONObject();
        for (StructField field : sparkDf.schema().fields()) {
            org.apache.spark.sql.types.DataType dataType = field.dataType();
            String featureKind;
            String typeName;
            if (dataType instanceof ArrayType) {
                // Classify by the element type; the column type itself is always ArrayType here.
                org.apache.spark.sql.types.DataType elementType = ((ArrayType)dataType).elementType();
                if (elementType instanceof ArrayType) {
                    throw new InferTFRecordSchemaError("Can only infer tf record schema for dataframes with one level of nested arrays, this dataframe has two levels.");
                }
                typeName = FeaturestoreHelper.tfRecordScalarTypeNameOrNull(elementType);
                if (typeName == null) {
                    throw new InferTFRecordSchemaError("Could not infer the tf record schema, an array column has the datatype: " + elementType.toString() + " which is not in the list of recognized types:  IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, and BinaryType");
                }
                featureKind = "var_len";
            } else {
                typeName = FeaturestoreHelper.tfRecordScalarTypeNameOrNull(dataType);
                if (typeName == null) {
                    throw new InferTFRecordSchemaError("Could not infer the tf record schema, a row has the datatype: " + dataType.toString() + " which is not in the list of recognized types:  IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, BinaryType, and ArrayType");
                }
                featureKind = "fixed_len";
            }
            JSONObject featureType = new JSONObject();
            featureType.put("feature", (Object)featureKind);
            featureType.put("type", (Object)typeName);
            tfRecordJsonSchema.put(field.name(), (Object)featureType);
        }
        LOG.log(Level.INFO, "Inferred TFRecordsSchema: " + tfRecordJsonSchema.toString());
        return tfRecordJsonSchema;
    }

    /**
     * Maps a scalar Spark type to its TFRecord schema type name, or returns null when the
     * type is not one of the recognized scalar types.
     */
    private static String tfRecordScalarTypeNameOrNull(org.apache.spark.sql.types.DataType dataType) {
        if (dataType instanceof IntegerType || dataType instanceof LongType) {
            return "int";
        }
        if (dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof DecimalType) {
            return "float";
        }
        if (dataType instanceof StringType || dataType instanceof BinaryType) {
            return "string";
        }
        return null;
    }

    /**
     * Returns the highest version among the feature groups with the given name.
     *
     * @param featuregroupDTOS feature groups to search
     * @param featuregroupName exact name to match
     * @return the highest matching version, or 0 when no feature group has that name
     */
    public static int getLatestFeaturegroupVersion(List<FeaturegroupDTO> featuregroupDTOS, String featuregroupName) {
        return featuregroupDTOS.stream()
            .filter(dto -> dto.getName().equals(featuregroupName))
            .mapToInt(dto -> dto.getVersion())
            .max()
            .orElse(0);
    }

    /**
     * Returns the highest version among the training datasets with the given name.
     *
     * @param trainingDatsetDTOS training datasets to search
     * @param trainingDatasetName exact name to match
     * @return the highest matching version, or 0 when no training dataset has that name
     */
    public static int getLatestTrainingDatasetVersion(List<TrainingDatasetDTO> trainingDatsetDTOS, String trainingDatasetName) {
        return trainingDatsetDTOS.stream()
            .filter(dto -> dto.getName().equals(trainingDatasetName))
            .mapToInt(dto -> dto.getVersion())
            .max()
            .orElse(0);
    }

    /**
     * Returns the featurestore metadata held in the static cache field.
     *
     * @return the cached metadata; callers in this class check it for null before use
     */
    public static FeaturestoreMetadataDTO getFeaturestoreMetadataCache() {
        return FeaturestoreHelper.featurestoreMetadataCache;
    }

    /**
     * Replaces the cached featurestore metadata.
     *
     * <p>If no featurestore regex has been compiled yet, it is compiled from the regex in the
     * new metadata's settings; an already compiled regex is left untouched.
     *
     * @param featurestoreMetadataCache metadata to store in the static cache field
     */
    public static void setFeaturestoreMetadataCache(FeaturestoreMetadataDTO featurestoreMetadataCache) {
        FeaturestoreHelper.featurestoreMetadataCache = featurestoreMetadataCache;
        if (featurestoreRegex != null) {
            return;
        }
        String regex = featurestoreMetadataCache.getSettings().getFeaturestoreRegex();
        featurestoreRegex = Pattern.compile(regex);
    }

    /**
     * Returns the given featurestore name, or the project's featurestore when it is null.
     *
     * @param featurestore featurestore name, may be null
     * @return {@code featurestore} if non-null, otherwise the result of {@code getProjectFeaturestore()}
     */
    public static String featurestoreGetOrDefault(String featurestore) {
        return featurestore != null ? featurestore : FeaturestoreHelper.getProjectFeaturestore();
    }

    /**
     * Returns the given spark session, or the one found by {@code Hops.findSpark()} when it is null.
     *
     * @param sparkSession spark session, may be null
     * @return {@code sparkSession} if non-null, otherwise the result of {@code Hops.findSpark()}
     */
    public static SparkSession sparkGetOrDefault(SparkSession sparkSession) {
        return sparkSession != null ? sparkSession : Hops.findSpark();
    }

    /**
     * Returns the given primary key, or the default primary key of the dataframe when none is
     * supplied.
     *
     * <p>A null list is treated like an empty one, in line with the other
     * {@code *GetOrDefault} helpers, which accept null as "use the default".
     *
     * @param primaryKey primary key columns; null or empty selects the default
     * @param df dataframe from which the default primary key is derived
     * @return {@code primaryKey} if it has entries, otherwise the result of {@code getDefaultPrimaryKey(df)}
     */
    public static List<String> primaryKeyGetOrDefault(List<String> primaryKey, Dataset<Row> df) {
        if (primaryKey == null || primaryKey.isEmpty()) {
            return FeaturestoreHelper.getDefaultPrimaryKey(df);
        }
        return primaryKey;
    }

    /**
     * Returns the given correlation method, or {@code "pearson"} when it is null.
     *
     * @param corrMethod correlation method name, may be null
     * @return {@code corrMethod} if non-null, otherwise {@code "pearson"}
     */
    public static String correlationMethodGetOrDefault(String corrMethod) {
        return corrMethod != null ? corrMethod : "pearson";
    }

    /**
     * Returns the given number of histogram bins, or 20 when it is null.
     *
     * @param numBins number of bins, may be null
     * @return {@code numBins} if non-null, otherwise 20
     */
    public static Integer numBinsGetOrDefault(Integer numBins) {
        // Both branches are boxed so a supplied value is returned without unboxing.
        return numBins != null ? numBins : Integer.valueOf(20);
    }

    /**
     * Returns the given number of clusters, or 5 when it is null.
     *
     * @param numClusters number of clusters, may be null
     * @return {@code numClusters} if non-null, otherwise 5
     */
    public static Integer numClustersGetOrDefault(Integer numClusters) {
        // Both branches are boxed so a supplied value is returned without unboxing.
        return numClusters != null ? numClusters : Integer.valueOf(5);
    }

    /**
     * Returns the given job name, or the name from {@code Hops.getJobName()} when it is null or empty.
     *
     * @param jobName job name, may be null or empty
     * @return {@code jobName} if it has content, otherwise the result of {@code Hops.getJobName()}
     */
    public static String jobNameGetOrDefault(String jobName) {
        boolean missing = Strings.isNullOrEmpty((String)jobName);
        return missing ? Hops.getJobName() : jobName;
    }

    /**
     * Returns the given data format, or {@code "tfrecords"} when it is null.
     *
     * @param dataFormat data format name, may be null
     * @return {@code dataFormat} if non-null, otherwise {@code "tfrecords"}
     */
    public static String dataFormatGetOrDefault(String dataFormat) {
        return dataFormat != null ? dataFormat : "tfrecords";
    }

    /**
     * Returns the id of the given featurestore from the metadata cache.
     *
     * <p>The cache is refreshed first when it is empty or holds metadata for a featurestore
     * with a different name (compared case-insensitively).
     *
     * @param featurestore featurestore name
     * @return the featurestore id stored in the cached metadata
     */
    public static Integer getFeaturestoreId(String featurestore) throws JAXBException, FeaturestoreNotFound {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        boolean stale = cached == null
            || !featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName());
        if (stale) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        // Re-read the cache: the refresh above may have replaced it.
        return FeaturestoreHelper.getFeaturestoreMetadataCache().getFeaturestore().getFeaturestoreId();
    }

    /**
     * Returns the id of the feature group with the given name and version.
     *
     * <p>The metadata cache is refreshed first when it is empty or holds metadata for a
     * different featurestore. Names are compared case-insensitively.
     *
     * @param featurestore featurestore name
     * @param featuregroup feature group name
     * @param featuregroupVersion feature group version
     * @return the id of the single matching feature group
     * @throws FeaturegroupDoesNotExistError if no feature group matches
     * @throws AssertionError if more than one feature group matches
     */
    public static Integer getFeaturegroupId(String featurestore, String featuregroup, int featuregroupVersion) throws JAXBException, FeaturestoreNotFound, FeaturegroupDoesNotExistError {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        boolean stale = cached == null
            || !featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName());
        if (stale) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        List<FeaturegroupDTO> found = FeaturestoreHelper.getFeaturestoreMetadataCache().getFeaturegroups().stream()
            .filter(dto -> dto.getName().equalsIgnoreCase(featuregroup) && dto.getVersion() == featuregroupVersion)
            .collect(Collectors.toList());
        if (found.isEmpty()) {
            throw new FeaturegroupDoesNotExistError("Featuregroup: " + featuregroup + " was not found in the featurestore: " + featurestore);
        }
        if (found.size() > 1) {
            throw new AssertionError((Object)(" Found more than one featuregroup with the name: " + featuregroup + " in the featurestore: " + featurestore));
        }
        return found.get(0).getId();
    }

    /**
     * Returns the id of the training dataset with the given name and version.
     *
     * <p>The metadata cache is refreshed first when it is empty or holds metadata for a
     * different featurestore. Names are compared case-insensitively.
     *
     * @param featurestore featurestore name
     * @param trainingDataset training dataset name
     * @param trainingDatasetVersion training dataset version
     * @return the id of the single matching training dataset
     * @throws TrainingDatasetDoesNotExistError if no training dataset matches
     * @throws AssertionError if more than one training dataset matches
     */
    public static Integer getTrainingDatasetId(String featurestore, String trainingDataset, int trainingDatasetVersion) throws JAXBException, FeaturestoreNotFound, TrainingDatasetDoesNotExistError {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        boolean stale = cached == null
            || !featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName());
        if (stale) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        List<TrainingDatasetDTO> found = FeaturestoreHelper.getFeaturestoreMetadataCache().getTrainingDatasets().stream()
            .filter(dto -> dto.getName().equalsIgnoreCase(trainingDataset) && dto.getVersion() == trainingDatasetVersion)
            .collect(Collectors.toList());
        if (found.isEmpty()) {
            throw new TrainingDatasetDoesNotExistError("Training Dataset: " + trainingDataset + " was not found in the featurestore: " + featurestore);
        }
        if (found.size() > 1) {
            throw new AssertionError((Object)(" Found more than one training dataset with the name: " + trainingDataset + " in the featurestore: " + featurestore));
        }
        return found.get(0).getId();
    }

    /**
     * Tells whether the spark session's SQL catalog implementation is "hive".
     *
     * @param sparkSession spark session to inspect
     * @return true if the catalog implementation equals "hive", ignoring case
     */
    public static Boolean isHiveEnabled(SparkSession sparkSession) {
        String catalogImpl = FeaturestoreHelper.getSparkSqlCatalogImpl(sparkSession);
        return catalogImpl.equalsIgnoreCase("hive");
    }

    /**
     * Returns the SQL catalog implementation configured for the spark session.
     *
     * <p>Reads {@code spark.sql.catalogImplementation} from the spark context's configuration.
     * When the key is not set, Spark's default value {@code "in-memory"} is returned instead
     * of letting {@code SparkConf.get} throw {@code NoSuchElementException}.
     *
     * @param sparkSession spark session to inspect
     * @return the configured catalog implementation, or {@code "in-memory"} if unset
     */
    public static String getSparkSqlCatalogImpl(SparkSession sparkSession) {
        return sparkSession.sparkContext().getConf().get("spark.sql.catalogImplementation", "in-memory");
    }

    /**
     * Verifies that the spark session uses the hive catalog implementation.
     *
     * @param sparkSession spark session to check
     * @throws HiveNotEnabled if {@code isHiveEnabled} reports false; the message includes the
     *         current catalog implementation
     */
    public static void verifyHiveEnabled(SparkSession sparkSession) throws HiveNotEnabled {
        if (FeaturestoreHelper.isHiveEnabled(sparkSession).booleanValue()) {
            return;
        }
        String currentImpl = FeaturestoreHelper.getSparkSqlCatalogImpl(sparkSession);
        throw new HiveNotEnabled("Hopsworks Featurestore Depends on Hive. Hive is not enabled for the current spark session. Make sure to enable hive before using the featurestore API. The current SparkSQL catalog implementation is: " + currentImpl + ", it should be: hive");
    }

    /**
     * Returns the DTO type string for a training dataset, taken from the client settings.
     *
     * @param trainingDatasetDTO training dataset whose type is inspected
     * @param featurestoreClientSettingsDTO settings providing the DTO type strings
     * @return the HopsFS DTO type for {@code HOPSFS_TRAINING_DATASET}, otherwise the external DTO type
     */
    public static String getTrainingDatasetDTOTypeStr(TrainingDatasetDTO trainingDatasetDTO, FeaturestoreClientSettingsDTO featurestoreClientSettingsDTO) {
        boolean hopsfs = trainingDatasetDTO.getTrainingDatasetType() == TrainingDatasetType.HOPSFS_TRAINING_DATASET;
        return hopsfs
            ? featurestoreClientSettingsDTO.getHopsfsTrainingDatasetDtoType()
            : featurestoreClientSettingsDTO.getExternalTrainingDatasetDtoType();
    }

    /**
     * Builds the name of the training datasets sink of the current project.
     *
     * @return the project name reported by {@code Hops.getProjectName()} followed by
     *         "_Training_Datasets"
     */
    public static String getProjectTrainingDatasetsSink() {
        return Hops.getProjectName() + "_Training_Datasets";
    }

    /**
     * Selects the feature group DTO type string for on-demand or cached feature groups.
     *
     * @param featurestoreClientSettingsDTO the client settings holding the DTO type strings
     * @param onDemand whether the feature group is on-demand; must not be null since it is unboxed
     * @return the on-demand feature group DTO type if {@code onDemand} is true,
     *         otherwise the cached feature group DTO type
     */
    public static String getFeaturegroupDtoTypeStr(FeaturestoreClientSettingsDTO featurestoreClientSettingsDTO, Boolean onDemand) {
        return onDemand
            ? featurestoreClientSettingsDTO.getOnDemandFeaturegroupDtoType()
            : featurestoreClientSettingsDTO.getCachedFeaturegroupDtoType();
    }

    /**
     * Returns the enum constant name of the feature group type for the given flag.
     *
     * @param onDemand whether the feature group is on-demand; must not be null since it is unboxed
     * @return the name of {@code ON_DEMAND_FEATURE_GROUP} if {@code onDemand} is true,
     *         otherwise the name of {@code CACHED_FEATURE_GROUP}
     */
    public static String getFeaturegroupTypeStr(Boolean onDemand) {
        FeaturegroupType featuregroupType = onDemand
            ? FeaturegroupType.ON_DEMAND_FEATURE_GROUP
            : FeaturegroupType.CACHED_FEATURE_GROUP;
        return featuregroupType.name();
    }

    /**
     * Builds the HDFS path of a HopsFS training dataset.
     *
     * @param hopsfsTrainingDatasetDTO the training dataset providing the store path and name
     * @return "hdfs://default" followed by the HDFS store path, a slash and the dataset name
     */
    public static String getHopsfsTrainingDatasetPath(HopsfsTrainingDatasetDTO hopsfsTrainingDatasetDTO) {
        StringBuilder datasetPath = new StringBuilder("hdfs://default");
        datasetPath.append(hopsfsTrainingDatasetDTO.getHdfsStorePath());
        datasetPath.append("/");
        datasetPath.append(hopsfsTrainingDatasetDTO.getName());
        return datasetPath.toString();
    }

    /**
     * Validates that a dataframe has been supplied.
     *
     * @param dataframe the dataframe to validate
     * @throws IllegalArgumentException if {@code dataframe} is null
     */
    public static void validateDataframe(Dataset<Row> dataframe) {
        if (dataframe != null) {
            return;
        }
        throw new IllegalArgumentException("Dataframe cannot be null, specify dataframe with .setDataframe(df)");
    }

    /**
     * Validates that the write mode is one of the supported modes.
     *
     * @param mode the write mode to validate; "append" and "overwrite" are accepted
     *             case-insensitively
     * @throws IllegalArgumentException if {@code mode} is null or not a supported mode
     */
    public static void validateWriteMode(String mode) {
        boolean supported = mode != null
            && (mode.equalsIgnoreCase("append") || mode.equalsIgnoreCase("overwrite"));
        if (supported) {
            return;
        }
        throw new IllegalArgumentException("The supplied write mode: " + mode + " does not match any of the supported modes: overwrite, append");
    }

    /**
     * Registers the custom Hive JDBC dialect returned by {@code getHiveJdbcDialect()}
     * with Spark's {@code JdbcDialects} registry.
     */
    public static void registerCustomJdbcDialects() {
        JdbcDialect hiveDialect = FeaturestoreHelper.getHiveJdbcDialect();
        JdbcDialects.registerDialect(hiveDialect);
    }

    /**
     * Creates a JDBC dialect for Hive connections.
     *
     * <p>The dialect claims every URL that starts with "jdbc:hive2" or contains "hive2",
     * and returns column names as-is instead of quoting them.
     *
     * @return a new Hive JDBC dialect instance
     */
    public static JdbcDialect getHiveJdbcDialect() {
        return new JdbcDialect() {

            @Override
            public boolean canHandle(String url) {
                boolean hasHivePrefix = url.startsWith("jdbc:hive2");
                return hasHivePrefix || url.contains("hive2");
            }

            @Override
            public String quoteIdentifier(String colName) {
                // Identifiers are passed through unquoted.
                return colName;
            }
        };
    }

    /**
     * Converts a Java list of strings into a Scala sequence.
     *
     * @param inputList the Java list to convert
     * @return a Scala {@code Seq} with the elements of {@code inputList} in iteration order
     */
    public static Seq<String> convertListToSeq(List<String> inputList) {
        // Wrap the Java iterator as a Scala iterator, then materialize it as a Seq.
        Iterator scalaIterator = (Iterator)JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala();
        return scalaIterator.toSeq();
    }

    /**
     * Reads each on-demand feature group and registers the resulting dataframe as a
     * temporary view, named after the feature group's table name, so it can be used in
     * SparkSQL queries.
     *
     * <p>Uses {@code createOrReplaceTempView} rather than {@code registerTempTable},
     * which has been deprecated since Spark 2.0.
     *
     * @param onDemandFeaturegroups the on-demand feature groups to register
     * @param featurestore the featurestore to read the feature groups from
     * @param jdbcArguments optional JDBC arguments keyed by table name; may be null
     * @throws FeaturegroupDoesNotExistError propagated from reading a feature group
     * @throws HiveNotEnabled propagated from reading a feature group
     * @throws StorageConnectorDoesNotExistError propagated from reading a feature group
     * @throws OnlineFeaturestorePasswordNotFound propagated from reading a feature group
     * @throws FeaturestoreNotFound propagated from reading a feature group
     * @throws OnlineFeaturestoreUserNotFound propagated from reading a feature group
     * @throws JAXBException propagated from reading a feature group
     * @throws OnlineFeaturestoreNotEnabled propagated from reading a feature group
     */
    public static void registerOnDemandFeaturegroupsAsTempTables(List<FeaturegroupDTO> onDemandFeaturegroups, String featurestore, Map<String, Map<String, String>> jdbcArguments) throws FeaturegroupDoesNotExistError, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestorePasswordNotFound, FeaturestoreNotFound, OnlineFeaturestoreUserNotFound, JAXBException, OnlineFeaturestoreNotEnabled {
        for (FeaturegroupDTO onDemandFeaturegroup : onDemandFeaturegroups) {
            FeaturestoreReadFeaturegroup readFeaturegroupOp = new FeaturestoreReadFeaturegroup(onDemandFeaturegroup.getName()).setVersion(onDemandFeaturegroup.getVersion()).setFeaturestore(featurestore);
            // The table name is both the lookup key for JDBC arguments and the view name.
            String tableName = FeaturestoreHelper.getTableName(onDemandFeaturegroup.getName(), onDemandFeaturegroup.getVersion());
            if (jdbcArguments != null && jdbcArguments.containsKey(tableName)) {
                readFeaturegroupOp.setJdbcArguments(jdbcArguments.get(tableName));
            }
            Dataset<Row> sparkDf = readFeaturegroupOp.read();
            sparkDf.createOrReplaceTempView(tableName);
            LOG.info("Registered On-Demand Feature Group: " + onDemandFeaturegroup.getName() + " with version: " + onDemandFeaturegroup.getVersion() + " as temporary table: " + tableName);
        }
    }

    /**
     * Builds the S3 path of an external training dataset.
     *
     * <p>The "s3a://" scheme is added only when the bucket does not already start with it.
     * The previous guard tested a freshly created empty string, so it was always true and
     * a bucket that already carried the scheme produced "s3a://s3a://...".
     *
     * @param trainingDatasetName the name of the training dataset
     * @param trainingDatasetVersion the version of the training dataset
     * @param bucket the S3 bucket, with or without the "s3a://" scheme
     * @return the bucket with the "s3a://" scheme, followed by "/TRAINING_DATASETS/" and
     *         the table name of the training dataset
     */
    public static String getExternalTrainingDatasetPath(String trainingDatasetName, int trainingDatasetVersion, String bucket) {
        final String s3Prefix = "s3a://";
        // Null-safe check keeps the previous result for a null bucket (it is concatenated as "null").
        boolean alreadyQualified = bucket != null && bucket.startsWith(s3Prefix);
        String qualifiedBucket = alreadyQualified ? bucket : s3Prefix + bucket;
        return qualifiedBucket + "/" + "TRAINING_DATASETS" + "/" + FeaturestoreHelper.getTableName(trainingDatasetName, trainingDatasetVersion);
    }

    /**
     * Sets the S3A access key and secret key on the Hadoop configuration of the
     * spark context behind the given session.
     *
     * @param accessKey the value stored under "fs.s3a.access.key"
     * @param secretKey the value stored under "fs.s3a.secret.key"
     * @param sparkSession the spark session whose Hadoop configuration is updated
     */
    public static void setupS3CredentialsForSpark(String accessKey, String secretKey, SparkSession sparkSession) {
        sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", accessKey);
        sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", secretKey);
    }

    /**
     * Builds the full S3 path of a dataset inside a bucket.
     *
     * <p>If the dataset path already contains the bucket, it is returned as-is, with the
     * "s3a://" scheme prepended when missing. Otherwise the path is assembled as
     * scheme + bucket + "/" + dataset path.
     *
     * <p>In the second case the scheme is added only when the bucket does not already start
     * with it. The previous guard tested a freshly created empty string, so it was always
     * true and a bucket that already carried the scheme produced "s3a://s3a://...".
     *
     * @param bucket the S3 bucket, with or without the "s3a://" scheme
     * @param datasetPath the path of the dataset, relative to the bucket or already including it
     * @return the dataset path qualified with the "s3a://" scheme and the bucket
     */
    public static String getBucketPath(String bucket, String datasetPath) {
        final String s3Prefix = "s3a://";
        if (datasetPath.contains(bucket)) {
            return datasetPath.contains(s3Prefix) ? datasetPath : s3Prefix + datasetPath;
        }
        String qualifiedBucket = bucket.startsWith(s3Prefix) ? bucket : s3Prefix + bucket;
        return qualifiedBucket + "/" + datasetPath;
    }

    /**
     * Extracts the password from the arguments of a JDBC storage connector.
     *
     * <p>The arguments string is split on commas. The first argument containing
     * "password=" is returned with that substring removed.
     *
     * @param featurestoreJdbcConnectorDTO the JDBC connector whose arguments are searched
     * @return the password argument with "password=" stripped
     * @throws OnlineFeaturestorePasswordNotFound if no argument contains "password="
     */
    public static String getOnlineFeaturestorePassword(FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO) throws OnlineFeaturestorePasswordNotFound {
        for (String argument : featurestoreJdbcConnectorDTO.getArguments().split(",")) {
            if (argument.contains("password=")) {
                return argument.replace("password=", "");
            }
        }
        throw new OnlineFeaturestorePasswordNotFound("Could not find any password in the storage connector");
    }

    /**
     * Extracts the username from the arguments of a JDBC storage connector.
     *
     * <p>The arguments string is split on commas. The first argument containing
     * "user=" is returned with that substring removed.
     *
     * @param featurestoreJdbcConnectorDTO the JDBC connector whose arguments are searched
     * @return the user argument with "user=" stripped
     * @throws OnlineFeaturestoreUserNotFound if no argument contains "user="
     */
    public static String getOnlineFeaturestoreUser(FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO) throws OnlineFeaturestoreUserNotFound {
        for (String argument : featurestoreJdbcConnectorDTO.getArguments().split(",")) {
            if (argument.contains("user=")) {
                return argument.replace("user=", "");
            }
        }
        throw new OnlineFeaturestoreUserNotFound("Could not find any username in the storage connector");
    }

    /**
     * Writes a dataframe to a table through the given JDBC storage connector.
     *
     * @param sparkDf the dataframe to write
     * @param featurestoreJdbcConnectorDTO the connector providing the connection string and
     *        the user/password arguments
     * @param tableName the value passed as the JDBC "dbtable" option
     * @param mode the Spark write mode
     * @throws OnlineFeaturestorePasswordNotFound if the connector arguments contain no password
     * @throws OnlineFeaturestoreUserNotFound if the connector arguments contain no user
     */
    public static void writeJdbcDataframe(Dataset<Row> sparkDf, FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO, String tableName, String mode) throws OnlineFeaturestorePasswordNotFound, OnlineFeaturestoreUserNotFound {
        // Resolve the credentials first so a missing password/user fails before any write is set up.
        final String password = FeaturestoreHelper.getOnlineFeaturestorePassword(featurestoreJdbcConnectorDTO);
        final String username = FeaturestoreHelper.getOnlineFeaturestoreUser(featurestoreJdbcConnectorDTO);
        sparkDf.write()
            .format("jdbc")
            .option("url", featurestoreJdbcConnectorDTO.getConnectionString())
            .option("dbtable", tableName)
            .option("user", username)
            .option("password", password)
            .mode(mode)
            .save();
    }

    /**
     * Reads a dataframe through the given JDBC storage connector.
     *
     * @param sparkSession the spark session used for reading
     * @param featurestoreJdbcConnectorDTO the connector providing the connection string and
     *        the user/password arguments
     * @param query the value passed as the JDBC "dbtable" option
     * @return the dataframe loaded over JDBC
     * @throws OnlineFeaturestorePasswordNotFound if the connector arguments contain no password
     * @throws OnlineFeaturestoreUserNotFound if the connector arguments contain no user
     */
    public static Dataset<Row> readJdbcDataFrame(SparkSession sparkSession, FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO, String query) throws OnlineFeaturestorePasswordNotFound, OnlineFeaturestoreUserNotFound {
        // Resolve the credentials first so a missing password/user fails before any read is set up.
        final String password = FeaturestoreHelper.getOnlineFeaturestorePassword(featurestoreJdbcConnectorDTO);
        final String username = FeaturestoreHelper.getOnlineFeaturestoreUser(featurestoreJdbcConnectorDTO);
        return sparkSession.read()
            .format("jdbc")
            .option("url", featurestoreJdbcConnectorDTO.getConnectionString())
            .option("dbtable", query)
            .option("user", username)
            .option("password", password)
            .load();
    }

    /*
     * Class initializer: resets the metadata cache and featurestore regex to null, then
     * creates one JAXB context and one marshaller per DTO type (no marshaller is created
     * for the feature distributions context's sibling beyond those listed below).
     * Every marshaller is configured identically: JSON media type, without a root element.
     *
     * A JAXBException is logged at SEVERE level and not rethrown, so class loading still
     * succeeds. Statements after the failing one are skipped, which means the fields they
     * would have assigned keep whatever value they had before (presumably null - confirm
     * against the field declarations).
     */
    static {
        featurestoreMetadataCache = null;
        featurestoreRegex = null;
        try {
            // Step 1: one JAXB context per DTO type.
            descriptiveStatsJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{DescriptiveStatsDTO.class}, null);
            featureCorrelationJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeatureCorrelationMatrixDTO.class}, null);
            featureHistogramsJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeatureDistributionsDTO.class}, null);
            clusterAnalysisJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{ClusterAnalysisDTO.class}, null);
            featureJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeatureDTO.class}, null);
            featurestoreMetadataJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeaturestoreMetadataDTO.class}, null);
            trainingDatasetJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{TrainingDatasetDTO.class}, null);
            featuregroupJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeaturegroupDTO.class}, null);
            jdbcConnectorJAXBContext = JAXBContextFactory.createContext((Class[])new Class[]{FeaturestoreJdbcConnectorDTO.class}, null);
            // Step 2: one marshaller per context, each emitting JSON without a root element.
            descriptiveStatsMarshaller = descriptiveStatsJAXBContext.createMarshaller();
            descriptiveStatsMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            descriptiveStatsMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureCorrelationMarshaller = featureCorrelationJAXBContext.createMarshaller();
            featureCorrelationMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureCorrelationMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            clusteranalysisMarshaller = clusterAnalysisJAXBContext.createMarshaller();
            clusteranalysisMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            clusteranalysisMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureHistogramsMarshaller = featureHistogramsJAXBContext.createMarshaller();
            featureHistogramsMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureHistogramsMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureMarshaller = featureJAXBContext.createMarshaller();
            featureMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featurestoreMetadataMarshaller = featurestoreMetadataJAXBContext.createMarshaller();
            featurestoreMetadataMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featurestoreMetadataMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            trainingDatasetMarshaller = trainingDatasetJAXBContext.createMarshaller();
            trainingDatasetMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            trainingDatasetMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featuregroupMarshaller = featuregroupJAXBContext.createMarshaller();
            featuregroupMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featuregroupMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            jdbcConnectorMarshaller = jdbcConnectorJAXBContext.createMarshaller();
            jdbcConnectorMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            jdbcConnectorMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
        }
        catch (JAXBException e) {
            // Logged only, not rethrown: initialization continues with the remaining fields unassigned.
            LOG.log(Level.SEVERE, "An error occurred while initializing JAXBContext", e);
        }
    }
}

