/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import com.logicalclocks.hsfs.metadata.Subject;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaParseException;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class FeatureGroupUtils {
    private FeatureGroupApi featureGroupApi = new FeatureGroupApi();
    private KafkaApi kafkaApi = new KafkaApi();
    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");

    public String getTableName(FeatureGroupBase offlineFeatureGroup) {
        return offlineFeatureGroup.getFeatureStore().getName() + "." + offlineFeatureGroup.getName() + "_" + offlineFeatureGroup.getVersion();
    }

    public String getOnlineTableName(FeatureGroupBase offlineFeatureGroup) {
        return offlineFeatureGroup.getName() + "_" + offlineFeatureGroup.getVersion();
    }

    public Seq<String> getPartitionColumns(FeatureGroupBase offlineFeatureGroup) {
        List<Feature> features = offlineFeatureGroup.getFeatures();
        List partitionCols = features.stream().filter(Feature::getPartition).map(Feature::getName).collect(Collectors.toList());
        return ((Iterator)JavaConverters.asScalaIteratorConverter(partitionCols.iterator()).asScala()).toSeq();
    }

    public Seq<String> getPrimaryColumns(FeatureGroupBase offlineFeatureGroup) {
        List<Feature> features = offlineFeatureGroup.getFeatures();
        List primaryCols = features.stream().filter(Feature::getPrimary).map(Feature::getName).collect(Collectors.toList());
        return ((Iterator)JavaConverters.asScalaIteratorConverter(primaryCols.iterator()).asScala()).toSeq();
    }

    public String getFgName(FeatureGroupBase featureGroup) {
        return featureGroup.getName() + "_" + featureGroup.getVersion();
    }

    public String getHiveServerConnection(FeatureGroupBase featureGroup, String connectionString) throws IOException, FeatureStoreException {
        HashMap<String, String> credentials = new HashMap<String, String>();
        credentials.put("sslTrustStore", HopsworksClient.getInstance().getHopsworksHttpClient().getTrustStorePath());
        credentials.put("trustStorePassword", HopsworksClient.getInstance().getHopsworksHttpClient().getCertKey());
        credentials.put("sslKeyStore", HopsworksClient.getInstance().getHopsworksHttpClient().getKeyStorePath());
        credentials.put("keyStorePassword", HopsworksClient.getInstance().getHopsworksHttpClient().getCertKey());
        return connectionString + credentials.entrySet().stream().map(cred -> (String)cred.getKey() + "=" + (String)cred.getValue()).collect(Collectors.joining(";"));
    }

    public static Date getDateFromDateString(String inputDate) throws FeatureStoreException, ParseException {
        if (inputDate != null) {
            return new Date(FeatureGroupUtils.getTimeStampFromDateString(inputDate));
        }
        return null;
    }

    public static Long getTimeStampFromDateString(String inputDate) throws FeatureStoreException, ParseException {
        HashMap<Pattern, String> dateFormatPatterns = new HashMap<Pattern, String>(){
            {
                this.put(Pattern.compile("^([0-9]{4})([0-9]{2})([0-9]{2})$"), "yyyyMMdd");
                this.put(Pattern.compile("^([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})$"), "yyyyMMddHH");
                this.put(Pattern.compile("^([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})$"), "yyyyMMddHHmm");
                this.put(Pattern.compile("^([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})$"), "yyyyMMddHHmmss");
                this.put(Pattern.compile("^([0-9]{4})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{2})([0-9]{3})$"), "yyyyMMddHHmmssSSS");
            }
        };
        String tempDate = inputDate.replace("/", "").replace("-", "").replace(" ", "").replace(":", "");
        String dateFormatPattern = null;
        for (Pattern pattern : dateFormatPatterns.keySet()) {
            if (!pattern.matcher(tempDate).matches()) continue;
            dateFormatPattern = (String)dateFormatPatterns.get(pattern);
            break;
        }
        if (dateFormatPattern == null) {
            throw new FeatureStoreException("Unable to identify format of the provided date value : " + inputDate);
        }
        SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
        Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
        return commitTimeStamp;
    }

    public String timeStampToHudiFormat(Long commitedOnTimeStamp) {
        Timestamp commitedOnDate = new Timestamp(commitedOnTimeStamp);
        return this.dateFormat.format(commitedOnDate);
    }

    public Map<Long, Map<String, String>> getCommitDetails(FeatureGroupBase featureGroup, String wallclockTime, Integer limit) throws FeatureStoreException, IOException, ParseException {
        Long wallclockTimestamp = wallclockTime != null ? FeatureGroupUtils.getTimeStampFromDateString(wallclockTime) : null;
        List<FeatureGroupCommit> featureGroupCommits = this.featureGroupApi.getCommitDetails(featureGroup, wallclockTimestamp, limit);
        if (featureGroupCommits == null) {
            throw new FeatureStoreException("There are no commit details available for this Feature group");
        }
        HashMap<Long, Map<String, String>> commitDetails = new HashMap<Long, Map<String, String>>();
        for (final FeatureGroupCommit featureGroupCommit : featureGroupCommits) {
            commitDetails.put(featureGroupCommit.getCommitID(), (Map<String, String>)new HashMap<String, String>(){
                {
                    this.put("committedOn", FeatureGroupUtils.this.timeStampToHudiFormat(featureGroupCommit.getCommitID()));
                    this.put("rowsUpdated", featureGroupCommit.getRowsUpdated() != null ? featureGroupCommit.getRowsUpdated().toString() : "0");
                    this.put("rowsInserted", featureGroupCommit.getRowsInserted() != null ? featureGroupCommit.getRowsInserted().toString() : "0");
                    this.put("rowsDeleted", featureGroupCommit.getRowsDeleted() != null ? featureGroupCommit.getRowsDeleted().toString() : "0");
                }
            });
        }
        return commitDetails;
    }

    public String getAvroSchema(FeatureGroupBase featureGroup, FeatureStoreBase featureStoreBase) throws FeatureStoreException, IOException {
        return this.kafkaApi.getTopicSubject(featureStoreBase, featureGroup.getOnlineTopicName()).getSchema();
    }

    public List<String> getComplexFeatures(List<Feature> features) {
        return features.stream().filter(Feature::isComplex).map(Feature::getName).collect(Collectors.toList());
    }

    public String getFeatureAvroSchema(String featureName, Schema schema) throws FeatureStoreException, IOException {
        Schema.Field complexField = schema.getFields().stream().filter(field -> field.name().equalsIgnoreCase(featureName)).findFirst().orElseThrow(() -> new FeatureStoreException("Complex feature `" + featureName + "` not found in AVRO schema of online feature group."));
        return complexField.schema().toString(false);
    }

    public Schema getDeserializedEncodedAvroSchema(Schema schema, List<String> complexFeatures) throws FeatureStoreException, IOException {
        List fields = schema.getFields().stream().map(field -> complexFeatures.contains(field.name()) ? new Schema.Field(field.name(), (Schema)((SchemaBuilder.UnionAccumulator)((SchemaBuilder.UnionAccumulator)SchemaBuilder.builder().unionOf().nullType()).and().bytesType()).endUnion(), null, null) : new Schema.Field(field.name(), field.schema(), null, null)).collect(Collectors.toList());
        return Schema.createRecord((String)schema.getName(), null, (String)schema.getNamespace(), (boolean)schema.isError(), fields);
    }

    public String getEncodedAvroSchema(Schema schema, List<String> complexFeatures) throws FeatureStoreException, IOException {
        Schema deserializedEncodedAvroSchema = this.getDeserializedEncodedAvroSchema(schema, complexFeatures);
        return deserializedEncodedAvroSchema.toString(false);
    }

    public Schema getDeserializedAvroSchema(String avroSchema) throws FeatureStoreException, IOException {
        try {
            return new Schema.Parser().parse(avroSchema);
        }
        catch (SchemaParseException e) {
            throw new FeatureStoreException("Failed to deserialize online feature group schema" + avroSchema + ".");
        }
    }

    public void verifyAttributeKeyNames(FeatureGroupBase featureGroup, List<String> partitionKeyNames, String precombineKeyName) throws FeatureStoreException {
        List<Feature> features = featureGroup.getFeatures();
        List<String> featureNames = features.stream().map(Feature::getName).collect(Collectors.toList());
        if (featureGroup.getPrimaryKeys() != null && !featureGroup.getPrimaryKeys().isEmpty()) {
            this.checkListdiff(featureGroup.getPrimaryKeys(), featureNames, "primary");
        }
        if (partitionKeyNames != null && !partitionKeyNames.isEmpty()) {
            this.checkListdiff(partitionKeyNames, featureNames, "partition");
        }
        if (precombineKeyName != null && !featureNames.contains(precombineKeyName)) {
            throw new FeatureStoreException("Provided Hudi precombine key " + precombineKeyName + " doesn't exist in feature dataframe");
        }
        if (featureGroup.getEventTime() != null && !featureNames.contains(featureGroup.getEventTime())) {
            throw new FeatureStoreException("Provided eventTime feature name " + featureGroup.getEventTime() + " doesn't exist in feature dataframe");
        }
    }

    private void checkListdiff(List<String> primaryPartitionKeyNames, List<String> featureNames, String attributeName) throws FeatureStoreException {
        List differences = primaryPartitionKeyNames.stream().filter(element -> !featureNames.contains(element)).collect(Collectors.toList());
        if (!differences.isEmpty()) {
            throw new FeatureStoreException("Provided " + attributeName + " key(s) " + String.join((CharSequence)", ", differences) + " doesn't exist in feature dataframe");
        }
    }

    public Subject getSubject(FeatureGroupBase featureGroup) throws FeatureStoreException, IOException {
        return this.kafkaApi.getTopicSubject(featureGroup.getFeatureStore(), featureGroup.getOnlineTopicName());
    }
}

