/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.DeltaStreamerJobConf;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.JobConfiguration;
import com.logicalclocks.hsfs.SaveMode;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.engine.SparkEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import com.logicalclocks.hsfs.metadata.Option;
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Engine for {@link StreamFeatureGroup}s: registers feature group metadata through
 * {@link FeatureGroupApi} and hands feature data to {@link SparkEngine} for writing.
 */
public class StreamFeatureGroupEngine {
    private KafkaApi kafkaApi = new KafkaApi();
    private FeatureGroupApi featureGroupApi = new FeatureGroupApi();
    private FeatureGroupUtils utils = new FeatureGroupUtils();
    // Logger is keyed on this class so its messages are not attributed to FeatureGroupEngine.
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamFeatureGroupEngine.class);

    /**
     * Registers the feature group metadata and then writes {@code dataset} through
     * {@link #insert} using {@link SaveMode#APPEND}.
     *
     * @param featureGroup feature group to register; it is updated in place
     * @param dataset data to write; feature names are sanitized before the write
     * @param partitionKeys names of features to flag as partition keys, may be {@code null}
     * @param hudiPrecombineKey name of the feature to flag as Hudi precombine key, may be {@code null}
     * @param writeOptions options forwarded to the metadata job configuration and the Kafka config
     * @param sparkJobConfiguration job configuration stored on the delta streamer job conf
     * @param <S> dataset type accepted by {@link FeatureGroupUtils} and {@link SparkEngine}
     * @return the feature group instance returned by {@link #saveFeatureGroupMetaData}
     * @throws FeatureStoreException propagated from the metadata save or the write
     * @throws IOException propagated from the metadata save or the write
     * @throws ParseException propagated from the metadata save or the write
     */
    public <S> StreamFeatureGroup save(StreamFeatureGroup featureGroup, S dataset, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration sparkJobConfiguration) throws FeatureStoreException, IOException, ParseException {
        StreamFeatureGroup updatedFeatureGroup = this.saveFeatureGroupMetaData(featureGroup, partitionKeys, hudiPrecombineKey, writeOptions, sparkJobConfiguration, dataset);
        this.insert(updatedFeatureGroup, this.utils.sanitizeFeatureNames(dataset), SaveMode.APPEND, partitionKeys, hudiPrecombineKey, writeOptions, sparkJobConfiguration);
        // saveFeatureGroupMetaData returns the instance it was given, so this is the caller's object.
        return updatedFeatureGroup;
    }

    /**
     * Writes a streaming dataset via {@link SparkEngine#writeStreamDataframe}, registering the
     * feature group metadata first when the group has no id yet.
     *
     * <p>The method declares no checked exceptions, yet the calls it makes do. Any exception raised
     * inside is rethrown unchanged (same object, same type) without being wrapped.
     * NOTE(review): presumably the pre-decompilation source used Lombok's {@code @SneakyThrows}
     * here; confirm and restore the annotation if Lombok is available in the build.
     *
     * @param streamFeatureGroup target feature group
     * @param featureData streaming data; feature names are sanitized before the write
     * @param queryName forwarded to the Spark engine
     * @param outputMode forwarded to the Spark engine
     * @param awaitTermination forwarded to the Spark engine
     * @param timeout forwarded to the Spark engine
     * @param checkpointLocation forwarded to the Spark engine
     * @param partitionKeys names of features to flag as partition keys, may be {@code null}
     * @param hudiPrecombineKey name of the feature to flag as Hudi precombine key, may be {@code null}
     * @param writeOptions write options; {@code null} is replaced by an empty map
     * @param jobConfiguration job configuration used if the metadata has to be registered
     * @param <S> dataset type accepted by {@link FeatureGroupUtils} and {@link SparkEngine}
     * @return whatever {@link SparkEngine#writeStreamDataframe} returns
     */
    public <S> Object insertStream(StreamFeatureGroup streamFeatureGroup, S featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration jobConfiguration) {
        try {
            if (writeOptions == null) {
                writeOptions = new HashMap<String, String>();
            }
            if (streamFeatureGroup.getId() == null) {
                streamFeatureGroup = this.saveFeatureGroupMetaData(streamFeatureGroup, partitionKeys, hudiPrecombineKey, writeOptions, jobConfiguration, featureData);
            }
            return SparkEngine.getInstance().writeStreamDataframe(streamFeatureGroup, this.utils.sanitizeFeatureNames(featureData), queryName, outputMode, awaitTermination, timeout, checkpointLocation, this.utils.getKafkaConfig(streamFeatureGroup, writeOptions));
        } catch (Exception e) {
            // Rethrow the very same exception; the signature must stay free of checked exceptions.
            throw StreamFeatureGroupEngine.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Writes {@code featureData} via {@link SparkEngine#writeOnlineDataframe}, passing the group's
     * online topic name and the Kafka config built by {@link FeatureGroupUtils}.
     *
     * <p>The metadata is registered first when the group has no id yet. With
     * {@link SaveMode#OVERWRITE} the existing content is deleted through {@link FeatureGroupApi}
     * before the write. {@code featureData} is passed on as given; this method does not sanitize
     * feature names for the write itself.
     *
     * @param streamFeatureGroup target feature group
     * @param featureData data to write
     * @param saveMode {@link SaveMode#OVERWRITE} triggers a content delete before writing
     * @param partitionKeys names of features to flag as partition keys, may be {@code null}
     * @param hudiPrecombineKey name of the feature to flag as Hudi precombine key, may be {@code null}
     * @param writeOptions options forwarded to the metadata job configuration and the Kafka config
     * @param jobConfiguration job configuration used if the metadata has to be registered
     * @param <S> dataset type accepted by {@link FeatureGroupUtils} and {@link SparkEngine}
     * @throws FeatureStoreException propagated from the calls made here
     * @throws IOException propagated from the calls made here
     * @throws ParseException propagated from the calls made here
     */
    public <S> void insert(StreamFeatureGroup streamFeatureGroup, S featureData, SaveMode saveMode, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException {
        if (streamFeatureGroup.getId() == null) {
            streamFeatureGroup = this.saveFeatureGroupMetaData(streamFeatureGroup, partitionKeys, hudiPrecombineKey, writeOptions, jobConfiguration, featureData);
        }
        if (saveMode == SaveMode.OVERWRITE) {
            this.featureGroupApi.deleteContent(streamFeatureGroup);
        }
        SparkEngine.getInstance().writeOnlineDataframe(streamFeatureGroup, featureData, streamFeatureGroup.getOnlineTopicName(), this.utils.getKafkaConfig(streamFeatureGroup, writeOptions));
    }

    /**
     * Completes the feature group definition, saves it through {@link FeatureGroupApi#save} and
     * copies the server-assigned values back onto {@code featureGroup}.
     *
     * <p>Steps: derive the features from {@code featureData} when none are set; flag primary,
     * partition and Hudi precombine features by name; attach a {@link DeltaStreamerJobConf} built
     * from {@code writeOptions} and {@code sparkJobConfiguration}; save; then copy id, version,
     * location, statistics config and online topic name from the API response.
     *
     * @param featureGroup feature group to register; mutated in place
     * @param partitionKeys names of features to flag as partition keys, may be {@code null}
     * @param hudiPrecombineKey name of the feature to flag as Hudi precombine key, may be {@code null}
     * @param writeOptions converted to {@link Option}s on the job conf; {@code null} is stored as {@code null}
     * @param sparkJobConfiguration job configuration stored on the delta streamer job conf
     * @param featureData data used to derive the schema when the group has no features yet
     * @param <S> dataset type accepted by {@link FeatureGroupUtils}
     * @return the same {@code featureGroup} instance that was passed in
     * @throws FeatureStoreException propagated from the calls made here
     * @throws IOException propagated from the calls made here
     * @throws ParseException propagated from the calls made here
     */
    public <S> StreamFeatureGroup saveFeatureGroupMetaData(StreamFeatureGroup featureGroup, List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions, JobConfiguration sparkJobConfiguration, S featureData) throws FeatureStoreException, IOException, ParseException {
        if (featureGroup.getFeatures() == null) {
            featureGroup.setFeatures(this.utils.parseFeatureGroupSchema(this.utils.sanitizeFeatureNames(featureData), featureGroup.getTimeTravelFormat()));
        }
        LOGGER.info("Featuregroup features: {}", featureGroup.getFeatures());

        // Flag features whose name matches one of the declared primary keys.
        if (featureGroup.getPrimaryKeys() != null) {
            featureGroup.getPrimaryKeys().forEach(pk -> featureGroup.getFeatures().forEach(f -> {
                if (f.getName().equals(pk)) {
                    f.setPrimary(true);
                }
            }));
        }
        // Flag features whose name matches one of the requested partition keys.
        if (partitionKeys != null) {
            partitionKeys.forEach(pk -> featureGroup.getFeatures().forEach(f -> {
                if (f.getName().equals(pk)) {
                    f.setPartition(true);
                }
            }));
        }
        // Flag the feature named as Hudi precombine key, if one was requested.
        if (hudiPrecombineKey != null) {
            featureGroup.getFeatures().forEach(f -> {
                if (f.getName().equals(hudiPrecombineKey)) {
                    f.setHudiPrecombineKey(true);
                }
            });
        }

        DeltaStreamerJobConf deltaStreamerJobConf = new DeltaStreamerJobConf();
        deltaStreamerJobConf.setWriteOptions(writeOptions != null
                ? writeOptions.entrySet().stream().map(e -> new Option(e.getKey(), e.getValue())).collect(Collectors.toList())
                : null);
        deltaStreamerJobConf.setSparkJobConfiguration(sparkJobConfiguration);
        featureGroup.setDeltaStreamerJobConf(deltaStreamerJobConf);

        StreamFeatureGroup apiFG = this.featureGroupApi.save(featureGroup);
        // Must be checked before the version from the response is copied onto featureGroup below.
        if (featureGroup.getVersion() == null) {
            LOGGER.info("VersionWarning: No version provided for creating feature group `{}`, incremented version to `{}`.", featureGroup.getName(), apiFG.getVersion());
        }
        featureGroup.setId(apiFG.getId());
        featureGroup.setVersion(apiFG.getVersion());
        featureGroup.setLocation(apiFG.getLocation());
        featureGroup.setStatisticsConfig(apiFG.getStatisticsConfig());
        featureGroup.setOnlineTopicName(apiFG.getOnlineTopicName());
        // Without a caller-supplied precombine key, adopt the feature list from the API response.
        // NOTE(review): presumably the backend chooses the precombine key in that case; confirm.
        if (hudiPrecombineKey == null) {
            List<Feature> features = apiFG.getFeatures();
            featureGroup.setFeatures(features);
        }
        return featureGroup;
    }

    /**
     * Rethrows {@code t} as-is, bypassing the compiler's checked-exception analysis.
     *
     * <p>The declared return type only exists so call sites can write {@code throw sneakyThrow(e)};
     * the method never returns normally.
     *
     * @param t exception to rethrow
     * @param <E> throwable type the compiler is told is being thrown
     * @return never returns
     * @throws E always; the thrown object is {@code t} itself
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        // The cast is erased at runtime, so t is thrown untouched whatever its actual type.
        throw (E) t;
    }
}

