/*
 * Decompiled with CFR 0.152.
 */
package io.hops.util.featurestore;

import com.google.common.base.Strings;
import io.hops.util.Constants;
import io.hops.util.Hops;
import io.hops.util.exceptions.CannotWriteImageDataFrameException;
import io.hops.util.exceptions.DataframeIsEmpty;
import io.hops.util.exceptions.FeaturegroupDoesNotExistError;
import io.hops.util.exceptions.FeaturestoreNotFound;
import io.hops.util.exceptions.InferTFRecordSchemaError;
import io.hops.util.exceptions.InvalidPrimaryKeyForFeaturegroup;
import io.hops.util.exceptions.SparkDataTypeNotRecognizedError;
import io.hops.util.exceptions.TrainingDatasetDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetFormatNotSupportedError;
import io.hops.util.featurestore.dtos.FeatureDTO;
import io.hops.util.featurestore.dtos.FeaturegroupDTO;
import io.hops.util.featurestore.dtos.FeaturestoreMetadataDTO;
import io.hops.util.featurestore.dtos.SQLJoinDTO;
import io.hops.util.featurestore.dtos.TrainingDatasetDTO;
import io.hops.util.featurestore.dtos.stats.StatisticsDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.ClusterAnalysisDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.ClusterDTO;
import io.hops.util.featurestore.dtos.stats.cluster_analysis.DatapointDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsMetricValueDTO;
import io.hops.util.featurestore.dtos.stats.desc_stats.DescriptiveStatsMetricValuesDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.CorrelationValueDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.FeatureCorrelationDTO;
import io.hops.util.featurestore.dtos.stats.feature_correlation.FeatureCorrelationMatrixDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.FeatureDistributionDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.FeatureDistributionsDTO;
import io.hops.util.featurestore.dtos.stats.feature_distributions.HistogramBinDTO;
import io.hops.util.featurestore.ops.write_ops.FeaturestoreUpdateMetadataCache;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.DenseMatrix;
import org.apache.spark.ml.stat.Correlation;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.NumericType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import scala.Tuple2;

public class FeaturestoreHelper {
    private static final Logger LOG = Logger.getLogger(FeaturestoreHelper.class.getName());
    // JAXB contexts for the DTO types this helper converts to/from JSON (see getUnmarshaller and the
    // parse*Json methods, which build unmarshallers from featurestoreMetadataJAXBContext and
    // trainingDatasetJAXBContext).
    // NOTE(review): none of these fields is assigned in the visible part of this file; presumably they are
    // initialized in a static initializer further down — confirm before relying on them being non-null.
    private static JAXBContext descriptiveStatsJAXBContext;
    private static JAXBContext featureCorrelationJAXBContext;
    private static JAXBContext featureHistogramsJAXBContext;
    private static JAXBContext clusterAnalysisJAXBContext;
    private static JAXBContext featureJAXBContext;
    private static JAXBContext featurestoreMetadataJAXBContext;
    private static JAXBContext trainingDatasetJAXBContext;
    // Marshallers handed to dtoToJson(...) by the convert*ToJsonObject methods of this class.
    // NOTE(review): JAXB Marshaller instances are not thread-safe; sharing them as statics assumes the
    // helper is not used concurrently — confirm.
    private static Marshaller descriptiveStatsMarshaller;
    private static Marshaller featureCorrelationMarshaller;
    private static Marshaller featureHistogramsMarshaller;
    private static Marshaller clusteranalysisMarshaller;
    private static Marshaller featureMarshaller;
    private static Marshaller featurestoreMetadataMarshaller;
    private static Marshaller trainingDatasetMarshaller;
    // NOTE(review): not read or written in the visible part of this file; presumably a cached copy of the
    // featurestore metadata maintained elsewhere in this class — confirm.
    private static FeaturestoreMetadataDTO featurestoreMetadataCache;

    /**
     * Private constructor: this class only exposes static helpers and is not meant to be instantiated.
     */
    private FeaturestoreHelper() {
    }

    /**
     * Returns the name of the current project's default featurestore (a Hive database).
     *
     * <p>The name is the project name reported by {@link Hops#getProjectName()}, lower-cased, followed by
     * {@code "_featurestore"}. Lower-casing uses {@link Locale#ROOT} so the result does not depend on the
     * JVM default locale (for example, under a Turkish locale {@code 'I'} would otherwise become a dotless
     * {@code 'ı'} and the database name would not match).
     *
     * @return the project featurestore name
     */
    public static String getProjectFeaturestore() {
        String projectName = Hops.getProjectName();
        return projectName.toLowerCase(Locale.ROOT) + "_featurestore";
    }

    /**
     * Makes the given featurestore the current database of the Spark session by running {@code use <db>}.
     *
     * @param sparkSession the session to run the statement in
     * @param featurestore the featurestore database to switch to; when {@code null} the project's default
     *                     featurestore ({@link #getProjectFeaturestore()}) is used
     */
    public static void useFeaturestore(SparkSession sparkSession, String featurestore) {
        String database = (featurestore != null) ? featurestore : FeaturestoreHelper.getProjectFeaturestore();
        FeaturestoreHelper.logAndRunSQL(sparkSession, "use " + database);
    }

    /**
     * Builds the Hive table name backing a featuregroup version: {@code <featuregroupName>_<version>}.
     *
     * @param featuregroupName the featuregroup name
     * @param version          the featuregroup version
     * @return the table name
     */
    public static String getTableName(String featuregroupName, int version) {
        return String.join("_", featuregroupName, String.valueOf(version));
    }

    /**
     * Appends the rows of a Spark dataframe to the Hive table backing a featuregroup version.
     *
     * <p>Before inserting, {@code hive.exec.dynamic.partition=true} and
     * {@code hive.exec.dynamic.partition.mode=nonstrict} are set on {@code sparkSession}, presumably so that
     * partitioned featuregroups can be written with dynamic partitions.
     *
     * @param sparkDf             the rows to insert
     * @param sparkSession        the session used to write
     * @param featuregroup        the featuregroup name
     * @param featurestore        the featurestore database, or {@code null} for the project default
     * @param featuregroupVersion the featuregroup version
     */
    public static void insertIntoFeaturegroup(Dataset<Row> sparkDf, SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String tableName = FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion);
        // Configure the session that actually performs the insert. The deprecated
        // `new SQLContext(sparkContext)` resolves a session through SparkSession.builder(), which is not
        // guaranteed to be this `sparkSession` (e.g. one obtained via newSession()).
        sparkSession.conf().set("hive.exec.dynamic.partition", "true");
        sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
        sparkDf.write().format("hive").mode("append").insertInto(tableName);
    }

    /**
     * Returns every featuregroup that has a feature matching {@code feature}, in input order, each at most once.
     *
     * <p>A feature matches when {@code feature} equals either its bare name or its qualified name
     * {@code <featuregroup>_<version>.<featureName>}.
     *
     * @param featuregroups the featuregroups to search
     * @param feature       the bare or table-qualified feature name
     * @return the matching featuregroups (possibly empty)
     */
    private static List<FeaturegroupDTO> findFeaturegroupThatContainsFeature(List<FeaturegroupDTO> featuregroups, String feature) {
        return featuregroups.stream()
            .filter(group -> group.getFeatures().stream().anyMatch(candidate -> {
                String qualifiedName = FeaturestoreHelper.getTableName(group.getName(), group.getVersion()) + "." + candidate.getName();
                return candidate.getName().equals(feature) || qualifiedName.equals(feature);
            }))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Resolves each requested feature to the single featuregroup that contains it and returns the
     * distinct featuregroups needed to serve all the features.
     *
     * <p>Featuregroups are returned in the order in which they are first needed by {@code features}, so the
     * result (and any SQL later built from it) is stable across runs.
     *
     * @param featuregroups the candidate featuregroups
     * @param features      the bare or table-qualified feature names
     * @param featurestore  the featurestore name (used in error messages)
     * @return the distinct featuregroups containing the features
     * @throws IllegalArgumentException if a feature is found in no featuregroup or in more than one
     */
    public static List<FeaturegroupDTO> findFeaturegroupsThatContainsFeatures(List<FeaturegroupDTO> featuregroups, List<String> features, String featurestore) {
        // LinkedHashSet de-duplicates like a HashSet but iterates in insertion order. NOTE(review): a
        // plain HashSet iterates in hashCode order, which is arbitrary if FeaturegroupDTO does not
        // override hashCode — confirm.
        Set<FeaturegroupDTO> distinctFeaturegroups = new LinkedHashSet<>();
        for (String feature : features) {
            distinctFeaturegroups.add(FeaturestoreHelper.findFeature(feature, featurestore, featuregroups));
        }
        return new ArrayList<>(distinctFeaturegroups);
    }

    /**
     * Reads every row of a featuregroup version with {@code SELECT *}.
     *
     * @param sparkSession        the session to query with
     * @param featuregroup        the featuregroup name
     * @param featurestore        the featurestore database, or {@code null} for the project default
     * @param featuregroupVersion the featuregroup version
     * @return the featuregroup contents
     */
    public static Dataset<Row> getFeaturegroup(SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String table = FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion);
        return FeaturestoreHelper.logAndRunSQL(sparkSession, "SELECT * FROM " + table);
    }

    /**
     * Lists the partitions of a featuregroup version's Hive table via {@code SHOW PARTITIONS}.
     *
     * @param sparkSession        the session to query with
     * @param featuregroup        the featuregroup name
     * @param featurestore        the featurestore database, or {@code null} for the project default
     * @param featuregroupVersion the featuregroup version
     * @return the result of {@code SHOW PARTITIONS}
     */
    public static Dataset<Row> getFeaturegroupPartitions(SparkSession sparkSession, String featuregroup, String featurestore, int featuregroupVersion) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String table = FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion);
        return FeaturestoreHelper.logAndRunSQL(sparkSession, "SHOW PARTITIONS " + table);
    }

    /**
     * Reads a training dataset stored on HDFS into a Spark dataframe.
     *
     * <p>The dataset is first looked up at {@code hdfsPath} itself (for example a folder written by Spark),
     * then at {@code hdfsPath + "." + dataFormat} (a single file carrying the format's extension).
     *
     * @param sparkSession the session used to read the data
     * @param dataFormat   one of {@code csv}, {@code tsv}, {@code tfrecords}, {@code parquet}, {@code avro},
     *                     {@code orc}, {@code image}
     * @param hdfsPath     the HDFS path of the training dataset, without a format extension
     * @return the training dataset
     * @throws TrainingDatasetFormatNotSupportedError if {@code dataFormat} is {@code null} or unsupported
     * @throws TrainingDatasetDoesNotExistError       if neither candidate path exists
     * @throws IOException                            if the HDFS existence check fails
     */
    public static Dataset<Row> getTrainingDataset(SparkSession sparkSession, String dataFormat, String hdfsPath) throws TrainingDatasetFormatNotSupportedError, IOException, TrainingDatasetDoesNotExistError {
        // Arrays.asList(...).contains(null) is false, so a null format is reported as unsupported.
        if (!Arrays.asList("csv", "tsv", "tfrecords", "parquet", "avro", "orc", "image").contains(dataFormat)) {
            throw new TrainingDatasetFormatNotSupportedError("The provided data format: " + dataFormat + " is not supported in the Java/Scala API, the supported data formats are: csv,tsv,tfrecords,parquet,avro,orc,image");
        }
        String datasetPath = FeaturestoreHelper.resolveTrainingDatasetPath(hdfsPath, "." + dataFormat);
        return FeaturestoreHelper.readTrainingDataset(sparkSession, dataFormat, datasetPath);
    }

    /**
     * Returns {@code hdfsPath} if it exists on its filesystem, otherwise {@code hdfsPath + fileSuffix} if that
     * exists.
     *
     * @throws TrainingDatasetDoesNotExistError if neither path exists
     * @throws IOException                      if the filesystem cannot be reached
     */
    private static String resolveTrainingDatasetPath(String hdfsPath, String fileSuffix) throws IOException, TrainingDatasetDoesNotExistError {
        Configuration hdfsConf = new Configuration();
        for (String candidate : new String[]{hdfsPath, hdfsPath + fileSuffix}) {
            Path candidatePath = new Path(candidate);
            // FileSystem instances from Path.getFileSystem are normally shared through Hadoop's FileSystem
            // cache, so they are deliberately not closed here.
            FileSystem fileSystem = candidatePath.getFileSystem(hdfsConf);
            if (fileSystem.exists(candidatePath)) {
                return candidate;
            }
        }
        throw new TrainingDatasetDoesNotExistError("Could not find any training dataset in folder : " + hdfsPath + " or in file: " + hdfsPath + fileSuffix);
    }

    /**
     * Reads an existing training dataset path with the Spark reader matching {@code dataFormat}.
     * {@code dataFormat} must already have been validated as supported.
     */
    private static Dataset<Row> readTrainingDataset(SparkSession sparkSession, String dataFormat, String path) {
        switch (dataFormat) {
            case "csv":
                return sparkSession.read().format("csv").option("header", "true").option("delimiter", ",").load(path);
            case "tsv":
                // Spark has no "tsv" data source: TSV is read with the CSV reader and a tab delimiter.
                return sparkSession.read().format("csv").option("header", "true").option("delimiter", "\t").load(path);
            case "parquet":
                return sparkSession.read().parquet(path);
            case "tfrecords":
                return sparkSession.read().format(dataFormat).option("recordType", "Example").load(path);
            default:
                // avro, orc and image are loaded by their data source name without extra options.
                return sparkSession.read().format(dataFormat).load(path);
        }
    }

    /**
     * Finds the unique featuregroup that contains {@code feature}.
     *
     * @param feature          the bare or table-qualified ({@code <fg>_<version>.<name>}) feature name
     * @param featurestore     the featurestore name (used in error messages)
     * @param featuregroupDTOS the featuregroups to search
     * @return the single featuregroup containing the feature
     * @throws IllegalArgumentException if no featuregroup, or more than one, contains the feature
     */
    private static FeaturegroupDTO findFeature(String feature, String featurestore, List<FeaturegroupDTO> featuregroupDTOS) {
        List<FeaturegroupDTO> matchedFeaturegroups = FeaturestoreHelper.findFeaturegroupThatContainsFeature(featuregroupDTOS, feature);
        if (matchedFeaturegroups.isEmpty()) {
            throw new IllegalArgumentException("Could not find the feature with name: " + feature + " in any of the featuregroups of the featurestore: " + featurestore);
        }
        if (matchedFeaturegroups.size() > 1) {
            // Report only the featuregroups that actually contain the feature, as the message promises.
            String featuregroupsStr = matchedFeaturegroups.stream()
                .map(fg -> FeaturestoreHelper.getTableName(fg.getName(), fg.getVersion()))
                .collect(Collectors.joining(", "));
            throw new IllegalArgumentException("Found the feature with name: " + feature + " in more than one of the featuregroups of the featurestore " + featurestore + " please specify featuregroup that you want to get the feature from. The matched featuregroups are: " + featuregroupsStr);
        }
        return matchedFeaturegroups.get(0);
    }

    /**
     * Selects a single feature column, locating the featuregroup that holds it automatically.
     *
     * @param sparkSession     the session to query with
     * @param feature          the feature to select
     * @param featurestore     the featurestore database, or {@code null} for the project default
     * @param featuregroupDTOS the featuregroups to search for the feature
     * @return a dataframe with the selected feature column
     * @throws IllegalArgumentException if the feature is missing or ambiguous (raised by findFeature)
     */
    public static Dataset<Row> getFeature(SparkSession sparkSession, String feature, String featurestore, List<FeaturegroupDTO> featuregroupDTOS) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        FeaturegroupDTO owner = FeaturestoreHelper.findFeature(feature, featurestore, featuregroupDTOS);
        String table = FeaturestoreHelper.getTableName(owner.getName(), owner.getVersion());
        return FeaturestoreHelper.logAndRunSQL(sparkSession, "SELECT " + feature + " FROM " + table);
    }

    /**
     * Selects a single feature column from an explicitly named featuregroup version.
     *
     * @param sparkSession        the session to query with
     * @param feature             the feature to select
     * @param featurestore        the featurestore database, or {@code null} for the project default
     * @param featuregroup        the featuregroup name
     * @param featuregroupVersion the featuregroup version
     * @return a dataframe with the selected feature column
     */
    public static Dataset<Row> getFeature(SparkSession sparkSession, String feature, String featurestore, String featuregroup, int featuregroupVersion) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        String table = FeaturestoreHelper.getTableName(featuregroup, featuregroupVersion);
        return FeaturestoreHelper.logAndRunSQL(sparkSession, "SELECT " + feature + " FROM " + table);
    }

    /**
     * Builds the JOIN/ON suffix that joins every featuregroup after the first onto the first one.
     *
     * <p>For tables {@code t0, t1, t2} and key {@code k} the result is
     * {@code "JOIN t1 JOIN t2 ON t0.`k`=t1.`k` AND t0.`k`=t2.`k`"}. The first table is not included; the
     * caller places it after {@code FROM}. With fewer than two featuregroups only {@code "ON "} is produced.
     *
     * @param featuregroupDTOS the featuregroups to join, first one being the base table
     * @param joinKey          the column joined on (back-quoted in the output)
     * @return the join string together with the featuregroups it was built from
     */
    private static SQLJoinDTO getJoinStr(List<FeaturegroupDTO> featuregroupDTOS, String joinKey) {
        int count = featuregroupDTOS.size();
        StringBuilder joinSql = new StringBuilder();
        for (int idx = 1; idx < count; idx++) {
            FeaturegroupDTO joined = featuregroupDTOS.get(idx);
            joinSql.append("JOIN ").append(FeaturestoreHelper.getTableName(joined.getName(), joined.getVersion())).append(' ');
        }
        joinSql.append("ON ");
        List<String> conditions = new ArrayList<>();
        for (int idx = 1; idx < count; idx++) {
            FeaturegroupDTO base = featuregroupDTOS.get(0);
            FeaturegroupDTO joined = featuregroupDTOS.get(idx);
            conditions.add(FeaturestoreHelper.getTableName(base.getName(), base.getVersion()) + ".`" + joinKey + "`="
                + FeaturestoreHelper.getTableName(joined.getName(), joined.getVersion()) + ".`" + joinKey + "`");
        }
        joinSql.append(String.join(" AND ", conditions));
        return new SQLJoinDTO(joinSql.toString(), featuregroupDTOS);
    }

    /**
     * Picks, among the common columns, the one marked as primary key in the most featuregroups.
     *
     * <p>Names are compared case-insensitively. Ties go to the earliest column in {@code commonCols}; if no
     * column is primary anywhere, the first column is returned.
     *
     * @param commonCols       candidate column names (must be non-empty)
     * @param featuregroupDTOS the featuregroups whose feature metadata is inspected
     * @return the chosen column name
     */
    private static String getColumnThatIsPrimary(String[] commonCols, List<FeaturegroupDTO> featuregroupDTOS) {
        int bestIndex = 0;
        int bestCount = 0;
        for (int col = 0; col < commonCols.length; col++) {
            String candidate = commonCols[col];
            int count = 0;
            for (FeaturegroupDTO group : featuregroupDTOS) {
                for (FeatureDTO column : group.getFeatures()) {
                    if (column.getName().equalsIgnoreCase(candidate) && column.getPrimary()) {
                        count++;
                    }
                }
            }
            // Strictly greater: on ties, keep the earlier column.
            if (count > bestCount) {
                bestCount = count;
                bestIndex = col;
            }
        }
        return commonCols[bestIndex];
    }

    /**
     * Chooses the column to join the given featuregroups on.
     *
     * <p>Candidates are the column names present in every featuregroup. Among them, the one most often flagged
     * as primary key is chosen (see getColumnThatIsPrimary).
     *
     * @param featuregroupDTOS the featuregroups to be joined
     * @return the join column name
     * @throws IllegalArgumentException if the featuregroups share no column
     */
    public static String getJoinColumn(List<FeaturegroupDTO> featuregroupDTOS) {
        List<Set<String>> featureSets = new ArrayList<>();
        Set<String> commonColumns = new HashSet<>();
        for (FeaturegroupDTO featuregroupDTO : featuregroupDTOS) {
            List<String> columnNames = featuregroupDTO.getFeatures().stream().map(FeatureDTO::getName).collect(Collectors.toList());
            featureSets.add(new HashSet<>(columnNames));
            commonColumns.addAll(columnNames);
        }
        // Start from the union of all column names and narrow it to the intersection. Building the set this
        // way keeps its iteration order (which decides ties downstream) the same as before.
        for (Set<String> featureSet : featureSets) {
            commonColumns.retainAll(featureSet);
        }
        if (commonColumns.isEmpty()) {
            List<String> featuregroupNames = featuregroupDTOS.stream().map(FeaturegroupDTO::getName).collect(Collectors.toList());
            String featuregroupsStr = StringUtils.join(featuregroupNames, ", ");
            throw new IllegalArgumentException("Could not find any common columns in featuregroups to join on, searched through the following featuregroups: " + featuregroupsStr);
        }
        return FeaturestoreHelper.getColumnThatIsPrimary(commonColumns.toArray(new String[0]), featuregroupDTOS);
    }

    /**
     * Turns a {@code featuregroup name -> version} map into minimal DTOs carrying only name and version.
     *
     * @param featuregroupsAndVersions featuregroup names mapped to versions
     * @return one DTO per entry, in the map's iteration order
     */
    private static List<FeaturegroupDTO> convertFeaturegroupAndVersionToDTOs(Map<String, Integer> featuregroupsAndVersions) {
        return featuregroupsAndVersions.entrySet().stream().map(nameAndVersion -> {
            FeaturegroupDTO dto = new FeaturegroupDTO();
            dto.setName(nameAndVersion.getKey());
            dto.setVersion(nameAndVersion.getValue());
            return dto;
        }).collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Selects a list of features from explicitly given featuregroup versions, joining them on {@code joinKey}
     * when more than one featuregroup is involved.
     *
     * @param sparkSession             the session to query with
     * @param features                 the features to select
     * @param featurestore             the featurestore database, or {@code null} for the project default
     * @param featuregroupsAndVersions featuregroup names mapped to the versions to read
     * @param joinKey                  the column to join on (unused when a single featuregroup is given)
     * @return a dataframe with the selected features
     */
    public static Dataset<Row> getFeatures(SparkSession sparkSession, List<String> features, String featurestore, Map<String, Integer> featuregroupsAndVersions, String joinKey) {
        // Switch database once; a second `use` statement would only repeat the same SQL.
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        List<FeaturegroupDTO> featuregroupDTOs = FeaturestoreHelper.convertFeaturegroupAndVersionToDTOs(featuregroupsAndVersions);
        String featuresStr = StringUtils.join(features, ", ");
        if (featuregroupDTOs.size() == 1) {
            FeaturegroupDTO onlyFeaturegroup = featuregroupDTOs.get(0);
            String sqlStr = "SELECT " + featuresStr + " FROM " + FeaturestoreHelper.getTableName(onlyFeaturegroup.getName(), onlyFeaturegroup.getVersion());
            return FeaturestoreHelper.logAndRunSQL(sparkSession, sqlStr);
        }
        SQLJoinDTO sqlJoinDTO = FeaturestoreHelper.getJoinStr(featuregroupDTOs, joinKey);
        FeaturegroupDTO baseFeaturegroup = sqlJoinDTO.getFeaturegroupDTOS().get(0);
        String sqlStr = "SELECT " + featuresStr + " FROM " + FeaturestoreHelper.getTableName(baseFeaturegroup.getName(), baseFeaturegroup.getVersion()) + " " + sqlJoinDTO.getJoinStr();
        return FeaturestoreHelper.logAndRunSQL(sparkSession, sqlStr);
    }

    /**
     * Selects a list of features from the given featuregroups, joining all of them on {@code joinKey} when
     * more than one featuregroup is given.
     *
     * @param sparkSession          the session to query with
     * @param features              the features to select
     * @param featurestore          the featurestore database, or {@code null} for the project default
     * @param featuregroupsMetadata the featuregroups to read from (all of them are joined)
     * @param joinKey               the column to join on (unused when a single featuregroup is given)
     * @return a dataframe with the selected features
     * @throws IllegalArgumentException if a feature is missing from, or ambiguous across, the featuregroups
     */
    public static Dataset<Row> getFeatures(SparkSession sparkSession, List<String> features, String featurestore, List<FeaturegroupDTO> featuregroupsMetadata, String joinKey) {
        FeaturestoreHelper.useFeaturestore(sparkSession, featurestore);
        // Validation only: findFeature throws if a feature is in none, or several, of the featuregroups.
        for (String feature : features) {
            FeaturestoreHelper.findFeature(feature, featurestore, featuregroupsMetadata);
        }
        String featuresStr = StringUtils.join(features, ", ");
        if (featuregroupsMetadata.size() == 1) {
            FeaturegroupDTO onlyFeaturegroup = featuregroupsMetadata.get(0);
            String sqlStr = "SELECT " + featuresStr + " FROM " + FeaturestoreHelper.getTableName(onlyFeaturegroup.getName(), onlyFeaturegroup.getVersion());
            return FeaturestoreHelper.logAndRunSQL(sparkSession, sqlStr);
        }
        SQLJoinDTO sqlJoinDTO = FeaturestoreHelper.getJoinStr(featuregroupsMetadata, joinKey);
        FeaturegroupDTO baseFeaturegroup = sqlJoinDTO.getFeaturegroupDTOS().get(0);
        String sqlStr = "SELECT " + featuresStr + " FROM " + FeaturestoreHelper.getTableName(baseFeaturegroup.getName(), baseFeaturegroup.getVersion()) + " " + sqlJoinDTO.getJoinStr();
        return FeaturestoreHelper.logAndRunSQL(sparkSession, sqlStr);
    }

    /**
     * Runs an arbitrary SQL query after switching to the given featurestore.
     *
     * @param sparkSession the session to query with
     * @param query        the SQL to run verbatim
     * @param featurestore the featurestore database, or {@code null} for the project default
     * @return the query result
     */
    public static Dataset<Row> queryFeaturestore(SparkSession sparkSession, String query, String featurestore) {
        useFeaturestore(sparkSession, featurestore);
        Dataset<Row> result = logAndRunSQL(sparkSession, query);
        return result;
    }

    /**
     * Logs {@code sqlStr} at INFO level and executes it on the session.
     *
     * @param sparkSession the session to run the SQL in
     * @param sqlStr       the SQL statement
     * @return the result returned by {@link SparkSession#sql(String)}
     */
    private static Dataset<Row> logAndRunSQL(SparkSession sparkSession, String sqlStr) {
        LOG.info("Running sql: " + sqlStr);
        Dataset<Row> result = sparkSession.sql(sqlStr);
        return result;
    }

    /**
     * Keeps only the featuregroups whose name is a key of {@code featuregroupsAndVersions} and whose version
     * equals the mapped version.
     *
     * @param featuregroupsAndVersions featuregroup names mapped to the wanted version
     * @param featuregroupsMetadata    the featuregroups to filter
     * @return the matching featuregroups, in input order
     */
    public static List<FeaturegroupDTO> filterFeaturegroupsBasedOnMap(Map<String, Integer> featuregroupsAndVersions, List<FeaturegroupDTO> featuregroupsMetadata) {
        List<FeaturegroupDTO> selected = new ArrayList<>();
        for (FeaturegroupDTO candidate : featuregroupsMetadata) {
            Integer wantedVersion = featuregroupsAndVersions.get(candidate.getName());
            if (wantedVersion == null) {
                continue;
            }
            if (candidate.getVersion().equals(wantedVersion)) {
                selected.add(candidate);
            }
        }
        return selected;
    }

    /**
     * Converts one Spark schema field into a featurestore feature description.
     *
     * <p>The description comes from the field metadata key {@code "description"} if present, otherwise from
     * the field comment if defined; an empty description is replaced by {@code "-"}. The primary-key match is
     * case-insensitive, the partition match is case-sensitive.
     *
     * @param field2      the Spark schema field
     * @param primaryKey  the primary key column name, or {@code null}
     * @param partitionBy the partition column names, or {@code null}
     * @return the feature DTO
     */
    private static FeatureDTO convertFieldToFeature(StructField field2, String primaryKey, List<String> partitionBy) {
        String name = field2.name();
        String type = field2.dataType().catalogString();
        String description = "";
        if (field2.metadata() != null) {
            if (field2.metadata().contains("description")) {
                description = field2.metadata().getString("description");
            } else if (field2.getComment().isDefined()) {
                description = (String) field2.getComment().get();
            }
        }
        Boolean isPrimary = primaryKey != null && name.equalsIgnoreCase(primaryKey);
        Boolean isPartition = partitionBy != null && partitionBy.contains(name);
        return new FeatureDTO(name, type, description.isEmpty() ? "-" : description, isPrimary, isPartition);
    }

    /**
     * Converts every field of a Spark schema into a feature DTO, preserving field order.
     *
     * @param sparkSchema the dataframe schema
     * @param primaryKey  the primary key column name, or {@code null}
     * @param partitionBy the partition column names, or {@code null}
     * @return one feature DTO per schema field
     */
    public static List<FeatureDTO> parseSparkFeaturesSchema(StructType sparkSchema, String primaryKey, List<String> partitionBy) {
        return Arrays.stream(sparkSchema.fields())
            .map(field -> FeaturestoreHelper.convertFieldToFeature(field, primaryKey, partitionBy))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Returns the name of the dataframe's first column, used as the default primary key.
     *
     * @param featuregroupDf the featuregroup dataframe
     * @return the first column's name
     * @throws ArrayIndexOutOfBoundsException if the dataframe has no columns
     */
    public static String getDefaultPrimaryKey(Dataset<Row> featuregroupDf) {
        Tuple2<String, String>[] columnTypes = featuregroupDf.dtypes();
        Tuple2<String, String> firstColumn = columnTypes[0];
        return (String) firstColumn._1;
    }

    /**
     * Checks that {@code primaryKey} names a column of the dataframe, ignoring case.
     *
     * @param featuregroupDf the featuregroup dataframe
     * @param primaryKey     the primary key column name
     * @return {@code true} when the column exists
     * @throws InvalidPrimaryKeyForFeaturegroup if {@code primaryKey} is {@code null} or not a column; the
     *                                          message lists the available (lower-cased) columns
     */
    public static Boolean validatePrimaryKey(Dataset<Row> featuregroupDf, String primaryKey) throws InvalidPrimaryKeyForFeaturegroup {
        // dtypes() builds a fresh array on every call, so compute it once.
        List<String> columns = new ArrayList<>();
        for (Tuple2<String, String> dtype : featuregroupDf.dtypes()) {
            // Locale.ROOT keeps case-insensitive matching independent of the JVM default locale.
            columns.add(((String) dtype._1).toLowerCase(Locale.ROOT));
        }
        if (primaryKey != null && columns.contains(primaryKey.toLowerCase(Locale.ROOT))) {
            return true;
        }
        throw new InvalidPrimaryKeyForFeaturegroup("Invalid primary Key: " + primaryKey + ", the specified primary key does not exist among the available columns: " + StringUtils.join(columns, ","));
    }

    /**
     * Validates the user-supplied metadata of a featuregroup or training dataset before creation.
     *
     * @param name         the featuregroup/training dataset name: non-empty, at most 256 characters,
     *                     matching {@code ^[a-zA-Z0-9_]+$}
     * @param dtypes       the dataframe's (column name, type) pairs; must be non-empty and every column name
     *                     must be non-empty, at most 767 characters and match {@code ^[a-zA-Z0-9_]+$}
     * @param dependencies the data dependencies; must not contain duplicates
     * @param description  the description; at most 2000 characters
     * @throws IllegalArgumentException if any of the checks above fails
     */
    public static void validateMetadata(String name, Tuple2<String, String>[] dtypes, List<String> dependencies, String description) {
        Pattern namePattern = Pattern.compile("^[a-zA-Z0-9_]+$");
        if (name.length() > 256 || name.equals("") || !namePattern.matcher(name).matches()) {
            throw new IllegalArgumentException("Name of feature group/training dataset cannot be empty, cannot exceed 256 characters, cannot contain hyphens ('-') and must match the regular expression: ^[a-zA-Z0-9_]+$ the provided name: " + name + " is not valid");
        }
        if (dtypes.length == 0) {
            throw new IllegalArgumentException("Cannot create a feature group from an empty spark dataframe");
        }
        for (Tuple2<String, String> dtype : dtypes) {
            String columnName = (String) dtype._1;
            if (columnName.length() > 767 || columnName.isEmpty() || !namePattern.matcher(columnName).matches()) {
                throw new IllegalArgumentException("Name of feature column cannot be empty, cannot exceed 767 characters, cannot contain hyphens ('-'), and must match the regular expression: ^[a-zA-Z0-9_]+$, the provided feature name: " + columnName + " is not valid");
            }
        }
        if (new HashSet<String>(dependencies).size() != dependencies.size()) {
            String dependenciesStr = StringUtils.join(dependencies, ",");
            throw new IllegalArgumentException("The list of data dependencies contains duplicates: " + dependenciesStr);
        }
        if (description.length() > 2000) {
            // Report the offending description length (not the number of dependencies).
            throw new IllegalArgumentException("Feature group/Training dataset description should not exceed the maximum length of 2000 characters, the provided description has length:" + description.length());
        }
    }

    /**
     * Serializes each feature DTO with the shared feature marshaller and collects the results in a JSON array.
     *
     * @param featureDTOS the features to serialize, in order
     * @return a JSON array with one element per feature
     * @throws JAXBException if marshalling fails
     */
    public static JSONArray convertFeatureDTOsToJsonObjects(List<FeatureDTO> featureDTOS) throws JAXBException {
        JSONArray jsonFeatures = new JSONArray();
        for (FeatureDTO dto : featureDTOS) {
            JSONObject jsonFeature = FeaturestoreHelper.dtoToJson(featureMarshaller, dto);
            jsonFeatures.put(jsonFeature);
        }
        return jsonFeatures;
    }

    /**
     * Marshals a DTO to a string with the given marshaller and parses that string as a JSON object.
     *
     * @param marshaller the marshaller to use (expected to emit JSON)
     * @param object     the DTO to serialize
     * @return the parsed JSON object, or {@code null} when {@code object} is {@code null}
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject dtoToJson(Marshaller marshaller, Object object) throws JAXBException {
        if (object != null) {
            StringWriter jsonWriter = new StringWriter();
            marshaller.marshal(object, jsonWriter);
            return new JSONObject(jsonWriter.toString());
        }
        return null;
    }

    /**
     * Creates an unmarshaller from {@code jaxbContext} configured, through EclipseLink properties, to read
     * JSON input without a wrapping root element.
     *
     * @param jaxbContext the context to create the unmarshaller from
     * @return the configured unmarshaller
     * @throws JAXBException if creation or configuration fails
     */
    public static Unmarshaller getUnmarshaller(JAXBContext jaxbContext) throws JAXBException {
        Unmarshaller jsonUnmarshaller = jaxbContext.createUnmarshaller();
        jsonUnmarshaller.setProperty("eclipselink.json.include-root", Boolean.FALSE);
        jsonUnmarshaller.setProperty("eclipselink.media-type", "application/json");
        return jsonUnmarshaller;
    }

    /**
     * Serializes a feature correlation matrix DTO with the shared correlation marshaller.
     *
     * @param featureCorrelationMatrixDTO the DTO to serialize
     * @return the JSON object, or {@code null} when the DTO is {@code null}
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertFeatureCorrelationMatrixDTOToJsonObject(FeatureCorrelationMatrixDTO featureCorrelationMatrixDTO) throws JAXBException {
        JSONObject json = dtoToJson(featureCorrelationMarshaller, featureCorrelationMatrixDTO);
        return json;
    }

    /**
     * Serializes a descriptive statistics DTO with the shared descriptive-stats marshaller.
     *
     * @param descriptiveStatsDTO the DTO to serialize
     * @return the JSON object, or {@code null} when the DTO is {@code null}
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertDescriptiveStatsDTOToJsonObject(DescriptiveStatsDTO descriptiveStatsDTO) throws JAXBException {
        JSONObject json = dtoToJson(descriptiveStatsMarshaller, descriptiveStatsDTO);
        return json;
    }

    /**
     * Serializes a feature distributions (histograms) DTO with the shared histograms marshaller.
     *
     * @param featureDistributionsDTO the DTO to serialize
     * @return the JSON object, or {@code null} when the DTO is {@code null}
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertFeatureDistributionsDTOToJsonObject(FeatureDistributionsDTO featureDistributionsDTO) throws JAXBException {
        JSONObject json = dtoToJson(featureHistogramsMarshaller, featureDistributionsDTO);
        return json;
    }

    /**
     * Serializes a cluster analysis DTO with the shared cluster-analysis marshaller.
     *
     * @param clusterAnalysisDTO the DTO to serialize
     * @return the JSON object, or {@code null} when the DTO is {@code null}
     * @throws JAXBException if marshalling fails
     */
    public static JSONObject convertClusterAnalysisDTOToJsonObject(ClusterAnalysisDTO clusterAnalysisDTO) throws JAXBException {
        JSONObject json = dtoToJson(clusteranalysisMarshaller, clusterAnalysisDTO);
        return json;
    }

    /**
     * Unmarshals a feature store metadata JSON object into its DTO.
     *
     * @param featurestoreMetadata the JSON returned for the feature store metadata
     * @return the unmarshalled DTO
     * @throws JAXBException if the JSON cannot be unmarshalled
     */
    public static FeaturestoreMetadataDTO parseFeaturestoreMetadataJson(JSONObject featurestoreMetadata) throws JAXBException {
        Unmarshaller reader = getUnmarshaller(featurestoreMetadataJAXBContext);
        Source source = new StreamSource(new StringReader(featurestoreMetadata.toString()));
        JAXBElement<FeaturestoreMetadataDTO> element = reader.unmarshal(source, FeaturestoreMetadataDTO.class);
        return element.getValue();
    }

    /**
     * Unmarshals a training dataset JSON object into its DTO.
     *
     * @param trainingDatasetJson the JSON describing the training dataset
     * @return the unmarshalled DTO
     * @throws JAXBException if the JSON cannot be unmarshalled
     */
    public static TrainingDatasetDTO parseTrainingDatasetJson(JSONObject trainingDatasetJson) throws JAXBException {
        Unmarshaller reader = getUnmarshaller(trainingDatasetJAXBContext);
        Source source = new StreamSource(new StringReader(trainingDatasetJson.toString()));
        JAXBElement<TrainingDatasetDTO> element = reader.unmarshal(source, TrainingDatasetDTO.class);
        return element.getValue();
    }

    /**
     * Projects {@code sparkDf} onto its numeric columns (those whose type is a {@link NumericType}),
     * keeping schema order.
     *
     * @param sparkDf the dataframe to filter
     * @return a dataframe containing only the numeric columns
     */
    private static Dataset<Row> filterSparkDfNumeric(Dataset<Row> sparkDf) {
        Column[] numericColumns = Arrays.stream(sparkDf.schema().fields())
            .filter(field -> field.dataType() instanceof NumericType)
            .map(field -> functions.col(field.name()))
            .toArray(Column[]::new);
        return sparkDf.select(numericColumns);
    }

    /**
     * Computes Spark's {@code describe()} statistics for every column and reshapes them per feature.
     * <p>
     * Each {@code describe()} output row carries a {@code "summary"} key naming the metric. Every other key
     * is a column name. Metric values that cannot be parsed as a float, or that are NaN/infinite, are
     * stored as {@code null}.
     *
     * @param sparkDf the dataframe to describe
     * @return one metric-value list per column
     */
    private static DescriptiveStatsDTO computeDescriptiveStatistics(Dataset<Row> sparkDf) {
        List<String> describedRows = sparkDf.describe(new String[0]).toJSON().collectAsList();
        List<JSONObject> metricRows = new ArrayList<>();
        HashSet<String> columnNames = new HashSet<String>();
        for (String rowJson : describedRows) {
            JSONObject metricRow = new JSONObject(rowJson);
            metricRows.add(metricRow);
            columnNames.addAll(metricRow.keySet());
        }

        List<DescriptiveStatsMetricValuesDTO> perFeatureStats = new ArrayList<>();
        for (String featureName : columnNames) {
            if ("summary".equals(featureName)) {
                continue;
            }
            List<DescriptiveStatsMetricValueDTO> metricValues = new ArrayList<>();
            for (JSONObject metricRow : metricRows) {
                if (!metricRow.has(featureName)) {
                    continue;
                }
                Float parsed;
                try {
                    parsed = Float.valueOf(Float.parseFloat(metricRow.getString(featureName)));
                    // NaN/Infinity are not representable downstream, so report them as missing
                    if (parsed.isNaN() || parsed.isInfinite()) {
                        parsed = null;
                    }
                } catch (NullPointerException | NumberFormatException ignored) {
                    parsed = null;
                }
                DescriptiveStatsMetricValueDTO metricValue = new DescriptiveStatsMetricValueDTO();
                metricValue.setValue(parsed);
                metricValue.setMetricName(metricRow.getString("summary"));
                metricValues.add(metricValue);
            }
            DescriptiveStatsMetricValuesDTO featureStats = new DescriptiveStatsMetricValuesDTO();
            featureStats.setFeatureName(featureName);
            featureStats.setMetricValues(metricValues);
            perFeatureStats.add(featureStats);
        }

        DescriptiveStatsDTO result = new DescriptiveStatsDTO();
        result.setDescriptiveStats(perFeatureStats);
        return result;
    }

    /**
     * Clusters the rows of {@code sparkDf} with KMeans and projects a sample of them to 2-D with PCA.
     * <p>
     * All columns are assembled into one feature vector. KMeans (seed 1, max 20 iterations) assigns each
     * row to one of {@code clusters} clusters. If the dataframe has at least 50 rows, it is sampled with
     * replacement using fraction {@code 50 / count}. The sampled vectors are reduced to two PCA dimensions.
     * NaN/infinite coordinates are reported as 0.
     *
     * @param sparkDf dataframe whose columns can be vector-assembled (callers pass numeric columns)
     * @param clusters the number of KMeans clusters
     * @return the 2-D datapoints and their cluster assignments, keyed by datapoint index
     */
    private static ClusterAnalysisDTO computeClusterAnalysis(Dataset<Row> sparkDf, int clusters) {
        final String inputCol = "featurestore_feature_clustering_analysis_input_col";
        final String outputCol = "featurestore_feature_clustering_analysis_output_col";
        final String pcaCol = "featurestore_feature_clustering_analysis_pca_col";
        final long maxDatapoints = 50L;

        Dataset<Row> assembled = FeaturestoreHelper.assembleColumnsIntoVector(sparkDf, inputCol);
        KMeans kmeans = new KMeans();
        kmeans.setK(clusters);
        kmeans.setSeed(1L);
        kmeans.setMaxIter(20);
        kmeans.setFeaturesCol(inputCol);
        kmeans.setPredictionCol(outputCol);
        KMeansModel kMeansModel = kmeans.fit(assembled.select(inputCol, new String[0]));
        Dataset<Row> clustered = kMeansModel.transform(assembled)
            .select(functions.col(inputCol), functions.col(outputCol));

        // keep the payload small: only roughly maxDatapoints points are sent back for plotting
        long count = clustered.count();
        Dataset<Row> sampled = count < maxDatapoints
            ? clustered
            : clustered.sample(true, (double) (50.0f / (float) count));

        PCA pca = new PCA();
        pca.setK(2);
        pca.setInputCol(inputCol);
        pca.setOutputCol(pcaCol);
        PCAModel pcaModel = pca.fit(sampled);
        Dataset<Row> projected = pcaModel.transform(sampled)
            .select(functions.col(pcaCol), functions.col(outputCol))
            .withColumnRenamed(pcaCol, "features")
            .withColumnRenamed(outputCol, "clusters");
        String[] jsonStrResult = (String[]) projected.toJSON().collect();

        ArrayList<ClusterDTO> clusterDTOs = new ArrayList<ClusterDTO>();
        ArrayList<DatapointDTO> datapointDTOs = new ArrayList<DatapointDTO>();
        for (int i = 0; i < jsonStrResult.length; ++i) {
            JSONObject jsonObj = new JSONObject(jsonStrResult[i]);
            JSONObject featuresObj = jsonObj.getJSONObject("features");
            int cluster = jsonObj.getInt("clusters");
            JSONArray dimensions = featuresObj.getJSONArray("values");
            DatapointDTO datapointDTO = new DatapointDTO();
            datapointDTO.setDatapointName(Integer.toString(i));
            try {
                Float firstDimension = Float.valueOf((float) dimensions.getDouble(0));
                Float secondDimension = Float.valueOf((float) dimensions.getDouble(1));
                if (firstDimension.isInfinite() || firstDimension.isNaN()) {
                    firstDimension = Float.valueOf(0.0f);
                }
                // previously checked isNaN() twice, letting infinite values through
                if (secondDimension.isInfinite() || secondDimension.isNaN()) {
                    secondDimension = Float.valueOf(0.0f);
                }
                datapointDTO.setFirstDimension(firstDimension);
                datapointDTO.setSecondDimension(secondDimension);
            } catch (ClassCastException e) {
                datapointDTO.setFirstDimension(Float.valueOf(0.0f));
                datapointDTO.setSecondDimension(Float.valueOf(0.0f));
            }
            ClusterDTO clusterDTO = new ClusterDTO();
            clusterDTO.setCluster(cluster);
            clusterDTO.setDatapointName(Integer.toString(i));
            datapointDTOs.add(datapointDTO);
            clusterDTOs.add(clusterDTO);
        }
        ClusterAnalysisDTO clusterAnalysisDTO = new ClusterAnalysisDTO();
        clusterAnalysisDTO.setClusters(clusterDTOs);
        clusterAnalysisDTO.setDataPoints(datapointDTOs);
        return clusterAnalysisDTO;
    }

    /**
     * Computes the pairwise correlation matrix between all columns of {@code sparkDf}.
     * <p>
     * The columns are assembled into a single vector column and passed to Spark ML's
     * {@code Correlation.corr}. NaN/infinite coefficients are reported as 0.
     *
     * @param sparkDf dataframe whose columns are expected to all be numeric (the visible caller passes the
     *                output of {@code filterSparkDfNumeric})
     * @param correlationMethod the method name forwarded to {@code Correlation.corr}
     * @return one entry per column holding its correlation with every column
     * @throws IllegalArgumentException if the dataframe has fewer than 2 or more than 50 columns
     */
    private static FeatureCorrelationMatrixDTO computeCorrMatrix(Dataset<Row> sparkDf, String correlationMethod) {
        // the number of correlations grows quadratically with the number of columns
        final int maxColumns = 50;
        int numberOfColumns = sparkDf.dtypes().length;
        if (numberOfColumns == 0) {
            throw new IllegalArgumentException("The provided spark dataframe does not contain any numeric columns.\nCannot compute feature correlation on categorical columns. \n The numeric datatypes are:" + StringUtils.join(Constants.NUMERIC_SPARK_TYPES, (String)",") + " \n The number of numeric datatypes in the provided dataframe is: " + numberOfColumns + "(" + Arrays.toString(sparkDf.dtypes()) + ")");
        }
        if (numberOfColumns == 1) {
            throw new IllegalArgumentException("The provided spark dataframe only contains one numeric column.\nCannot compute feature correlation on just one column. \n The numeric datatypes are:" + StringUtils.join(Constants.NUMERIC_SPARK_TYPES, (String)",") + " \n The number of numeric datatypes in the provided dataframe is: " + numberOfColumns + "(" + Arrays.toString(sparkDf.dtypes()) + ")");
        }
        if (numberOfColumns > maxColumns) {
            throw new IllegalArgumentException("The provided spark dataframe has " + numberOfColumns + " columns, which exceeds the maximum number of columns: " + maxColumns + ". This is due to scalability reasons (number of correlations grows quadratically with the number of columns)");
        }
        Dataset<Row> sparkDf1 = FeaturestoreHelper.assembleColumnsIntoVector(sparkDf, "featurestore_feature_correlation_analysis_input_col");
        Row firstRow = (Row)Correlation.corr(sparkDf1, (String)"featurestore_feature_correlation_analysis_input_col", (String)correlationMethod).head();
        DenseMatrix correlationMatrix = (DenseMatrix)firstRow.get(0);
        ArrayList<FeatureCorrelationDTO> featureCorrelationDTOList = new ArrayList<FeatureCorrelationDTO>();
        int noCorrelationMatrixColumns = correlationMatrix.numCols();
        int noCorrelationMatrixRows = correlationMatrix.numRows();
        // sparkDf1 = original columns followed by the assembled vector, so index i maps to original column i
        StructField[] fields = sparkDf1.schema().fields();
        for (int i = 0; i < noCorrelationMatrixColumns; ++i) {
            FeatureCorrelationDTO featureCorrelationDTO = new FeatureCorrelationDTO();
            ArrayList<CorrelationValueDTO> correlationValueDTOList = new ArrayList<CorrelationValueDTO>();
            featureCorrelationDTO.setFeatureName(fields[i].name());
            for (int j = 0; j < noCorrelationMatrixRows; ++j) {
                CorrelationValueDTO correlationValueDTO = new CorrelationValueDTO();
                correlationValueDTO.setFeatureName(fields[j].name());
                try {
                    Float corr = Float.valueOf((float)correlationMatrix.apply(i, j));
                    if (corr.isNaN() || corr.isInfinite()) {
                        corr = Float.valueOf(0.0f);
                    }
                    correlationValueDTO.setCorrelation(corr);
                }
                catch (ClassCastException e) {
                    correlationValueDTO.setCorrelation(Float.valueOf(0.0f));
                }
                correlationValueDTOList.add(correlationValueDTO);
            }
            featureCorrelationDTO.setCorrelationValues(correlationValueDTOList);
            featureCorrelationDTOList.add(featureCorrelationDTO);
        }
        FeatureCorrelationMatrixDTO featureCorrelationMatrixDTO = new FeatureCorrelationMatrixDTO();
        featureCorrelationMatrixDTO.setFeatureCorrelations(featureCorrelationDTOList);
        return featureCorrelationMatrixDTO;
    }

    /**
     * Appends a vector column named {@code colName} that assembles every existing column of
     * {@code sparkDf}, in schema order, using Spark ML's {@link VectorAssembler}.
     *
     * @param sparkDf dataframe whose columns can all be assembled into a vector (callers pass numeric columns)
     * @param colName name of the vector column to add
     * @return {@code sparkDf} with the extra vector column
     */
    private static Dataset<Row> assembleColumnsIntoVector(Dataset<Row> sparkDf, String colName) {
        VectorAssembler vectorAssembler = new VectorAssembler();
        // columns() yields the same names as dtypes()[i]._1 without re-deriving the array per column
        vectorAssembler.setInputCols(sparkDf.columns());
        vectorAssembler.setOutputCol(colName);
        return vectorAssembler.transform(sparkDf);
    }

    /**
     * Computes an equal-width histogram with {@code numBins} buckets for every column of {@code sparkDf}.
     * <p>
     * Every numeric column, including ByteType (which the previous per-type branches missed), is cast to
     * double. Rows whose value is null or NaN are dropped before bucketing. Each histogram bin is labelled
     * with its upper bucket edge.
     *
     * @param sparkDf dataframe whose columns are all expected to be numeric
     * @param numBins the number of histogram buckets per column
     * @return one frequency distribution per column, in schema order
     * @throws SparkDataTypeNotRecognizedError if a column is not a {@link NumericType}
     */
    private static FeatureDistributionsDTO computeFeatureHistograms(Dataset<Row> sparkDf, int numBins) throws SparkDataTypeNotRecognizedError {
        ArrayList<FeatureDistributionDTO> featureDistributionDTOS = new ArrayList<FeatureDistributionDTO>();
        for (StructField field : sparkDf.schema().fields()) {
            if (!(field.dataType() instanceof NumericType)) {
                throw new SparkDataTypeNotRecognizedError("Could not parse the spark datatypes to compute feature histograms");
            }
            String featureName = field.name();
            // na().drop() removes null/NaN rows, which would otherwise NPE or make histogram() fail
            Tuple2<double[], long[]> colHist = sparkDf
                .select(functions.col(featureName).cast("double"))
                .na().drop()
                .toJavaRDD()
                .mapToDouble(row -> row.getDouble(0))
                .histogram(numBins);
            double[] bins = colHist._1();
            long[] frequencies = colHist._2();
            FeatureDistributionDTO featureDistributionDTO = new FeatureDistributionDTO();
            featureDistributionDTO.setFeatureName(featureName);
            ArrayList<HistogramBinDTO> histogramBinDTOList = new ArrayList<HistogramBinDTO>();
            for (int j = 0; j < frequencies.length; ++j) {
                HistogramBinDTO histogramBinDTO = new HistogramBinDTO();
                histogramBinDTO.setFrequency((int) frequencies[j]);
                // bins has numBins + 1 edges; label each bucket by its upper edge
                histogramBinDTO.setBin(Double.toString(bins[j + 1]));
                histogramBinDTOList.add(histogramBinDTO);
            }
            featureDistributionDTO.setFrequencyDistribution(histogramBinDTOList);
            featureDistributionDTOS.add(featureDistributionDTO);
        }
        FeatureDistributionsDTO featureDistributionsDTO = new FeatureDistributionsDTO();
        featureDistributionsDTO.setFeatureDistributions(featureDistributionDTOS);
        return featureDistributionsDTO;
    }

    /**
     * Computes the requested statistics for a dataframe or a feature group.
     * <p>
     * If {@code sparkDf} is {@code null}, feature group {@code name}/{@code version} is read from
     * {@code featurestore}. If {@code statColumns} is non-empty, only those columns are analysed.
     * Each statistic is computed independently. A failure in one is logged at WARNING level (with its
     * stack trace) and leaves the corresponding field of the result {@code null}. The Spark job group set
     * for a step is always cleared afterwards, even on failure. A {@code null} flag is treated as
     * {@code false}.
     *
     * @return the computed statistics; disabled or failed entries are {@code null}
     * @throws DataframeIsEmpty if the (column-filtered) dataframe has no rows
     */
    public static StatisticsDTO computeDataFrameStats(String name, SparkSession sparkSession, Dataset<Row> sparkDf, String featurestore, int version, Boolean descriptiveStatistics, Boolean featureCorrelation, Boolean featureHistograms, Boolean clusterAnalysis, List<String> statColumns, int numBins, int numClusters, String correlationMethod) throws DataframeIsEmpty, SparkDataTypeNotRecognizedError, FeaturestoreNotFound {
        if (sparkDf == null) {
            sparkDf = FeaturestoreHelper.getFeaturegroup(sparkSession, name, featurestore, version);
        }
        if (statColumns != null && !statColumns.isEmpty()) {
            Column[] sparkColumnsArr = statColumns.stream().map(sc -> functions.col(sc)).toArray(Column[]::new);
            sparkDf = sparkDf.select(sparkColumnsArr);
        }
        if (sparkDf.rdd().isEmpty()) {
            throw new DataframeIsEmpty("The provided dataframe is empty, cannot compute feature statistics on an empty dataframe");
        }
        ClusterAnalysisDTO clusterAnalysisDTO = null;
        DescriptiveStatsDTO descriptiveStatsDTO = null;
        FeatureCorrelationMatrixDTO featureCorrelationMatrixDTO = null;
        FeatureDistributionsDTO featureDistributionsDTO = null;
        if (Boolean.TRUE.equals(descriptiveStatistics)) {
            try {
                LOG.log(Level.INFO, "computing descriptive statistics for: " + name);
                sparkSession.sparkContext().setJobGroup("Descriptive Statistics Computation", "Analyzing Dataframe Statistics for : " + name, true);
                descriptiveStatsDTO = FeaturestoreHelper.computeDescriptiveStatistics(sparkDf);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute descriptive statistics for: " + name + ", set the optional argument descriptive_statistics=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                sparkSession.sparkContext().setJobGroup("", "", true);
            }
        }
        if (Boolean.TRUE.equals(featureCorrelation)) {
            try {
                LOG.log(Level.INFO, "computing feature correlation for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Correlation Computation", "Analyzing Feature Correlations for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                featureCorrelationMatrixDTO = FeaturestoreHelper.computeCorrMatrix(numericSparkDf, correlationMethod);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute feature correlation for: " + name + ", set the optional argument feature_correlation=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                sparkSession.sparkContext().setJobGroup("", "", true);
            }
        }
        if (Boolean.TRUE.equals(featureHistograms)) {
            try {
                LOG.log(Level.INFO, "computing feature histograms for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Histogram Computation", "Analyzing Feature Distributions for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                featureDistributionsDTO = FeaturestoreHelper.computeFeatureHistograms(numericSparkDf, numBins);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute feature histograms for: " + name + ", set the optional argument feature_histograms=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                sparkSession.sparkContext().setJobGroup("", "", true);
            }
        }
        if (Boolean.TRUE.equals(clusterAnalysis)) {
            try {
                LOG.log(Level.INFO, "computing cluster analysis for: " + name);
                sparkSession.sparkContext().setJobGroup("Feature Cluster Analysis", "Analyzing Feature Clusters for: " + name, true);
                Dataset<Row> numericSparkDf = FeaturestoreHelper.filterSparkDfNumeric(sparkDf);
                clusterAnalysisDTO = FeaturestoreHelper.computeClusterAnalysis(numericSparkDf, numClusters);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not compute cluster analysis for: " + name + ", set the optional argument cluster_analysis=False to skip this step. Error: " + e.getMessage(), e);
            }
            finally {
                sparkSession.sparkContext().setJobGroup("", "", true);
            }
        }
        return new StatisticsDTO(descriptiveStatsDTO, clusterAnalysisDTO, featureCorrelationMatrixDTO, featureDistributionsDTO);
    }

    /**
     * Writes {@code sparkDf} to {@code hdfsPath} in the given format, inside a named Spark job group.
     * <p>
     * Supported formats are csv, tsv (CSV with a tab delimiter; both with a header row), parquet, avro,
     * orc and tfrecords (records of type {@code Example}). The job group is cleared when the method
     * returns or throws.
     *
     * @param writeMode Spark save mode string passed to {@code DataFrameWriter.mode}
     * @throws CannotWriteImageDataFrameException if {@code dataFormat} is "image"
     * @throws TrainingDatasetFormatNotSupportedError if {@code dataFormat} is not recognised
     */
    public static void writeTrainingDatasetHdfs(SparkSession sparkSession, Dataset<Row> sparkDf, String hdfsPath, String dataFormat, String writeMode) throws TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException {
        sparkSession.sparkContext().setJobGroup("Materializing dataframe as training dataset", "Saving training dataset in path: " + hdfsPath + ", in format: " + dataFormat, true);
        try {
            switch (dataFormat) {
                case "csv": {
                    sparkDf.write().option("delimiter", ",").mode(writeMode).option("header", "true").csv(hdfsPath);
                    break;
                }
                case "tsv": {
                    sparkDf.write().option("delimiter", "\t").mode(writeMode).option("header", "true").csv(hdfsPath);
                    break;
                }
                case "parquet": {
                    sparkDf.write().format(dataFormat).mode(writeMode).parquet(hdfsPath);
                    break;
                }
                case "avro":
                case "orc": {
                    sparkDf.write().format(dataFormat).mode(writeMode).save(hdfsPath);
                    break;
                }
                case "image": {
                    throw new CannotWriteImageDataFrameException("Image Dataframes can only be read, not written");
                }
                case "tfrecords": {
                    sparkDf.write().format(dataFormat).option("recordType", "Example").mode(writeMode).save(hdfsPath);
                    break;
                }
                default: {
                    throw new TrainingDatasetFormatNotSupportedError("The provided data format: " + dataFormat + " is not supported in the Java/Scala API, the supported data formats are: " + "csv" + "," + "tsv" + "," + "tfrecords" + "," + "parquet" + "," + "avro" + "," + "orc" + "," + "image" + ",");
                }
            }
        }
        finally {
            // previously skipped for "image", failed writes, and a null format
            sparkSession.sparkContext().setJobGroup("", "", true);
        }
    }

    /**
     * Returns the first training dataset in {@code trainingDatasetDTOList} with the given name and version.
     *
     * @param trainingDatasetDTOList the training datasets to search
     * @param trainingDatasetName the name to match (compared with {@code equals})
     * @param trainingDatasetVersion the version to match
     * @return the first matching training dataset
     * @throws TrainingDatasetDoesNotExistError if no dataset matches; the message lists the available names
     */
    public static TrainingDatasetDTO findTrainingDataset(List<TrainingDatasetDTO> trainingDatasetDTOList, String trainingDatasetName, int trainingDatasetVersion) throws TrainingDatasetDoesNotExistError {
        TrainingDatasetDTO match = trainingDatasetDTOList.stream()
            .filter(td -> td.getName().equals(trainingDatasetName) && td.getVersion() == trainingDatasetVersion)
            .findFirst()
            .orElse(null);
        if (match != null) {
            return match;
        }
        List<String> trainingDatasetNames = trainingDatasetDTOList.stream()
            .map(TrainingDatasetDTO::getName)
            .collect(Collectors.toList());
        throw new TrainingDatasetDoesNotExistError("Could not find the requested training dataset with name: " + trainingDatasetName + " , and version: " + trainingDatasetVersion + " , among the list of available training datasets in the featurestore: " + StringUtils.join(trainingDatasetNames, (String)","));
    }

    /**
     * Returns the first feature group in {@code featuregroupDTOList} with the given name and version.
     *
     * @param featuregroupDTOList the feature groups to search
     * @param featuregroupName the name to match (compared with {@code equals})
     * @param featuregroupVersion the version to match
     * @return the first matching feature group
     * @throws FeaturegroupDoesNotExistError if no feature group matches; the message lists the available names
     */
    public static FeaturegroupDTO findFeaturegroup(List<FeaturegroupDTO> featuregroupDTOList, String featuregroupName, int featuregroupVersion) throws FeaturegroupDoesNotExistError {
        FeaturegroupDTO match = featuregroupDTOList.stream()
            .filter(fg -> fg.getName().equals(featuregroupName) && fg.getVersion() == featuregroupVersion)
            .findFirst()
            .orElse(null);
        if (match != null) {
            return match;
        }
        List<String> featuregroupNames = featuregroupDTOList.stream()
            .map(FeaturegroupDTO::getName)
            .collect(Collectors.toList());
        throw new FeaturegroupDoesNotExistError("Could not find the requested feature group with name: " + featuregroupName + " , and version: " + featuregroupVersion + " , among the list of available feature groups in the featurestore: " + StringUtils.join(featuregroupNames, (String)","));
    }

    /**
     * Writes the TFRecord schema JSON to {@code hdfsPath}, overwriting any existing file.
     * <p>
     * The text is encoded as UTF-8. Previously {@code writeBytes} dropped the high byte of every char,
     * corrupting non-ASCII column names. The write is best-effort: any I/O failure, including on close,
     * is logged at SEVERE level and not rethrown. The stream is always closed, even if a flush fails.
     *
     * @param hdfsPath destination path, resolved against a default Hadoop {@code Configuration}
     * @param tfRecordSchemaJson the schema JSON text to write
     * @throws IOException declared for backward compatibility; failures are currently logged instead
     */
    public static void writeTfRecordSchemaJson(String hdfsPath, String tfRecordSchemaJson) throws IOException {
        Configuration hdfsConf = new Configuration();
        Path filePath = new Path(hdfsPath);
        try (FSDataOutputStream outputStream = filePath.getFileSystem(hdfsConf).create(filePath, true)) {
            outputStream.write(tfRecordSchemaJson.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            outputStream.flush();
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not save tf record schema json to HDFS", e);
        }
    }

    /**
     * Infers a TFRecord feature schema from the dataframe's Spark schema.
     * <p>
     * Each top-level column becomes {@code {"feature": "fixed_len", "type": T}}. Each array column becomes
     * {@code {"feature": "var_len", "type": T}}, where T is derived from the array's element type. The type
     * mapping is: Integer/Long to "int", Float/Double/Decimal to "float", String/Binary to "string". The
     * schema alone is inspected, so no Spark job is triggered.
     *
     * @param sparkDf the dataframe whose schema is inspected
     * @return a JSON object mapping column name to its TFRecord feature spec
     * @throws InferTFRecordSchemaError if an array contains arrays, or a column/element type is unsupported
     */
    public static JSONObject getDataframeTfRecordSchemaJson(Dataset<Row> sparkDf) throws InferTFRecordSchemaError {
        JSONObject tfRecordJsonSchema = new JSONObject();
        for (StructField field : sparkDf.schema().fields()) {
            org.apache.spark.sql.types.DataType dataType = field.dataType();
            if (dataType instanceof ArrayType) {
                // Inspect the element type; the old code read the top-level row schema by mistake
                org.apache.spark.sql.types.DataType elementType = ((ArrayType) dataType).elementType();
                if (elementType instanceof ArrayType) {
                    throw new InferTFRecordSchemaError("Can only infer tf record schema for dataframes with one level of nested arrays, this dataframe has two levels.");
                }
                String elementTfType = FeaturestoreHelper.tfRecordTypeOf(elementType);
                if (elementTfType == null) {
                    throw new InferTFRecordSchemaError("Could not infer the tf record schema, an array column has the datatype: " + elementType.toString() + " which is not in the list of recognized types:  IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, and BinaryType");
                }
                tfRecordJsonSchema.put(field.name(), FeaturestoreHelper.tfRecordFeatureSpec("var_len", elementTfType));
                continue;
            }
            String tfType = FeaturestoreHelper.tfRecordTypeOf(dataType);
            if (tfType == null) {
                throw new InferTFRecordSchemaError("Could not infer the tf record schema, a row has the datatype: " + dataType.toString() + " which is not in the list of recognized types:  IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, BinaryType, and ArrayType");
            }
            tfRecordJsonSchema.put(field.name(), FeaturestoreHelper.tfRecordFeatureSpec("fixed_len", tfType));
        }
        LOG.log(Level.INFO, "Inferred TFRecordsSchema: " + tfRecordJsonSchema.toString());
        return tfRecordJsonSchema;
    }

    /**
     * Maps a scalar Spark type to its TFRecord type name, or returns {@code null} if it is unsupported.
     */
    private static String tfRecordTypeOf(org.apache.spark.sql.types.DataType dataType) {
        if (dataType instanceof IntegerType || dataType instanceof LongType) {
            return "int";
        }
        if (dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof DecimalType) {
            return "float";
        }
        if (dataType instanceof StringType || dataType instanceof BinaryType) {
            return "string";
        }
        return null;
    }

    /**
     * Builds the {@code {"feature": ..., "type": ...}} spec object for one column.
     */
    private static JSONObject tfRecordFeatureSpec(String feature, String type) {
        JSONObject featureType = new JSONObject();
        featureType.put("feature", feature);
        featureType.put("type", type);
        return featureType;
    }

    /**
     * Returns the highest version among the featuregroups whose name equals {@code featuregroupName}.
     * <p>
     * The name comparison is case-sensitive ({@link String#equals}).
     *
     * @param featuregroupDTOS the featuregroups to search
     * @param featuregroupName the featuregroup name to match
     * @return the highest matching version, or 0 if no featuregroup has that name
     */
    public static int getLatestFeaturegroupVersion(List<FeaturegroupDTO> featuregroupDTOS, String featuregroupName) {
        // Typed stream: the previous raw List made the version lambda operate on Object.
        return featuregroupDTOS.stream()
            .filter(fg -> fg.getName().equals(featuregroupName))
            .mapToInt(FeaturegroupDTO::getVersion)
            .max()
            .orElse(0);
    }

    /**
     * Returns the highest version among the training datasets whose name equals {@code trainingDatasetName}.
     * <p>
     * The name comparison is case-sensitive ({@link String#equals}).
     *
     * @param trainingDatsetDTOS  the training datasets to search
     * @param trainingDatasetName the training dataset name to match
     * @return the highest matching version, or 0 if no training dataset has that name
     */
    public static int getLatestTrainingDatasetVersion(List<TrainingDatasetDTO> trainingDatsetDTOS, String trainingDatasetName) {
        // Typed stream: the previous raw List made the version lambda operate on Object.
        return trainingDatsetDTOS.stream()
            .filter(td -> td.getName().equals(trainingDatasetName))
            .mapToInt(TrainingDatasetDTO::getVersion)
            .max()
            .orElse(0);
    }

    /**
     * Reads the class-level (static) featurestore metadata cache.
     *
     * @return the cached metadata, or {@code null} if nothing has been cached yet
     */
    public static FeaturestoreMetadataDTO getFeaturestoreMetadataCache() {
        final FeaturestoreMetadataDTO cached = FeaturestoreHelper.featurestoreMetadataCache;
        return cached;
    }

    /**
     * Replaces the class-level (static) featurestore metadata cache.
     *
     * @param metadataCache the new cache contents; may be {@code null} to clear the cache
     */
    public static void setFeaturestoreMetadataCache(FeaturestoreMetadataDTO metadataCache) {
        featurestoreMetadataCache = metadataCache;
    }

    /**
     * Resolves the featurestore to use.
     *
     * @param featurestore an explicitly requested featurestore, or {@code null}
     * @return {@code featurestore} if non-null, otherwise {@link #getProjectFeaturestore()}
     */
    public static String featurestoreGetOrDefault(String featurestore) {
        return featurestore != null ? featurestore : FeaturestoreHelper.getProjectFeaturestore();
    }

    /**
     * Resolves the Spark session to use.
     *
     * @param sparkSession an explicitly supplied session, or {@code null}
     * @return {@code sparkSession} if non-null, otherwise the session returned by {@code Hops.findSpark()}
     */
    public static SparkSession sparkGetOrDefault(SparkSession sparkSession) {
        return sparkSession != null ? sparkSession : Hops.findSpark();
    }

    /**
     * Resolves the primary key column to use for a dataframe.
     *
     * @param primaryKey an explicitly requested primary key, or {@code null}
     * @param df         the dataframe used to derive a default key when none is given
     * @return {@code primaryKey} if non-null, otherwise {@code getDefaultPrimaryKey(df)}
     */
    public static String primaryKeyGetOrDefault(String primaryKey, Dataset<Row> df) {
        return primaryKey != null ? primaryKey : FeaturestoreHelper.getDefaultPrimaryKey(df);
    }

    /**
     * Resolves the correlation method name.
     *
     * @param corrMethod an explicitly requested method, or {@code null}
     * @return {@code corrMethod} if non-null, otherwise {@code "pearson"}
     */
    public static String correlationMethodGetOrDefault(String corrMethod) {
        return corrMethod != null ? corrMethod : "pearson";
    }

    /**
     * Resolves the number of histogram bins.
     *
     * @param numBins an explicitly requested bin count, or {@code null}
     * @return {@code numBins} if non-null, otherwise 20
     */
    public static Integer numBinsGetOrDefault(Integer numBins) {
        // Both branches are Integer so the conditional neither unboxes nor re-boxes numBins.
        return numBins != null ? numBins : Integer.valueOf(20);
    }

    /**
     * Resolves the number of clusters for cluster analysis.
     *
     * @param numClusters an explicitly requested cluster count, or {@code null}
     * @return {@code numClusters} if non-null, otherwise 5
     */
    public static Integer numClustersGetOrDefault(Integer numClusters) {
        // Both branches are Integer so the conditional neither unboxes nor re-boxes numClusters.
        return numClusters != null ? numClusters : Integer.valueOf(5);
    }

    /**
     * Resolves the job name.
     *
     * @param jobName an explicitly supplied job name; {@code null} or empty means "not supplied"
     * @return {@code jobName} if non-empty, otherwise {@code Hops.getJobName()}
     */
    public static String jobNameGetOrDefault(String jobName) {
        boolean missing = jobName == null || jobName.isEmpty();
        return missing ? Hops.getJobName() : jobName;
    }

    /**
     * Resolves the training-dataset data format.
     *
     * @param dataFormat an explicitly requested format, or {@code null}
     * @return {@code dataFormat} if non-null, otherwise {@code "tfrecords"}
     */
    public static String dataFormatGetOrDefault(String dataFormat) {
        return dataFormat != null ? dataFormat : "tfrecords";
    }

    /**
     * Returns the id of the given featurestore.
     * <p>
     * The metadata cache is refreshed via {@code FeaturestoreUpdateMetadataCache} when it is empty or holds
     * a different featurestore. The name comparison is case-insensitive.
     * {@code featurestore} must be non-null whenever a cache is present.
     *
     * @param featurestore the featurestore name
     * @return the featurestore id from the (possibly refreshed) metadata cache
     * @throws JAXBException        propagated from the metadata-cache refresh
     * @throws FeaturestoreNotFound propagated from the metadata-cache refresh
     */
    public static Integer getFeaturestoreId(String featurestore) throws JAXBException, FeaturestoreNotFound {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        boolean cacheMatches = cached != null
            && featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName());
        if (!cacheMatches) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        // Re-read: write() may have replaced the cache.
        return FeaturestoreHelper.getFeaturestoreMetadataCache().getFeaturestore().getFeaturestoreId();
    }

    /**
     * Looks up the id of a featuregroup by name and version.
     * <p>
     * The metadata cache is refreshed via {@code FeaturestoreUpdateMetadataCache} when it is empty or holds
     * a different featurestore. Featurestore and featuregroup names are compared case-insensitively.
     *
     * @param featurestore        the featurestore to search
     * @param featuregroup        the featuregroup name
     * @param featuregroupVersion the featuregroup version
     * @return the id of the single matching featuregroup
     * @throws JAXBException                 propagated from the metadata-cache refresh
     * @throws FeaturestoreNotFound          propagated from the metadata-cache refresh
     * @throws FeaturegroupDoesNotExistError if no featuregroup matches both name and version
     * @throws AssertionError                if more than one featuregroup matches
     */
    public static Integer getFeaturegroupId(String featurestore, String featuregroup, int featuregroupVersion) throws JAXBException, FeaturestoreNotFound, FeaturegroupDoesNotExistError {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        if (cached == null || !featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName())) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        // Re-read: write() may have replaced the cache.
        List<FeaturegroupDTO> matches = FeaturestoreHelper.getFeaturestoreMetadataCache().getFeaturegroups()
            .stream()
            .filter(fg -> fg.getName().equalsIgnoreCase(featuregroup) && fg.getVersion() == featuregroupVersion)
            .collect(Collectors.toList());
        if (matches.size() == 1) {
            return matches.get(0).getId();
        }
        if (matches.size() > 1) {
            throw new AssertionError("Found more than one featuregroup with the name: " + featuregroup
                + " and version: " + featuregroupVersion + " in the featurestore: " + featurestore);
        }
        // The name may exist under another version, so report the version that was requested.
        throw new FeaturegroupDoesNotExistError("Featuregroup: " + featuregroup + " with version: "
            + featuregroupVersion + " was not found in the featurestore: " + featurestore);
    }

    /**
     * Looks up the id of a training dataset by name and version.
     * <p>
     * The metadata cache is refreshed via {@code FeaturestoreUpdateMetadataCache} when it is empty or holds
     * a different featurestore. Featurestore and training dataset names are compared case-insensitively.
     *
     * @param featurestore           the featurestore to search
     * @param trainingDataset        the training dataset name
     * @param trainingDatasetVersion the training dataset version
     * @return the id of the single matching training dataset
     * @throws JAXBException                     propagated from the metadata-cache refresh
     * @throws FeaturestoreNotFound              propagated from the metadata-cache refresh
     * @throws TrainingDatasetDoesNotExistError  if no training dataset matches both name and version
     * @throws AssertionError                    if more than one training dataset matches
     */
    public static Integer getTrainingDatasetId(String featurestore, String trainingDataset, int trainingDatasetVersion) throws JAXBException, FeaturestoreNotFound, TrainingDatasetDoesNotExistError {
        FeaturestoreMetadataDTO cached = FeaturestoreHelper.getFeaturestoreMetadataCache();
        if (cached == null || !featurestore.equalsIgnoreCase(cached.getFeaturestore().getFeaturestoreName())) {
            new FeaturestoreUpdateMetadataCache().setFeaturestore(featurestore).write();
        }
        // Re-read: write() may have replaced the cache.
        List<TrainingDatasetDTO> matches = FeaturestoreHelper.getFeaturestoreMetadataCache().getTrainingDatasets()
            .stream()
            .filter(td -> td.getName().equalsIgnoreCase(trainingDataset) && td.getVersion() == trainingDatasetVersion)
            .collect(Collectors.toList());
        if (matches.size() == 1) {
            return matches.get(0).getId();
        }
        if (matches.size() > 1) {
            throw new AssertionError("Found more than one training dataset with the name: " + trainingDataset
                + " and version: " + trainingDatasetVersion + " in the featurestore: " + featurestore);
        }
        // The name may exist under another version, so report the version that was requested.
        throw new TrainingDatasetDoesNotExistError("Training Dataset: " + trainingDataset + " with version: "
            + trainingDatasetVersion + " was not found in the featurestore: " + featurestore);
    }

    /*
     * Class initialization: starts with an empty (null) featurestore metadata cache, then builds one
     * JAXB context and one marshaller per DTO type used by this helper.
     *
     * Every marshaller gets the same two properties: "eclipselink.json.include-root" = false and
     * "eclipselink.media-type" = "application/json". These are presumably EclipseLink MOXy JSON-binding
     * options (JSON output with no root wrapper element) — confirm against the MOXy documentation.
     *
     * NOTE(review): a JAXBException is only logged, not rethrown. Any context or marshaller not yet
     * assigned when the exception occurs keeps its field default (presumably null; the fields are
     * declared outside this view). Later use of such a field would then fail far from the root cause.
     * The statement order below therefore decides which marshallers survive a partial failure.
     */
    static {
        // Empty cache; callers such as getFeaturestoreId() check for null and refresh on demand.
        featurestoreMetadataCache = null;
        try {
            // One JAXB context per DTO root type.
            descriptiveStatsJAXBContext = JAXBContextFactory.createContext(new Class[]{DescriptiveStatsDTO.class}, null);
            featureCorrelationJAXBContext = JAXBContextFactory.createContext(new Class[]{FeatureCorrelationMatrixDTO.class}, null);
            featureHistogramsJAXBContext = JAXBContextFactory.createContext(new Class[]{FeatureDistributionsDTO.class}, null);
            clusterAnalysisJAXBContext = JAXBContextFactory.createContext(new Class[]{ClusterAnalysisDTO.class}, null);
            featureJAXBContext = JAXBContextFactory.createContext(new Class[]{FeatureDTO.class}, null);
            featurestoreMetadataJAXBContext = JAXBContextFactory.createContext(new Class[]{FeaturestoreMetadataDTO.class}, null);
            trainingDatasetJAXBContext = JAXBContextFactory.createContext(new Class[]{TrainingDatasetDTO.class}, null);
            // A marshaller from each context, all configured identically for JSON output.
            descriptiveStatsMarshaller = descriptiveStatsJAXBContext.createMarshaller();
            descriptiveStatsMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            descriptiveStatsMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureCorrelationMarshaller = featureCorrelationJAXBContext.createMarshaller();
            featureCorrelationMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureCorrelationMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            clusteranalysisMarshaller = clusterAnalysisJAXBContext.createMarshaller();
            clusteranalysisMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            clusteranalysisMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureHistogramsMarshaller = featureHistogramsJAXBContext.createMarshaller();
            featureHistogramsMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureHistogramsMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featureMarshaller = featureJAXBContext.createMarshaller();
            featureMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featureMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            featurestoreMetadataMarshaller = featurestoreMetadataJAXBContext.createMarshaller();
            featurestoreMetadataMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            featurestoreMetadataMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
            trainingDatasetMarshaller = trainingDatasetJAXBContext.createMarshaller();
            trainingDatasetMarshaller.setProperty("eclipselink.json.include-root", (Object)false);
            trainingDatasetMarshaller.setProperty("eclipselink.media-type", (Object)"application/json");
        }
        catch (JAXBException e) {
            // Logged only: class loading continues even if some contexts/marshallers were not created.
            LOG.log(Level.SEVERE, "An error occurred while initializing JAXBContext", e);
        }
    }
}

