package io.hops.util.featurestore.ops.write_ops;

import io.hops.util.Constants;
import io.hops.util.FeaturestoreRestClient;
import io.hops.util.Hops;
import io.hops.util.exceptions.CannotWriteImageDataFrameException;
import io.hops.util.exceptions.DataframeIsEmpty;
import io.hops.util.exceptions.FeaturegroupUpdateStatsError;
import io.hops.util.exceptions.FeaturestoreNotFound;
import io.hops.util.exceptions.HiveNotEnabled;
import io.hops.util.exceptions.JWTNotFoundException;
import io.hops.util.exceptions.OnlineFeaturestorePasswordNotFound;
import io.hops.util.exceptions.OnlineFeaturestoreUserNotFound;
import io.hops.util.exceptions.SparkDataTypeNotRecognizedError;
import io.hops.util.exceptions.StorageConnectorDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetFormatNotSupportedError;
import io.hops.util.featurestore.FeaturestoreHelper;
import io.hops.util.featurestore.dtos.app.FeaturestoreMetadataDTO;
import io.hops.util.featurestore.dtos.jobs.FeaturestoreJobDTO;
import io.hops.util.featurestore.dtos.stats.StatisticsDTO;
import io.hops.util.featurestore.dtos.storageconnector.FeaturestoreS3ConnectorDTO;
import io.hops.util.featurestore.dtos.trainingdataset.ExternalTrainingDatasetDTO;
import io.hops.util.featurestore.dtos.trainingdataset.HopsfsTrainingDatasetDTO;
import io.hops.util.featurestore.dtos.trainingdataset.TrainingDatasetDTO;
import io.hops.util.featurestore.dtos.trainingdataset.TrainingDatasetType;
import io.hops.util.featurestore.ops.FeaturestoreOp;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;

/* loaded from: input_file:io/hops/util/featurestore/ops/write_ops/FeaturestoreInsertIntoTrainingDataset.class */
/**
 * Featurestore write operation that inserts a Spark dataframe into an existing training dataset.
 *
 * <p>Configure the operation through the fluent setters and then call {@link #write()}. Only the
 * overwrite write mode ({@link Constants#SPARK_OVERWRITE_MODE}) is accepted. {@link #read()} is not
 * supported.
 */
public class FeaturestoreInsertIntoTrainingDataset extends FeaturestoreOp {
    private static final Logger LOG = Logger.getLogger(FeaturestoreInsertIntoTrainingDataset.class.getName());

    /**
     * Creates the operation.
     *
     * @param name value forwarded to the {@link FeaturestoreOp} constructor (presumably the training
     *     dataset name; TODO confirm against FeaturestoreOp)
     */
    public FeaturestoreInsertIntoTrainingDataset(String name) {
        super(name);
    }

    /**
     * Not supported for this write operation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // io.hops.util.featurestore.ops.FeaturestoreOp
    public Object read() {
        throw new UnsupportedOperationException("read() is not supported on a write operation");
    }

    /**
     * Inserts the configured dataframe into the training dataset.
     *
     * <p>The insert does the following:
     * <ul>
     *   <li>computes statistics over the dataframe;</li>
     *   <li>pushes them to the training dataset through the featurestore REST client;</li>
     *   <li>writes the dataframe to the training dataset's storage (HopsFS or an S3 connector).</li>
     * </ul>
     *
     * <p>The first attempt uses the featurestore metadata returned by
     * {@code Hops.getFeaturestoreMetadata()}. If that attempt fails for any reason, the featurestore
     * metadata cache is refreshed and the insert is retried exactly once. If the retry (or the cache
     * refresh) fails as well, that exception is thrown with the first failure attached via
     * {@link Throwable#addSuppressed(Throwable)}.
     *
     * @throws IllegalArgumentException if no dataframe has been set, or if the write mode is not
     *     overwrite (training datasets are immutable)
     */
    @Override // io.hops.util.featurestore.ops.FeaturestoreOp
    public void write() throws DataframeIsEmpty, SparkDataTypeNotRecognizedError, JAXBException, FeaturegroupUpdateStatsError, FeaturestoreNotFound, TrainingDatasetDoesNotExistError, TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException, JWTNotFoundException, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestoreUserNotFound, OnlineFeaturestorePasswordNotFound {
        if (this.dataframe == null) {
            throw new IllegalArgumentException("Dataframe to insert cannot be null, specify dataframe with .setDataframe(df)");
        }
        if (this.mode == null || !this.mode.equalsIgnoreCase(Constants.SPARK_OVERWRITE_MODE)) {
            throw new IllegalArgumentException("The supplied write mode: " + this.mode + " does not match any of the supported modes: overwrite (training datasets are immutable)");
        }
        try {
            insertWithCurrentMetadata();
        } catch (Exception firstFailure) {
            // The metadata may be stale (e.g. the training dataset was created after it was fetched):
            // refresh it and retry once, keeping the first failure so it is not silently lost.
            LOG.log(Level.FINE, "Insert into training dataset failed, refreshing featurestore metadata cache and retrying", firstFailure);
            try {
                Hops.updateFeaturestoreMetadataCache().setFeaturestore(this.featurestore).write();
                insertWithCurrentMetadata();
            } catch (Exception retryFailure) {
                if (retryFailure != firstFailure) {
                    retryFailure.addSuppressed(firstFailure);
                }
                // Precise rethrow: only the checked exceptions the try block can throw propagate.
                throw retryFailure;
            }
        }
    }

    /**
     * Reads the featurestore metadata through {@code Hops.getFeaturestoreMetadata()} and performs one
     * insert attempt using this operation's configured fields.
     */
    private void insertWithCurrentMetadata() throws DataframeIsEmpty, SparkDataTypeNotRecognizedError, JAXBException, FeaturegroupUpdateStatsError, FeaturestoreNotFound, TrainingDatasetDoesNotExistError, TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException, JWTNotFoundException, HiveNotEnabled, StorageConnectorDoesNotExistError, OnlineFeaturestoreUserNotFound, OnlineFeaturestorePasswordNotFound {
        FeaturestoreMetadataDTO featurestoreMetadata = Hops.getFeaturestoreMetadata().setFeaturestore(this.featurestore).read();
        doInsertIntoTrainingDataset(getSpark(), this.dataframe, this.name, this.featurestore, featurestoreMetadata, this.version, this.descriptiveStats, this.featureCorr, this.featureHistograms, this.clusterAnalysis, this.statColumns, Integer.valueOf(this.numBins), this.corrMethod, Integer.valueOf(this.numClusters), this.mode);
    }

    /**
     * Computes statistics for {@code dataset}, updates the training dataset's statistics through the
     * REST API, and writes the data to the training dataset's storage.
     *
     * <p>The featurestore, Spark session, correlation method, number of bins and number of clusters
     * are resolved through the corresponding {@code FeaturestoreHelper.*GetOrDefault} helpers first.
     *
     * @param sparkSession Spark session (resolved with {@code sparkGetOrDefault})
     * @param dataset the data to insert
     * @param trainingDatasetName name of the training dataset to insert into
     * @param featurestoreName featurestore name (resolved with {@code featurestoreGetOrDefault})
     * @param featurestoreMetadata metadata used to look up the training dataset, settings and
     *     storage connectors
     * @param trainingDatasetVersion version of the training dataset
     * @param descriptiveStats whether to compute descriptive statistics
     * @param featureCorr whether to compute the feature correlation matrix
     * @param featureHistograms whether to compute feature histograms
     * @param clusterAnalysis whether to run cluster analysis
     * @param statColumns columns to compute statistics for
     * @param numBins number of histogram bins (resolved with {@code numBinsGetOrDefault})
     * @param corrMethod correlation method (resolved with {@code correlationMethodGetOrDefault})
     * @param numClusters number of clusters (resolved with {@code numClustersGetOrDefault})
     * @param writeMode Spark write mode passed through to the dataset writer
     */
    private void doInsertIntoTrainingDataset(SparkSession sparkSession, Dataset<Row> dataset, String trainingDatasetName, String featurestoreName, FeaturestoreMetadataDTO featurestoreMetadata, int trainingDatasetVersion, Boolean descriptiveStats, Boolean featureCorr, Boolean featureHistograms, Boolean clusterAnalysis, List<String> statColumns, Integer numBins, String corrMethod, Integer numClusters, String writeMode) throws JAXBException, TrainingDatasetDoesNotExistError, DataframeIsEmpty, FeaturegroupUpdateStatsError, TrainingDatasetFormatNotSupportedError, FeaturestoreNotFound, CannotWriteImageDataFrameException, JWTNotFoundException, StorageConnectorDoesNotExistError, HiveNotEnabled, OnlineFeaturestoreUserNotFound, OnlineFeaturestorePasswordNotFound {
        String resolvedFeaturestore = FeaturestoreHelper.featurestoreGetOrDefault(featurestoreName);
        SparkSession resolvedSpark = FeaturestoreHelper.sparkGetOrDefault(sparkSession);
        String resolvedCorrMethod = FeaturestoreHelper.correlationMethodGetOrDefault(corrMethod);
        Integer resolvedNumBins = FeaturestoreHelper.numBinsGetOrDefault(numBins);
        Integer resolvedNumClusters = FeaturestoreHelper.numClustersGetOrDefault(numClusters);

        TrainingDatasetDTO trainingDataset = FeaturestoreHelper.findTrainingDataset(featurestoreMetadata.getTrainingDatasets(), trainingDatasetName, trainingDatasetVersion);
        StatisticsDTO statistics = FeaturestoreHelper.computeDataFrameStats(trainingDatasetName, resolvedSpark, dataset, resolvedFeaturestore, trainingDatasetVersion, descriptiveStats, featureCorr, featureHistograms, clusterAnalysis, statColumns, resolvedNumBins.intValue(), resolvedNumClusters.intValue(), resolvedCorrMethod);

        // groupInputParamsIntoDTO mutates and returns trainingDataset; the DTO type string is computed
        // afterwards, in the same order as before.
        TrainingDatasetDTO statsUpdate = groupInputParamsIntoDTO(trainingDataset, statistics);
        String responseJson = (String) FeaturestoreRestClient.updateTrainingDatasetStatsRest(statsUpdate, FeaturestoreHelper.getTrainingDatasetDTOTypeStr(trainingDataset, featurestoreMetadata.getSettings())).readEntity(String.class);
        TrainingDatasetDTO updatedTrainingDataset = FeaturestoreHelper.parseTrainingDatasetJson(new JSONObject(responseJson));

        if (updatedTrainingDataset.getTrainingDatasetType() == TrainingDatasetType.HOPSFS_TRAINING_DATASET) {
            insertIntoHopsfsTrainingDataset(updatedTrainingDataset, resolvedSpark, dataset, writeMode);
        } else {
            insertIntoExternalTrainingDataset(updatedTrainingDataset, featurestoreMetadata, resolvedSpark, dataset, writeMode);
        }
    }

    /**
     * Writes {@code dataset} to a HopsFS training dataset at {@code <hdfsStorePath>/<name>}.
     *
     * <p>For the TF-record format it additionally tries, best-effort, to infer the TF-record schema
     * and write it as JSON under the store path. Failures there are logged as warnings and do not
     * fail the insert.
     */
    private void insertIntoHopsfsTrainingDataset(TrainingDatasetDTO trainingDatasetDTO, SparkSession sparkSession, Dataset<Row> dataset, String writeMode) throws TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException {
        HopsfsTrainingDatasetDTO hopsfsTrainingDataset = (HopsfsTrainingDatasetDTO) trainingDatasetDTO;
        String storePath = hopsfsTrainingDataset.getHdfsStorePath();
        String dataFormat = hopsfsTrainingDataset.getDataFormat();
        FeaturestoreHelper.writeTrainingDataset(sparkSession, dataset, storePath + Constants.SLASH_DELIMITER + trainingDatasetDTO.getName(), dataFormat, writeMode);

        // Compare string contents, not references: the format comes from a deserialized DTO, so `==`
        // would almost never match and the schema file would silently never be written.
        if (!Constants.TRAINING_DATASET_TFRECORDS_FORMAT.equals(dataFormat)) {
            return;
        }
        JSONObject tfRecordSchema;
        try {
            tfRecordSchema = FeaturestoreHelper.getDataframeTfRecordSchemaJson(dataset);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Could not infer the TF-record schema for the training dataset", e);
            return;
        }
        if (tfRecordSchema == null) {
            return;
        }
        try {
            FeaturestoreHelper.writeTfRecordSchemaJson(storePath + Constants.SLASH_DELIMITER + Constants.TRAINING_DATASET_TF_RECORD_SCHEMA_FILE_NAME, tfRecordSchema.toString());
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Could not save tf record schema json to HDFS for training dataset: " + trainingDatasetDTO.getName(), e);
        }
    }

    /**
     * Writes {@code dataset} to an external training dataset stored in S3.
     *
     * <p>The S3 storage connector is looked up by name in the featurestore metadata. Its credentials
     * are configured on {@code sparkSession}, the same session that performs the write.
     *
     * @throws IllegalArgumentException if the named storage connector is not an S3 connector
     */
    private void insertIntoExternalTrainingDataset(TrainingDatasetDTO trainingDatasetDTO, FeaturestoreMetadataDTO featurestoreMetadataDTO, SparkSession sparkSession, Dataset<Row> dataset, String writeMode) throws StorageConnectorDoesNotExistError, TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException, HiveNotEnabled {
        ExternalTrainingDatasetDTO externalTrainingDataset = (ExternalTrainingDatasetDTO) trainingDatasetDTO;
        String connectorName = externalTrainingDataset.getS3ConnectorName();
        Object connector = FeaturestoreHelper.findStorageConnector(featurestoreMetadataDTO.getStorageConnectors(), connectorName);
        if (!(connector instanceof FeaturestoreS3ConnectorDTO)) {
            throw new IllegalArgumentException("Storage connector " + connectorName + " of external training dataset " + externalTrainingDataset.getName() + " is not an S3 connector");
        }
        FeaturestoreS3ConnectorDTO s3Connector = (FeaturestoreS3ConnectorDTO) connector;
        String externalPath = FeaturestoreHelper.getExternalTrainingDatasetPath(externalTrainingDataset.getName(), externalTrainingDataset.getVersion().intValue(), s3Connector.getBucket());
        // Configure credentials on the session that performs the write (previously getSpark()).
        FeaturestoreHelper.setupS3CredentialsForSpark(s3Connector.getAccessKey(), s3Connector.getSecretKey(), sparkSession);
        FeaturestoreHelper.writeTrainingDataset(sparkSession, dataset, externalPath, externalTrainingDataset.getDataFormat(), writeMode);
    }

    /**
     * Copies the computed statistics and the job list onto {@code trainingDatasetDTO}.
     *
     * <p>If {@code FeaturestoreHelper.jobNameGetOrDefault(null)} yields a job name, it is added to
     * {@code this.jobs} unless it is already present.
     *
     * @return the same (mutated) {@code trainingDatasetDTO}
     */
    private TrainingDatasetDTO groupInputParamsIntoDTO(TrainingDatasetDTO trainingDatasetDTO, StatisticsDTO statisticsDTO) {
        String defaultJobName = FeaturestoreHelper.jobNameGetOrDefault(null);
        // write() may reach this method twice (initial attempt + retry), and write() itself may be
        // called repeatedly; only record the job once.
        if (defaultJobName != null && !this.jobs.contains(defaultJobName)) {
            this.jobs.add(defaultJobName);
        }
        List<FeaturestoreJobDTO> jobDTOs = this.jobs.stream()
            .map(jobName -> {
                FeaturestoreJobDTO jobDTO = new FeaturestoreJobDTO();
                jobDTO.setJobName(jobName);
                return jobDTO;
            })
            .collect(Collectors.toList());
        trainingDatasetDTO.setClusterAnalysis(statisticsDTO.getClusterAnalysisDTO());
        trainingDatasetDTO.setFeaturesHistogram(statisticsDTO.getFeatureDistributionsDTO());
        trainingDatasetDTO.setDescriptiveStatistics(statisticsDTO.getDescriptiveStatsDTO());
        trainingDatasetDTO.setFeatureCorrelationMatrix(statisticsDTO.getFeatureCorrelationMatrixDTO());
        trainingDatasetDTO.setJobs(jobDTOs);
        return trainingDatasetDTO;
    }

    /** Sets the name of the training dataset to insert into; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setName(String name) {
        this.name = name;
        return this;
    }

    /** Sets the featurestore to use; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setFeaturestore(String featurestore) {
        this.featurestore = featurestore;
        return this;
    }

    /** Sets the Spark session to use; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setSpark(SparkSession spark) {
        this.spark = spark;
        return this;
    }

    /** Sets the training dataset version; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setVersion(int version) {
        this.version = version;
        return this;
    }

    /** Sets the correlation method used for feature correlation statistics; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setCorrMethod(String corrMethod) {
        this.corrMethod = corrMethod;
        return this;
    }

    /** Sets the number of histogram bins; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setNumBins(int numBins) {
        this.numBins = numBins;
        return this;
    }

    /** Sets the number of clusters for cluster analysis; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setNumClusters(int numClusters) {
        this.numClusters = numClusters;
        return this;
    }

    /** Sets the Spark write mode; only overwrite is accepted by {@link #write()}. Returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setMode(String mode) {
        this.mode = mode;
        return this;
    }

    /** Sets the dataframe to insert; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setDataframe(Dataset<Row> dataframe) {
        this.dataframe = dataframe;
        return this;
    }

    /** Sets whether to compute descriptive statistics; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setDescriptiveStats(Boolean descriptiveStats) {
        this.descriptiveStats = descriptiveStats;
        return this;
    }

    /** Sets whether to compute the feature correlation matrix; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setFeatureCorr(Boolean featureCorr) {
        this.featureCorr = featureCorr;
        return this;
    }

    /** Sets whether to compute feature histograms; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setFeatureHistograms(Boolean featureHistograms) {
        this.featureHistograms = featureHistograms;
        return this;
    }

    /** Sets whether to run cluster analysis; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setClusterAnalysis(Boolean clusterAnalysis) {
        this.clusterAnalysis = clusterAnalysis;
        return this;
    }

    /** Sets the columns to compute statistics for; returns this operation. */
    public FeaturestoreInsertIntoTrainingDataset setStatColumns(List<String> statColumns) {
        this.statColumns = statColumns;
        return this;
    }
}
