/*
 * Decompiled with CFR 0.152.
 */
package io.hops.util.featurestore.ops.write_ops;

import io.hops.util.FeaturestoreRestClient;
import io.hops.util.Hops;
import io.hops.util.exceptions.CannotWriteImageDataFrameException;
import io.hops.util.exceptions.DataframeIsEmpty;
import io.hops.util.exceptions.FeaturegroupUpdateStatsError;
import io.hops.util.exceptions.FeaturestoreNotFound;
import io.hops.util.exceptions.JWTNotFoundException;
import io.hops.util.exceptions.SparkDataTypeNotRecognizedError;
import io.hops.util.exceptions.TrainingDatasetDoesNotExistError;
import io.hops.util.exceptions.TrainingDatasetFormatNotSupportedError;
import io.hops.util.featurestore.FeaturestoreHelper;
import io.hops.util.featurestore.dtos.FeaturestoreMetadataDTO;
import io.hops.util.featurestore.dtos.TrainingDatasetDTO;
import io.hops.util.featurestore.dtos.stats.StatisticsDTO;
import io.hops.util.featurestore.ops.FeaturestoreOp;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;

/**
 * Builder-style write operation that inserts a Spark dataframe into an existing
 * training dataset of a featurestore.
 *
 * <p>Configure the operation through the fluent setters, then call {@link #write()}.
 * Only the {@code overwrite} write mode is accepted.
 */
public class FeaturestoreInsertIntoTrainingDataset extends FeaturestoreOp {
    private static final Logger LOG = Logger.getLogger(FeaturestoreInsertIntoTrainingDataset.class.getName());

    /** Data format value for which a TF-record schema file is written next to the data. */
    private static final String TFRECORDS_FORMAT = "tfrecords";

    /** File name (under the training dataset's HDFS store path) of the TF-record schema. */
    private static final String TF_RECORD_SCHEMA_FILE_NAME = "tf_record_schema.txt";

    /**
     * Creates the operation for the named training dataset.
     *
     * @param name name of the training dataset to insert into
     */
    public FeaturestoreInsertIntoTrainingDataset(String name) {
        super(name);
    }

    /**
     * Not supported: this is a write-only operation.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object read() {
        throw new UnsupportedOperationException("read() is not supported on a write operation");
    }

    /**
     * Validates the configured dataframe and write mode, then inserts the dataframe into the
     * training dataset.
     *
     * <p>If the first attempt fails for any reason, the featurestore metadata cache is refreshed
     * and the insert is attempted exactly once more with the freshly read metadata. A failure of
     * the second attempt propagates to the caller.
     *
     * @throws IllegalArgumentException if no dataframe was set, or the mode is not
     *         {@code overwrite} (case-insensitive)
     */
    @Override
    public void write() throws DataframeIsEmpty, SparkDataTypeNotRecognizedError, JAXBException, FeaturegroupUpdateStatsError, FeaturestoreNotFound, TrainingDatasetDoesNotExistError, TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException, JWTNotFoundException {
        if (this.dataframe == null) {
            throw new IllegalArgumentException("Dataframe to insert cannot be null, specify dataframe with .setDataframe(df)");
        }
        if (this.mode == null || !this.mode.equalsIgnoreCase("overwrite")) {
            throw new IllegalArgumentException("The supplied write mode: " + this.mode + " does not match any of the supported modes: overwrite (training datasets are immutable)");
        }
        try {
            this.insertWithCurrentMetadata();
        }
        catch (Exception e) {
            // Keep a trace of the first failure instead of discarding it silently; the retry
            // below may succeed and would otherwise hide what went wrong.
            LOG.log(Level.INFO, "Insert into training dataset: " + this.name + " failed, refreshing the featurestore metadata cache and retrying once", e);
            Hops.updateFeaturestoreMetadataCache().setFeaturestore(this.featurestore).write();
            this.insertWithCurrentMetadata();
        }
    }

    /**
     * Reads the featurestore metadata via {@code Hops.getFeaturestoreMetadata()} and runs a
     * single insert attempt with the currently configured fields.
     */
    private void insertWithCurrentMetadata() throws DataframeIsEmpty, SparkDataTypeNotRecognizedError, JAXBException, FeaturegroupUpdateStatsError, FeaturestoreNotFound, TrainingDatasetDoesNotExistError, TrainingDatasetFormatNotSupportedError, CannotWriteImageDataFrameException, JWTNotFoundException {
        FeaturestoreInsertIntoTrainingDataset.doInsertIntoTrainingDataset(this.spark, (Dataset<Row>)this.dataframe, this.name, this.featurestore, Hops.getFeaturestoreMetadata().setFeaturestore(this.featurestore).read(), this.version, this.descriptiveStats, this.featureCorr, this.featureHistograms, this.clusterAnalysis, this.statColumns, this.numBins, this.corrMethod, this.numClusters, this.mode);
    }

    /**
     * Performs one insert attempt: looks up the training dataset in the supplied metadata,
     * computes dataframe statistics, sends them to the REST API, writes the dataframe to HDFS
     * and, for the {@code tfrecords} format, additionally writes a TF-record schema file.
     *
     * <p>Note that the statistics are sent to the REST API before the data is written to HDFS.
     *
     * @param sparkSession spark session, passed through {@code FeaturestoreHelper.sparkGetOrDefault}
     * @param sparkDf dataframe to insert
     * @param trainingDataset name of the training dataset
     * @param featurestore featurestore name, passed through {@code FeaturestoreHelper.featurestoreGetOrDefault}
     * @param featurestoreMetadata metadata from which the list of training datasets is taken
     * @param trainingDatasetVersion version of the training dataset
     * @param descriptiveStats forwarded to the statistics computation
     * @param featureCorr forwarded to the statistics computation
     * @param featureHistograms forwarded to the statistics computation
     * @param clusterAnalysis forwarded to the statistics computation
     * @param statColumns forwarded to the statistics computation
     * @param numBins passed through {@code FeaturestoreHelper.numBinsGetOrDefault}
     * @param corrMethod passed through {@code FeaturestoreHelper.correlationMethodGetOrDefault}
     * @param numClusters passed through {@code FeaturestoreHelper.numClustersGetOrDefault}
     * @param writeMode write mode forwarded to the HDFS write
     */
    private static void doInsertIntoTrainingDataset(SparkSession sparkSession, Dataset<Row> sparkDf, String trainingDataset, String featurestore, FeaturestoreMetadataDTO featurestoreMetadata, int trainingDatasetVersion, Boolean descriptiveStats, Boolean featureCorr, Boolean featureHistograms, Boolean clusterAnalysis, List<String> statColumns, Integer numBins, String corrMethod, Integer numClusters, String writeMode) throws JAXBException, TrainingDatasetDoesNotExistError, DataframeIsEmpty, FeaturegroupUpdateStatsError, TrainingDatasetFormatNotSupportedError, SparkDataTypeNotRecognizedError, FeaturestoreNotFound, CannotWriteImageDataFrameException, JWTNotFoundException {
        featurestore = FeaturestoreHelper.featurestoreGetOrDefault(featurestore);
        sparkSession = FeaturestoreHelper.sparkGetOrDefault(sparkSession);
        corrMethod = FeaturestoreHelper.correlationMethodGetOrDefault(corrMethod);
        numBins = FeaturestoreHelper.numBinsGetOrDefault(numBins);
        numClusters = FeaturestoreHelper.numClustersGetOrDefault(numClusters);

        // The lookup result is intentionally unused.
        // NOTE(review): presumably this call is an existence check that throws
        // TrainingDatasetDoesNotExistError when there is no match - confirm in FeaturestoreHelper.
        List<TrainingDatasetDTO> trainingDatasetDTOList = featurestoreMetadata.getTrainingDatasets();
        FeaturestoreHelper.findTrainingDataset(trainingDatasetDTOList, trainingDataset, trainingDatasetVersion);

        StatisticsDTO statisticsDTO = FeaturestoreHelper.computeDataFrameStats(trainingDataset, sparkSession, sparkDf, featurestore, trainingDatasetVersion, descriptiveStats, featureCorr, featureHistograms, clusterAnalysis, statColumns, numBins, numClusters, corrMethod);
        Response response = FeaturestoreRestClient.updateTrainingDatasetStatsRest(trainingDataset, featurestore, trainingDatasetVersion, statisticsDTO);
        String jsonStrResponse = response.readEntity(String.class);
        TrainingDatasetDTO updatedTrainingDatasetDTO = FeaturestoreHelper.parseTrainingDatasetJson(new JSONObject(jsonStrResponse));

        String hdfsStorePath = updatedTrainingDatasetDTO.getHdfsStorePath();
        String dataFormat = updatedTrainingDatasetDTO.getDataFormat();
        FeaturestoreHelper.writeTrainingDatasetHdfs(sparkSession, sparkDf, hdfsStorePath + "/" + trainingDataset, dataFormat, writeMode);

        // Compare string contents, not references: the format comes from a parsed REST response,
        // so it is not guaranteed to be the same object as the literal. Also safe for null.
        if (!TFRECORDS_FORMAT.equals(dataFormat)) {
            return;
        }

        // Writing the TF-record schema is best-effort: failures are logged, never rethrown.
        JSONObject tfRecordSchemaJson = null;
        try {
            tfRecordSchemaJson = FeaturestoreHelper.getDataframeTfRecordSchemaJson(sparkDf);
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Could not infer the TF-record schema for the training dataset", e);
        }
        if (tfRecordSchemaJson != null) {
            try {
                FeaturestoreHelper.writeTfRecordSchemaJson(hdfsStorePath + "/" + TF_RECORD_SCHEMA_FILE_NAME, tfRecordSchemaJson.toString());
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Could not save tf record schema json to HDFS for training dataset: " + trainingDataset, e);
            }
        }
    }

    /**
     * Sets the name of the training dataset to insert into.
     *
     * @param name training dataset name
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setName(String name) {
        this.name = name;
        return this;
    }

    /**
     * Sets the featurestore that holds the training dataset.
     *
     * @param featurestore featurestore name
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setFeaturestore(String featurestore) {
        this.featurestore = featurestore;
        return this;
    }

    /**
     * Sets the spark session used for the operation.
     *
     * @param spark spark session
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setSpark(SparkSession spark) {
        this.spark = spark;
        return this;
    }

    /**
     * Sets the version of the training dataset to insert into.
     *
     * @param version training dataset version
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setVersion(int version) {
        this.version = version;
        return this;
    }

    /**
     * Sets the correlation method forwarded to the statistics computation.
     *
     * @param corrMethod correlation method
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setCorrMethod(String corrMethod) {
        this.corrMethod = corrMethod;
        return this;
    }

    /**
     * Sets the number of bins forwarded to the statistics computation.
     *
     * @param numBins number of bins
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setNumBins(int numBins) {
        this.numBins = numBins;
        return this;
    }

    /**
     * Sets the number of clusters forwarded to the statistics computation.
     *
     * @param numClusters number of clusters
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setNumClusters(int numClusters) {
        this.numClusters = numClusters;
        return this;
    }

    /**
     * Sets the write mode; {@link #write()} only accepts {@code overwrite} (case-insensitive).
     *
     * @param mode write mode
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setMode(String mode) {
        this.mode = mode;
        return this;
    }

    /**
     * Sets the dataframe to insert; {@link #write()} rejects a null dataframe.
     *
     * @param dataframe dataframe to insert
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setDataframe(Dataset<Row> dataframe) {
        this.dataframe = dataframe;
        return this;
    }

    /**
     * Sets the descriptive-statistics flag forwarded to the statistics computation.
     *
     * @param descriptiveStats descriptive statistics flag
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setDescriptiveStats(Boolean descriptiveStats) {
        this.descriptiveStats = descriptiveStats;
        return this;
    }

    /**
     * Sets the feature-correlation flag forwarded to the statistics computation.
     *
     * @param featureCorr feature correlation flag
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setFeatureCorr(Boolean featureCorr) {
        this.featureCorr = featureCorr;
        return this;
    }

    /**
     * Sets the feature-histograms flag forwarded to the statistics computation.
     *
     * @param featureHistograms feature histograms flag
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setFeatureHistograms(Boolean featureHistograms) {
        this.featureHistograms = featureHistograms;
        return this;
    }

    /**
     * Sets the cluster-analysis flag forwarded to the statistics computation.
     *
     * @param clusterAnalysis cluster analysis flag
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setClusterAnalysis(Boolean clusterAnalysis) {
        this.clusterAnalysis = clusterAnalysis;
        return this;
    }

    /**
     * Sets the columns forwarded to the statistics computation.
     *
     * @param statColumns column names
     * @return this operation, for chaining
     */
    public FeaturestoreInsertIntoTrainingDataset setStatColumns(List<String> statColumns) {
        this.statColumns = statColumns;
        return this;
    }
}

