/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.JobConfiguration;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase;
import com.logicalclocks.hsfs.spark.ExternalFeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import com.logicalclocks.hsfs.spark.engine.SparkEngine;
import com.logicalclocks.hsfs.spark.engine.hudi.HudiEngine;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class FeatureGroupEngine
extends FeatureGroupEngineBase {
    private HudiEngine hudiEngine = new HudiEngine();

    public FeatureGroup save(FeatureGroup featureGroup, Dataset<Row> dataset, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        this.insert(featureGroup, dataset, null, (HudiOperationType)((featureGroup = this.saveFeatureGroupMetaData(featureGroup, partitionKeys, hudiPrecombineKey, dataset = SparkEngine.getInstance().sanitizeFeatureNames(dataset), false)).getTimeTravelFormat() == TimeTravelFormat.HUDI ? HudiOperationType.BULK_INSERT : null), SaveMode.Append, partitionKeys, hudiPrecombineKey, writeOptions);
        return featureGroup;
    }

    public StreamFeatureGroup save(StreamFeatureGroup featureGroup, Dataset<Row> dataset, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration sparkJobConfiguration) throws FeatureStoreException, IOException, ParseException {
        StreamFeatureGroup updatedFeatureGroup = this.saveFeatureGroupMetaData(featureGroup, partitionKeys, hudiPrecombineKey, writeOptions, sparkJobConfiguration, dataset);
        this.insert(updatedFeatureGroup, SparkEngine.getInstance().sanitizeFeatureNames(dataset), SaveMode.Append, partitionKeys, hudiPrecombineKey, writeOptions, sparkJobConfiguration);
        return featureGroup;
    }

    public void insert(FeatureGroup featureGroup, Dataset<Row> featureData, Storage storage, HudiOperationType operation, SaveMode saveMode, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        Integer validationId = null;
        if (featureGroup.getId() == null) {
            featureGroup = this.saveFeatureGroupMetaData(featureGroup, partitionKeys, hudiPrecombineKey, featureData, false);
        }
        if (saveMode == SaveMode.Overwrite) {
            this.featureGroupApi.deleteContent((FeatureGroupBase)featureGroup);
        }
        this.saveDataframe(featureGroup, featureData, storage, operation, writeOptions, SparkEngine.getInstance().getKafkaConfig(featureGroup, writeOptions), validationId);
    }

    public void insert(StreamFeatureGroup streamFeatureGroup, Dataset<Row> featureData, SaveMode saveMode, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException {
        if (streamFeatureGroup.getId() == null) {
            streamFeatureGroup = this.saveFeatureGroupMetaData(streamFeatureGroup, partitionKeys, hudiPrecombineKey, writeOptions, jobConfiguration, featureData);
        }
        if (saveMode == SaveMode.Overwrite) {
            this.featureGroupApi.deleteContent((FeatureGroupBase)streamFeatureGroup);
        }
        SparkEngine.getInstance().writeOnlineDataframe(streamFeatureGroup, featureData, streamFeatureGroup.getOnlineTopicName(), SparkEngine.getInstance().getKafkaConfig(streamFeatureGroup, writeOptions));
    }

    public void insert(ExternalFeatureGroup externalFeatureGroup, Dataset<Row> featureData, Map<String, String> writeOptions) throws FeatureStoreException, IOException {
        if (!externalFeatureGroup.getOnlineEnabled().booleanValue()) {
            throw new FeatureStoreException("Online storage is not enabled for this feature group. External feature groups can only store data in online storage. To create an offline only external feature group, use the `save` method.");
        }
        if (externalFeatureGroup.getId() == null) {
            externalFeatureGroup = this.saveExternalFeatureGroup(externalFeatureGroup);
        }
        SparkEngine.getInstance().writeOnlineDataframe(externalFeatureGroup, featureData, externalFeatureGroup.getOnlineTopicName(), SparkEngine.getInstance().getKafkaConfig(externalFeatureGroup, writeOptions));
    }

    @Deprecated
    public StreamingQuery insertStream(FeatureGroup featureGroup, Dataset<Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions) throws FeatureStoreException, IOException, StreamingQueryException, TimeoutException, ParseException {
        if (!featureGroup.getOnlineEnabled().booleanValue()) {
            throw new FeatureStoreException("Online storage is not enabled for this feature group. It is currently only possible to stream to the online storage.");
        }
        if (featureGroup.getId() == null) {
            featureGroup = this.saveFeatureGroupMetaData(featureGroup, partitionKeys, hudiPrecombineKey, featureData, true);
        }
        StreamingQuery streamingQuery = SparkEngine.getInstance().writeStreamDataframe(featureGroup, SparkEngine.getInstance().convertToDefaultDataframe(featureData), queryName, outputMode, awaitTermination, timeout, checkpointLocation, SparkEngine.getInstance().getKafkaConfig(featureGroup, writeOptions));
        return streamingQuery;
    }

    public StreamingQuery insertStream(StreamFeatureGroup streamFeatureGroup, Dataset<Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration jobConfiguration) {
        if (writeOptions == null) {
            writeOptions = new HashMap<String, String>();
        }
        if (streamFeatureGroup.getId() == null) {
            streamFeatureGroup = this.saveFeatureGroupMetaData(streamFeatureGroup, partitionKeys, hudiPrecombineKey, writeOptions, jobConfiguration, featureData);
        }
        return SparkEngine.getInstance().writeStreamDataframe(streamFeatureGroup, SparkEngine.getInstance().sanitizeFeatureNames(featureData), queryName, outputMode, awaitTermination, timeout, checkpointLocation, SparkEngine.getInstance().getKafkaConfig(streamFeatureGroup, writeOptions));
    }

    public void saveDataframe(FeatureGroup featureGroup, Dataset<Row> dataset, Storage storage, HudiOperationType operation, Map<String, String> offlineWriteOptions, Map<String, String> onlineWriteOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        if (!featureGroup.getOnlineEnabled().booleanValue() && storage == Storage.ONLINE) {
            throw new FeatureStoreException("Online storage is not enabled for this feature group. Set `online=false` to write to the offline storage.");
        }
        if (storage == Storage.OFFLINE || !featureGroup.getOnlineEnabled().booleanValue()) {
            SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, operation, offlineWriteOptions, validationId);
        } else if (storage == Storage.ONLINE) {
            SparkEngine.getInstance().writeOnlineDataframe(featureGroup, dataset, featureGroup.getOnlineTopicName(), onlineWriteOptions);
        } else if (featureGroup.getOnlineEnabled().booleanValue() && storage == null) {
            SparkEngine.getInstance().writeOfflineDataframe(featureGroup, dataset, operation, offlineWriteOptions, validationId);
            SparkEngine.getInstance().writeOnlineDataframe(featureGroup, dataset, featureGroup.getOnlineTopicName(), onlineWriteOptions);
        } else {
            throw new FeatureStoreException("Error writing to offline and online feature store.");
        }
    }

    public FeatureGroup saveFeatureGroupMetaData(FeatureGroup featureGroup, List<String> partitionKeys, String hudiPrecombineKey, Dataset<Row> featureData, boolean saveEmpty) throws FeatureStoreException, IOException, ParseException {
        if (featureGroup.getFeatures() == null) {
            featureGroup.setFeatures(SparkEngine.getInstance().parseFeatureGroupSchema(featureData, featureGroup.getTimeTravelFormat()));
        }
        FeatureGroupEngineBase.LOGGER.info("Featuregroup features: " + featureGroup.getFeatures());
        this.utils.verifyAttributeKeyNames((FeatureGroupBase)featureGroup, partitionKeys, hudiPrecombineKey);
        FeatureGroup apiFG = (FeatureGroup)this.featureGroupApi.saveFeatureGroupMetaData((FeatureGroupBase)featureGroup, partitionKeys, hudiPrecombineKey, null, null, FeatureGroup.class);
        featureGroup.setOnlineTopicName(apiFG.getOnlineTopicName());
        if (saveEmpty) {
            SparkEngine.getInstance().writeOfflineDataframe(featureGroup, SparkEngine.getInstance().createEmptyDataFrame(featureData), (HudiOperationType)(featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI ? HudiOperationType.BULK_INSERT : null), null, null);
        }
        return featureGroup;
    }

    public StreamFeatureGroup saveFeatureGroupMetaData(StreamFeatureGroup featureGroup, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration sparkJobConfiguration, Dataset<Row> featureData) throws FeatureStoreException, IOException {
        if (featureGroup.getFeatures() == null) {
            featureGroup.setFeatures(SparkEngine.getInstance().parseFeatureGroupSchema(SparkEngine.getInstance().sanitizeFeatureNames(featureData), featureGroup.getTimeTravelFormat()));
        }
        FeatureGroupEngineBase.LOGGER.info("Featuregroup features: " + featureGroup.getFeatures());
        this.utils.verifyAttributeKeyNames((FeatureGroupBase)featureGroup, partitionKeys, hudiPrecombineKey);
        StreamFeatureGroup apiFG = (StreamFeatureGroup)this.featureGroupApi.saveFeatureGroupMetaData((FeatureGroupBase)featureGroup, partitionKeys, hudiPrecombineKey, writeOptions, sparkJobConfiguration, StreamFeatureGroup.class);
        featureGroup.setOnlineTopicName(apiFG.getOnlineTopicName());
        return featureGroup;
    }

    public FeatureGroup getOrCreateFeatureGroup(FeatureStore featureStore, String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime) throws IOException, FeatureStoreException {
        FeatureGroup featureGroup;
        try {
            featureGroup = this.getFeatureGroup(featureStore, name, version);
        }
        catch (FeatureStoreException | IOException e) {
            if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270009")) {
                featureGroup = FeatureGroup.builder().featureStore(featureStore).name(name).version(version).description(description).primaryKeys(primaryKeys).partitionKeys(partitionKeys).hudiPrecombineKey(hudiPrecombineKey).onlineEnabled(onlineEnabled).timeTravelFormat(timeTravelFormat).statisticsConfig(statisticsConfig).eventTime(eventTime).build();
                featureGroup.setFeatureStore(featureStore);
            }
            throw e;
        }
        return featureGroup;
    }

    public FeatureGroup getFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion) throws IOException, FeatureStoreException {
        FeatureGroup[] offlineFeatureGroups = (FeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, fgVersion, FeatureGroup[].class);
        FeatureGroup resultFg = offlineFeatureGroups[0];
        resultFg.setFeatureStore(featureStore);
        return resultFg;
    }

    public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgName) throws FeatureStoreException, IOException {
        FeatureGroup[] offlineFeatureGroups = (FeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, null, FeatureGroup[].class);
        return Arrays.asList(offlineFeatureGroups);
    }

    public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, StatisticsConfig statisticsConfig, String eventTime) throws IOException, FeatureStoreException {
        StreamFeatureGroup featureGroup;
        try {
            featureGroup = this.getStreamFeatureGroup(featureStore, name, version);
        }
        catch (FeatureStoreException | IOException e) {
            if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270009")) {
                featureGroup = StreamFeatureGroup.builder().featureStore(featureStore).name(name).version(version).description(description).primaryKeys(primaryKeys).partitionKeys(partitionKeys).hudiPrecombineKey(hudiPrecombineKey).onlineEnabled(onlineEnabled).statisticsConfig(statisticsConfig).eventTime(eventTime).build();
                featureGroup.setFeatureStore(featureStore);
            }
            throw e;
        }
        return featureGroup;
    }

    public StreamFeatureGroup getStreamFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion) throws IOException, FeatureStoreException {
        StreamFeatureGroup[] streamFeatureGroups = (StreamFeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, fgVersion, StreamFeatureGroup[].class);
        StreamFeatureGroup resultFg = streamFeatureGroups[0];
        resultFg.setFeatureStore(featureStore);
        return resultFg;
    }

    public List<StreamFeatureGroup> getStreamFeatureGroups(FeatureStore featureStore, String fgName) throws FeatureStoreException, IOException {
        StreamFeatureGroup[] streamFeatureGroups = (StreamFeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, null, StreamFeatureGroup[].class);
        return Arrays.asList(streamFeatureGroups);
    }

    public <T extends FeatureGroupBase> void appendFeatures(FeatureGroupBase featureGroup, List<Feature> features, Class<T> fgClass) throws FeatureStoreException, IOException, ParseException {
        featureGroup.getFeatures().addAll(features);
        FeatureGroupBase apiFG = this.featureGroupApi.updateMetadata(featureGroup, "updateMetadata", fgClass);
        featureGroup.setFeatures(apiFG.getFeatures());
        featureGroup.unloadSubject();
        if (featureGroup instanceof FeatureGroup) {
            SparkEngine.getInstance().writeEmptyDataframe(featureGroup);
        }
    }

    public Map<Long, Map<String, String>> commitDetails(FeatureGroupBase featureGroupBase, Integer limit) throws IOException, FeatureStoreException, ParseException {
        if (!(featureGroupBase instanceof FeatureGroup && featureGroupBase.getTimeTravelFormat() == TimeTravelFormat.HUDI || featureGroupBase instanceof StreamFeatureGroup)) {
            throw new FeatureStoreException("commitDetails function is only valid for time travel enabled feature group");
        }
        return this.utils.getCommitDetails(featureGroupBase, null, limit);
    }

    public Map<Long, Map<String, String>> commitDetailsByWallclockTime(FeatureGroupBase featureGroup, String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException {
        return this.utils.getCommitDetails(featureGroup, wallclockTime, limit);
    }

    public FeatureGroupCommit commitDelete(FeatureGroupBase featureGroupBase, Dataset<Row> genericDataset, Map<String, String> writeOptions) throws IOException, FeatureStoreException, ParseException {
        if (!(featureGroupBase instanceof FeatureGroup && featureGroupBase.getTimeTravelFormat() == TimeTravelFormat.HUDI || featureGroupBase instanceof StreamFeatureGroup)) {
            throw new FeatureStoreException("delete function is only valid for time travel enabled feature group");
        }
        return this.hudiEngine.deleteRecord(SparkEngine.getInstance().getSparkSession(), featureGroupBase, genericDataset, writeOptions);
    }

    public ExternalFeatureGroup saveExternalFeatureGroup(ExternalFeatureGroup externalFeatureGroup) throws FeatureStoreException, IOException {
        if (externalFeatureGroup.getFeatures() == null) {
            Dataset<Row> onDemandDataset = SparkEngine.getInstance().registerOnDemandTemporaryTable(externalFeatureGroup, "read_ondmd");
            externalFeatureGroup.setFeatures(SparkEngine.getInstance().parseFeatureGroupSchema(onDemandDataset, externalFeatureGroup.getTimeTravelFormat()));
        }
        this.utils.verifyAttributeKeyNames((FeatureGroupBase)externalFeatureGroup, null, null);
        if (externalFeatureGroup.getPrimaryKeys() != null) {
            externalFeatureGroup.getPrimaryKeys().forEach(pk -> externalFeatureGroup.getFeatures().forEach(f -> {
                if (f.getName().equals(pk)) {
                    f.setPrimary(Boolean.valueOf(true));
                }
            }));
        }
        ExternalFeatureGroup apiFg = (ExternalFeatureGroup)this.saveExtennalFeatureGroupMetaData(externalFeatureGroup, ExternalFeatureGroup.class);
        externalFeatureGroup.setId(apiFg.getId());
        externalFeatureGroup.setOnlineTopicName(apiFg.getOnlineTopicName());
        return externalFeatureGroup;
    }

    public List<ExternalFeatureGroup> getExternalFeatureGroups(FeatureStore featureStore, String fgName) throws FeatureStoreException, IOException {
        ExternalFeatureGroup[] offlineFeatureGroups = (ExternalFeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, null, ExternalFeatureGroup[].class);
        return Arrays.asList(offlineFeatureGroups);
    }

    public ExternalFeatureGroup getExternalFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion) throws IOException, FeatureStoreException {
        ExternalFeatureGroup[] offlineFeatureGroups = (ExternalFeatureGroup[])this.featureGroupApi.getInternal((FeatureStoreBase)featureStore, fgName, fgVersion, ExternalFeatureGroup[].class);
        ExternalFeatureGroup resultFg = offlineFeatureGroups[0];
        resultFg.setFeatureStore(featureStore);
        return resultFg;
    }
}

