/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.logicalclocks.hsfs.EntityEndpointType;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStore;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.engine.DataValidationEngine;
import com.logicalclocks.hsfs.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.engine.StatisticsEngine;
import com.logicalclocks.hsfs.metadata.Expectation;
import com.logicalclocks.hsfs.metadata.ExpectationsApi;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.FeatureGroupValidation;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.metadata.validation.ValidationType;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

@JsonIgnoreProperties(ignoreUnknown=true)
public class FeatureGroup
extends FeatureGroupBase {
    private Boolean onlineEnabled;
    private String type = "cachedFeaturegroupDTO";
    private TimeTravelFormat timeTravelFormat = TimeTravelFormat.HUDI;
    protected String location;
    @JsonProperty(value="validationType")
    private ValidationType validationType = ValidationType.NONE;
    private List<String> statisticColumns;
    private List<String> expectationsNames;
    @JsonIgnore
    private List<String> primaryKeys;
    @JsonIgnore
    private List<String> partitionKeys;
    @JsonIgnore
    private String hudiPrecombineKey;
    @JsonIgnore
    private String avroSchema;
    private String onlineTopicName;
    private final FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine();
    private final StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP);
    private final ExpectationsApi expectationsApi = new ExpectationsApi(EntityEndpointType.FEATURE_GROUP);
    private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class);

    public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features, StatisticsConfig statisticsConfig, ValidationType validationType, Seq<Expectation> expectations, String onlineTopicName) {
        if (name == null) {
            throw new NullPointerException("name is marked non-null but is null");
        }
        this.featureStore = featureStore;
        this.name = name;
        this.version = version;
        this.description = description;
        this.primaryKeys = primaryKeys != null ? primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
        this.partitionKeys = partitionKeys != null ? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
        this.hudiPrecombineKey = timeTravelFormat == TimeTravelFormat.HUDI && hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
        this.onlineEnabled = onlineEnabled;
        this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
        this.features = features;
        this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
        ValidationType validationType2 = this.validationType = validationType != null ? validationType : ValidationType.NONE;
        if (expectations != null && !expectations.isEmpty()) {
            this.expectationsNames = new ArrayList<String>();
            ((List)JavaConverters.seqAsJavaListConverter(expectations).asJava()).forEach(expectation -> this.expectationsNames.add(expectation.getName()));
        }
        this.onlineTopicName = onlineTopicName;
    }

    public FeatureGroup() {
    }

    public void updateValidationType(ValidationType validationType) throws FeatureStoreException, IOException {
        this.validationType = validationType;
        this.featureGroupEngine.updateValidationType(this);
    }

    @Override
    public Dataset<Row> read() throws FeatureStoreException, IOException {
        return this.read(false, null);
    }

    public Dataset<Row> read(boolean online) throws FeatureStoreException, IOException {
        return this.selectAll().read(online);
    }

    public Dataset<Row> read(Map<String, String> readOptions) throws FeatureStoreException, IOException {
        return this.read(false, readOptions);
    }

    public Dataset<Row> read(boolean online, Map<String, String> readOptions) throws FeatureStoreException, IOException {
        return this.selectAll().read(online, readOptions);
    }

    public Dataset<Row> read(String wallclockTime) throws FeatureStoreException, IOException, ParseException {
        return this.selectAll().asOf(wallclockTime).read(false, null);
    }

    public Dataset<Row> read(String wallclockTime, Map<String, String> readOptions) throws FeatureStoreException, IOException, ParseException {
        return this.selectAll().asOf(wallclockTime).read(false, readOptions);
    }

    public Dataset<Row> readChanges(String wallclockStartTime, String wallclockEndTime) throws FeatureStoreException, IOException, ParseException {
        return this.selectAll().pullChanges(wallclockStartTime, wallclockEndTime).read(false, null);
    }

    public Dataset<Row> readChanges(String wallclockStartTime, String wallclockEndTime, Map<String, String> readOptions) throws FeatureStoreException, IOException, ParseException {
        return this.selectAll().pullChanges(wallclockStartTime, wallclockEndTime).read(false, readOptions);
    }

    public void show(int numRows) throws FeatureStoreException, IOException {
        this.show(numRows, false);
    }

    public void show(int numRows, boolean online) throws FeatureStoreException, IOException {
        this.read(online).show(numRows);
    }

    public void save(Dataset<Row> featureData) throws FeatureStoreException, IOException, ParseException {
        this.save(featureData, null);
    }

    public void save(Dataset<Row> featureData, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        this.featureGroupEngine.save(this, featureData, this.primaryKeys, this.partitionKeys, this.hudiPrecombineKey, writeOptions);
        if (this.statisticsConfig.getEnabled().booleanValue()) {
            this.statisticsEngine.computeStatistics(this, featureData, null);
        }
    }

    public void insert(Dataset<Row> featureData) throws IOException, FeatureStoreException, ParseException {
        this.insert(featureData, null, false);
    }

    public void insert(Dataset<Row> featureData, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        this.insert(featureData, null, false, null, writeOptions);
    }

    public void insert(Dataset<Row> featureData, Storage storage) throws IOException, FeatureStoreException, ParseException {
        this.insert(featureData, storage, false, null, null);
    }

    public void insert(Dataset<Row> featureData, boolean overwrite) throws IOException, FeatureStoreException, ParseException {
        this.insert(featureData, null, overwrite);
    }

    public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite) throws IOException, FeatureStoreException, ParseException {
        this.insert(featureData, storage, overwrite, null, null);
    }

    public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        this.insert(featureData, null, overwrite, null, writeOptions);
    }

    public void insert(Dataset<Row> featureData, HudiOperationType operation) throws FeatureStoreException, IOException, ParseException {
        this.insert(featureData, null, false, operation, null);
    }

    public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite, HudiOperationType operation, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        if (operation != null && this.timeTravelFormat == TimeTravelFormat.NONE) {
            throw new IllegalArgumentException("operation argument is valid only for time travel enable feature groups");
        }
        if (operation == null && this.timeTravelFormat == TimeTravelFormat.HUDI) {
            operation = overwrite ? HudiOperationType.BULK_INSERT : HudiOperationType.UPSERT;
        }
        this.featureGroupEngine.insert(this, featureData, storage, operation, overwrite ? SaveMode.Overwrite : SaveMode.Append, writeOptions);
        this.computeStatistics();
    }

    public StreamingQuery insertStream(Dataset<Row> featureData) throws StreamingQueryException, IOException, FeatureStoreException {
        return this.insertStream(featureData, null);
    }

    public StreamingQuery insertStream(Dataset<Row> featureData, String queryName) throws StreamingQueryException, IOException, FeatureStoreException {
        return this.insertStream(featureData, queryName, "append");
    }

    public StreamingQuery insertStream(Dataset<Row> featureData, String queryName, String outputMode) throws StreamingQueryException, IOException, FeatureStoreException {
        return this.insertStream(featureData, queryName, outputMode, false, null);
    }

    public StreamingQuery insertStream(Dataset<Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout) throws StreamingQueryException, IOException, FeatureStoreException {
        return this.insertStream(featureData, queryName, outputMode, awaitTermination, timeout, null);
    }

    public StreamingQuery insertStream(Dataset<Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, Map<String, String> writeOptions) throws FeatureStoreException, IOException, StreamingQueryException {
        if (!featureData.isStreaming()) {
            throw new FeatureStoreException("Features have to be a streaming type spark dataframe. Use `insert()` method instead.");
        }
        LOGGER.info("StatisticsWarning: Stream ingestion for feature group `" + this.name + "`, with version `" + this.version + "` will not compute statistics.");
        return this.featureGroupEngine.insertStream(this, featureData, queryName, outputMode, awaitTermination, timeout, writeOptions);
    }

    public void commitDeleteRecord(Dataset<Row> featureData) throws FeatureStoreException, IOException, ParseException {
        this.featureGroupEngine.commitDelete(this, featureData, null);
    }

    public void commitDeleteRecord(Dataset<Row> featureData, Map<String, String> writeOptions) throws FeatureStoreException, IOException, ParseException {
        this.featureGroupEngine.commitDelete(this, featureData, writeOptions);
    }

    public Map<Long, Map<String, String>> commitDetails() throws IOException, FeatureStoreException, ParseException {
        return this.featureGroupEngine.commitDetails(this, null);
    }

    public Map<Long, Map<String, String>> commitDetails(Integer limit) throws IOException, FeatureStoreException, ParseException {
        return this.featureGroupEngine.commitDetails(this, limit);
    }

    public Map<Long, Map<String, String>> commitDetails(String wallclockTime) throws IOException, FeatureStoreException, ParseException {
        return this.featureGroupEngine.commitDetailsByWallclockTime(this, wallclockTime, null);
    }

    public Map<Long, Map<String, String>> commitDetails(String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException {
        return this.featureGroupEngine.commitDetailsByWallclockTime(this, wallclockTime, limit);
    }

    @JsonIgnore
    public String getAvroSchema() throws FeatureStoreException, IOException {
        if (this.avroSchema == null) {
            this.avroSchema = this.featureGroupEngine.getAvroSchema(this);
        }
        return this.avroSchema;
    }

    @JsonIgnore
    public List<String> getComplexFeatures() {
        return this.features.stream().filter(Feature::isComplex).map(Feature::getName).collect(Collectors.toList());
    }

    @JsonIgnore
    public String getFeatureAvroSchema(String featureName) throws FeatureStoreException, IOException {
        Schema schema = this.getDeserializedAvroSchema();
        Schema.Field complexField = schema.getFields().stream().filter((? super T field) -> field.name().equalsIgnoreCase(featureName)).findFirst().orElseThrow(() -> new FeatureStoreException("Complex feature `" + featureName + "` not found in AVRO schema of online feature group."));
        return complexField.schema().toString(true);
    }

    @JsonIgnore
    public String getEncodedAvroSchema() throws FeatureStoreException, IOException {
        Schema schema = this.getDeserializedAvroSchema();
        List fields = schema.getFields().stream().map(field -> this.getComplexFeatures().contains(field.name()) ? new Schema.Field(field.name(), (Schema)SchemaBuilder.builder().nullable().bytesType(), null, null) : new Schema.Field(field.name(), field.schema(), null, null)).collect(Collectors.toList());
        return Schema.createRecord((String)schema.getName(), null, (String)schema.getNamespace(), (boolean)schema.isError(), fields).toString(true);
    }

    @JsonIgnore
    public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOException {
        try {
            return new Schema.Parser().parse(this.getAvroSchema());
        }
        catch (SchemaParseException e) {
            throw new FeatureStoreException("Failed to deserialize online feature group schema" + this.getAvroSchema() + ".");
        }
    }

    @JsonIgnore
    public List<String> getPrimaryKeys() {
        if (this.primaryKeys == null) {
            this.primaryKeys = this.features.stream().filter((? super T f) -> f.getPrimary()).map(Feature::getName).collect(Collectors.toList());
        }
        return this.primaryKeys;
    }

    public Expectation getExpectation(String name) throws FeatureStoreException, IOException {
        return this.expectationsApi.get(this, name);
    }

    @JsonIgnore
    public Seq<Expectation> getExpectations() throws FeatureStoreException, IOException {
        return ((Buffer)JavaConverters.asScalaBufferConverter(this.expectationsApi.get(this)).asScala()).toSeq();
    }

    public Seq<Expectation> attachExpectations(Seq<Expectation> expectations) throws FeatureStoreException, IOException {
        ArrayList<Expectation> expectationsList = new ArrayList<Expectation>();
        for (Expectation expectation : (List)JavaConverters.seqAsJavaListConverter(expectations).asJava()) {
            expectationsList.add(this.attachExpectation(expectation));
        }
        return ((Buffer)JavaConverters.asScalaBufferConverter(expectationsList).asScala()).toSeq();
    }

    public Expectation attachExpectation(Expectation expectation) throws FeatureStoreException, IOException {
        return this.attachExpectation(expectation.getName());
    }

    public Expectation attachExpectation(String name) throws FeatureStoreException, IOException {
        return this.expectationsApi.put(this, name);
    }

    public void detachExpectation(Expectation expectation) throws FeatureStoreException, IOException {
        this.detachExpectation(expectation.getName());
    }

    public void detachExpectation(String name) throws FeatureStoreException, IOException {
        this.expectationsApi.detach(this, name);
    }

    public void detachExpectations(Seq<Expectation> expectations) throws FeatureStoreException, IOException {
        for (Expectation expectation : (List)JavaConverters.seqAsJavaListConverter(expectations).asJava()) {
            this.expectationsApi.detach(this, expectation);
        }
    }

    public FeatureGroupValidation validate() throws FeatureStoreException, IOException {
        return this.validate(this.read());
    }

    public FeatureGroupValidation validate(Dataset<Row> data) throws FeatureStoreException, IOException {
        return DataValidationEngine.getInstance().validate(data, this, this.expectationsApi.get(this));
    }

    @JsonIgnore
    public List<FeatureGroupValidation> getValidations() throws FeatureStoreException, IOException {
        return DataValidationEngine.getInstance().getValidations(this);
    }

    @JsonIgnore
    public FeatureGroupValidation getValidation(Long time, DataValidationEngine.ValidationTimeType type) throws FeatureStoreException, IOException {
        return DataValidationEngine.getInstance().getValidation(this, (ImmutablePair<DataValidationEngine.ValidationTimeType, Long>)new ImmutablePair((Object)type, (Object)time));
    }

    public Statistics computeStatistics(String wallclockTime) throws FeatureStoreException, IOException, ParseException {
        if (this.statisticsConfig.getEnabled().booleanValue()) {
            Map<Long, Map<String, String>> latestCommitMetaData = this.featureGroupEngine.commitDetailsByWallclockTime(this, wallclockTime, 1);
            Dataset<Row> featureData = this.selectAll().asOf(wallclockTime).read(false, null);
            Long commitId = (Long)latestCommitMetaData.keySet().toArray()[0];
            return this.statisticsEngine.computeStatistics(this, featureData, commitId);
        }
        LOGGER.info("StorageWarning: The statistics are not enabled of feature group `" + this.name + "`, with version `" + this.version + "`. No statistics computed.");
        return null;
    }

    public static FeatureGroupBuilder builder() {
        return new FeatureGroupBuilder();
    }

    public FeatureGroup(Boolean onlineEnabled, String type, TimeTravelFormat timeTravelFormat, String location, ValidationType validationType, List<String> statisticColumns, List<String> expectationsNames, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, String avroSchema, String onlineTopicName) {
        this.onlineEnabled = onlineEnabled;
        this.type = type;
        this.timeTravelFormat = timeTravelFormat;
        this.location = location;
        this.validationType = validationType;
        this.statisticColumns = statisticColumns;
        this.expectationsNames = expectationsNames;
        this.primaryKeys = primaryKeys;
        this.partitionKeys = partitionKeys;
        this.hudiPrecombineKey = hudiPrecombineKey;
        this.avroSchema = avroSchema;
        this.onlineTopicName = onlineTopicName;
    }

    public Boolean getOnlineEnabled() {
        return this.onlineEnabled;
    }

    public void setOnlineEnabled(Boolean onlineEnabled) {
        this.onlineEnabled = onlineEnabled;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public TimeTravelFormat getTimeTravelFormat() {
        return this.timeTravelFormat;
    }

    public void setTimeTravelFormat(TimeTravelFormat timeTravelFormat) {
        this.timeTravelFormat = timeTravelFormat;
    }

    public String getLocation() {
        return this.location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public ValidationType getValidationType() {
        return this.validationType;
    }

    @JsonProperty(value="validationType")
    public void setValidationType(ValidationType validationType) {
        this.validationType = validationType;
    }

    public List<String> getStatisticColumns() {
        return this.statisticColumns;
    }

    public void setStatisticColumns(List<String> statisticColumns) {
        this.statisticColumns = statisticColumns;
    }

    public List<String> getExpectationsNames() {
        return this.expectationsNames;
    }

    public void setExpectationsNames(List<String> expectationsNames) {
        this.expectationsNames = expectationsNames;
    }

    public String getOnlineTopicName() {
        return this.onlineTopicName;
    }

    public void setOnlineTopicName(String onlineTopicName) {
        this.onlineTopicName = onlineTopicName;
    }

    public static class FeatureGroupBuilder {
        private FeatureStore featureStore;
        private String name;
        private Integer version;
        private String description;
        private List<String> primaryKeys;
        private List<String> partitionKeys;
        private String hudiPrecombineKey;
        private boolean onlineEnabled;
        private TimeTravelFormat timeTravelFormat;
        private List<Feature> features;
        private StatisticsConfig statisticsConfig;
        private ValidationType validationType;
        private Seq<Expectation> expectations;
        private String onlineTopicName;

        FeatureGroupBuilder() {
        }

        public FeatureGroupBuilder featureStore(FeatureStore featureStore) {
            this.featureStore = featureStore;
            return this;
        }

        public FeatureGroupBuilder name(@NonNull String name) {
            if (name == null) {
                throw new NullPointerException("name is marked non-null but is null");
            }
            this.name = name;
            return this;
        }

        public FeatureGroupBuilder version(Integer version) {
            this.version = version;
            return this;
        }

        public FeatureGroupBuilder description(String description) {
            this.description = description;
            return this;
        }

        public FeatureGroupBuilder primaryKeys(List<String> primaryKeys) {
            this.primaryKeys = primaryKeys;
            return this;
        }

        public FeatureGroupBuilder partitionKeys(List<String> partitionKeys) {
            this.partitionKeys = partitionKeys;
            return this;
        }

        public FeatureGroupBuilder hudiPrecombineKey(String hudiPrecombineKey) {
            this.hudiPrecombineKey = hudiPrecombineKey;
            return this;
        }

        public FeatureGroupBuilder onlineEnabled(boolean onlineEnabled) {
            this.onlineEnabled = onlineEnabled;
            return this;
        }

        public FeatureGroupBuilder timeTravelFormat(TimeTravelFormat timeTravelFormat) {
            this.timeTravelFormat = timeTravelFormat;
            return this;
        }

        public FeatureGroupBuilder features(List<Feature> features) {
            this.features = features;
            return this;
        }

        public FeatureGroupBuilder statisticsConfig(StatisticsConfig statisticsConfig) {
            this.statisticsConfig = statisticsConfig;
            return this;
        }

        public FeatureGroupBuilder validationType(ValidationType validationType) {
            this.validationType = validationType;
            return this;
        }

        public FeatureGroupBuilder expectations(Seq<Expectation> expectations) {
            this.expectations = expectations;
            return this;
        }

        public FeatureGroupBuilder onlineTopicName(String onlineTopicName) {
            this.onlineTopicName = onlineTopicName;
            return this;
        }

        public FeatureGroup build() {
            return new FeatureGroup(this.featureStore, this.name, this.version, this.description, this.primaryKeys, this.partitionKeys, this.hudiPrecombineKey, this.onlineEnabled, this.timeTravelFormat, this.features, this.statisticsConfig, this.validationType, this.expectations, this.onlineTopicName);
        }

        public String toString() {
            return "FeatureGroup.FeatureGroupBuilder(featureStore=" + this.featureStore + ", name=" + this.name + ", version=" + this.version + ", description=" + this.description + ", primaryKeys=" + this.primaryKeys + ", partitionKeys=" + this.partitionKeys + ", hudiPrecombineKey=" + this.hudiPrecombineKey + ", onlineEnabled=" + this.onlineEnabled + ", timeTravelFormat=" + (Object)((Object)this.timeTravelFormat) + ", features=" + this.features + ", statisticsConfig=" + this.statisticsConfig + ", validationType=" + (Object)((Object)this.validationType) + ", expectations=" + this.expectations + ", onlineTopicName=" + this.onlineTopicName + ")";
        }
    }
}

