/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.datavalidation;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.datavalidation.FeatureGroupExpectationFacade;
import io.hops.hopsworks.common.featurestore.datavalidation.FeatureGroupValidationFacade;
import io.hops.hopsworks.common.featurestore.datavalidation.FeatureStoreExpectationFacade;
import io.hops.hopsworks.common.featurestore.datavalidation.RuleFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.ExpectationResult;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.ValidationResult;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeatureGroupCommitFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.FeatureGroupCommit;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Expectation;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.FeatureGroupExpectation;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.FeatureGroupValidation;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.FeatureStoreExpectation;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Name;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Predicate;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Rule;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.ValidationRule;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.javatuples.Pair;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FeatureGroupValidationsController {
    private static final Logger LOGGER = Logger.getLogger(FeatureGroupValidationsController.class.getName());
    private static final String PATH_TO_DATA_VALIDATION = "/Projects/%s/" + Settings.ServiceDataset.DATAVALIDATION.getName();
    private static final String PATH_TO_DATA_VALIDATION_RESULT = PATH_TO_DATA_VALIDATION + "/" + "%s" + "/" + "%d";
    @EJB
    private DistributedFsService distributedFsService;
    @EJB
    private InodeController inodeController;
    @EJB
    private HdfsUsersController hdfsUsersController;
    @EJB
    private RuleFacade validationRuleFacade;
    @EJB
    private FeatureGroupExpectationFacade featureGroupExpectationFacade;
    @EJB
    private FeatureStoreExpectationFacade featureStoreExpectationFacade;
    @EJB
    private FeatureGroupValidationFacade featureGroupValidationFacade;
    @EJB
    private FeatureGroupCommitFacade featureGroupCommitFacade;
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private FeaturestoreActivityFacade activityFacade;

    public Pair<FeatureGroupValidation, List<ExpectationResult>> getFeatureGroupValidationResults(Users user, Project project, Featuregroup featuregroup, Date validationTime) throws FeaturestoreException {
        LOGGER.log(Level.FINE, "Fetching validation result for feature group " + featuregroup.getName());
        String hdfsUsername = this.hdfsUsersController.getHdfsUserName(project, user);
        DistributedFileSystemOps udfso = null;
        try {
            Path path2result = new Path(String.format(PATH_TO_DATA_VALIDATION_RESULT, project.getName(), featuregroup.getName(), featuregroup.getVersion()) + "/" + validationTime.getTime());
            udfso = this.distributedFsService.getDfsOps(hdfsUsername);
            FileStatus[] validationFile = udfso.getFilesystem().globStatus(path2result);
            if (validationFile == null || validationFile.length == 0) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.DATA_VALIDATION_RESULTS_NOT_FOUND, Level.FINE, "feature group: " + featuregroup.getName() + ", validation time: " + validationTime.getTime());
            }
            List<ExpectationResult> expectationResults = new ArrayList<ExpectationResult>();
            for (FileStatus fileStatus : validationFile) {
                Path resultFile = fileStatus.getPath();
                try (FSDataInputStream inStream = udfso.open(resultFile);){
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    IOUtils.copyBytes((InputStream)inStream, (OutputStream)out, (int)512);
                    expectationResults = this.getValidationsFromFile(out.toString());
                }
            }
            FeatureGroupValidation featureGroupValidation = this.featureGroupValidationFacade.findByFeaturegroupAndValidationTime(featuregroup, validationTime).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.DATA_VALIDATION_NOT_FOUND, Level.FINE, "feature group: " + featuregroup.getName() + ", validation time: " + validationTime));
            featureGroupValidation.setStatus(this.getValidationResultStatus(expectationResults));
            Pair pair = new Pair((Object)featureGroupValidation, expectationResults);
            this.distributedFsService.closeDfsClient(udfso);
            return pair;
        }
        catch (IOException ex) {
            try {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_READ_DATA_VALIDATION_RESULT, Level.WARNING, "Failed to read validation result", "Failed to read validation result from HDFS for Feature group " + featuregroup.getName(), (Throwable)ex);
            }
            catch (Throwable throwable) {
                this.distributedFsService.closeDfsClient(udfso);
                throw throwable;
            }
        }
    }

    private List<ExpectationResult> getValidationsFromFile(String rules) {
        Gson deserializer = new Gson();
        JsonArray topLevelObject = ((JsonElement)deserializer.fromJson(rules, JsonElement.class)).getAsJsonArray();
        JsonArray dataValidationsJson = topLevelObject.getAsJsonArray();
        ArrayList<ExpectationResult> results = new ArrayList<ExpectationResult>(dataValidationsJson.size());
        dataValidationsJson.forEach(result -> {
            ExpectationResult cg = (ExpectationResult)deserializer.fromJson(result, ExpectationResult.class);
            results.add(cg);
        });
        return results;
    }

    public FeatureGroupValidation putFeatureGroupValidationResults(Users user, Project project, Featuregroup featuregroup, List<ExpectationResult> results, Long validationTime) throws FeaturestoreException {
        FeatureGroupValidation.Status status = this.getValidationResultStatus(results);
        if (featuregroup.getValidationType().getSeverity() < status.getSeverity()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_GROUP_CHECKS_FAILED, Level.FINE, "Results: " + results.stream().filter(result -> result.getStatus().getSeverity() >= FeatureGroupValidation.Status.WARNING.getSeverity()).collect(Collectors.toList()));
        }
        String hdfsUsername = this.hdfsUsersController.getHdfsUserName(project, user);
        DistributedFileSystemOps udfso = null;
        try {
            String path2result = String.format(PATH_TO_DATA_VALIDATION_RESULT, project.getName(), featuregroup.getName(), featuregroup.getVersion()) + "/" + validationTime;
            udfso = this.distributedFsService.getDfsOps(hdfsUsername);
            Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
            try (FSDataOutputStream outStream = udfso.create(path2result);){
                outStream.writeBytes(gson.toJson(results));
                outStream.hflush();
            }
            Timestamp validationTimeDate = new Timestamp(validationTime);
            FeatureGroupValidation featureGroupValidation = new FeatureGroupValidation((Date)validationTimeDate, this.inodeController.getInodeAtPath(path2result), featuregroup, this.getValidationResultStatus(results));
            this.featureGroupValidationFacade.persist(featureGroupValidation);
            this.activityFacade.logValidationActivity(featuregroup, user, featureGroupValidation);
            FeatureGroupValidation featureGroupValidation2 = featureGroupValidation;
            this.distributedFsService.closeDfsClient(udfso);
            return featureGroupValidation2;
        }
        catch (IOException ex) {
            try {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_READ_DATA_VALIDATION_RESULT, Level.WARNING, "Failed to persist validation results", "Failed to persist validation result to HDFS for Feature group " + featuregroup.getName(), (Throwable)ex);
            }
            catch (Throwable throwable) {
                this.distributedFsService.closeDfsClient(udfso);
                throw throwable;
            }
        }
    }

    public FeatureGroupExpectation getFeatureGroupExpectation(Featuregroup featuregroup, String name) throws FeaturestoreException {
        FeatureStoreExpectation featureStoreExpectation = this.getFeatureStoreExpectation(featuregroup.getFeaturestore(), name);
        Optional<FeatureGroupExpectation> optional = this.featureGroupExpectationFacade.findByFeaturegroupAndExpectation(featuregroup, featureStoreExpectation);
        if (!optional.isPresent()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_GROUP_EXPECTATION_NOT_FOUND, Level.FINE, "Expectation: " + name);
        }
        return optional.get();
    }

    public FeatureStoreExpectation getFeatureStoreExpectation(Featurestore featurestore, String name) throws FeaturestoreException {
        Optional<FeatureStoreExpectation> e = this.featureStoreExpectationFacade.findByFeaturestoreAndName(featurestore, name);
        if (!e.isPresent()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_STORE_EXPECTATION_NOT_FOUND, Level.FINE, "Expectation: " + name);
        }
        return e.get();
    }

    public ValidationRule getValidationRuleByNameAndPredicate(Name name, Predicate predicate) {
        return this.validationRuleFacade.findByNameAndPredicate(name, predicate);
    }

    private FeatureGroupValidation.Status getValidationResultStatus(List<ExpectationResult> results) {
        int success = 0;
        int warning = 0;
        int error = 0;
        int expectationSuccess = 0;
        int expectationWarning = 0;
        int expectationError = 0;
        io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level level = io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level.WARNING;
        for (ExpectationResult result : results) {
            for (ValidationResult validationResult : result.getResults()) {
                if (validationResult.getRule().getLevel().equals((Object)io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level.ERROR)) {
                    level = io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level.ERROR;
                }
                switch (validationResult.getStatus()) {
                    case SUCCESS: {
                        ++success;
                        break;
                    }
                    case WARNING: {
                        ++warning;
                        break;
                    }
                    case FAILURE: {
                        ++error;
                        break;
                    }
                }
            }
            result.setStatus(this.calculateStatus(success, warning, error, level));
            switch (result.getStatus()) {
                case SUCCESS: {
                    ++expectationSuccess;
                    break;
                }
                case WARNING: {
                    ++expectationWarning;
                    break;
                }
                case FAILURE: {
                    ++expectationError;
                    break;
                }
            }
        }
        return this.calculateStatus(expectationSuccess, expectationWarning, expectationError, level);
    }

    private FeatureGroupValidation.Status calculateStatus(int success, int warning, int error, io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level level) {
        if (success != 0 && warning == 0 && error == 0) {
            return FeatureGroupValidation.Status.SUCCESS;
        }
        if (level.equals((Object)io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level.WARNING)) {
            if (warning > 0 && error == 0) {
                return FeatureGroupValidation.Status.WARNING;
            }
            if (warning >= 0 && error > 0) {
                return FeatureGroupValidation.Status.FAILURE;
            }
        }
        if (level.equals((Object)io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidation.Level.ERROR)) {
            return FeatureGroupValidation.Status.FAILURE;
        }
        return FeatureGroupValidation.Status.NONE;
    }

    public FeatureStoreExpectation createOrUpdateExpectation(Featurestore featurestore, Expectation expectation) throws FeaturestoreException {
        HashSet<ValidationRule> validationRules = new HashSet<ValidationRule>();
        for (Rule rule : expectation.getRules()) {
            Optional<ValidationRule> validationRule = this.validationRuleFacade.findByName(rule.getName());
            if (!validationRule.isPresent()) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_STORE_RULE_NOT_FOUND, Level.FINE, "Rule: " + rule.getName());
            }
            validationRules.add(validationRule.get());
        }
        Optional<FeatureStoreExpectation> e = this.featureStoreExpectationFacade.findByFeaturestoreAndName(featurestore, expectation.getName());
        FeatureStoreExpectation featureStoreExpectation = e.orElseGet(FeatureStoreExpectation::new);
        featureStoreExpectation.setValidationRules(validationRules);
        featureStoreExpectation.setFeatureStore(featurestore);
        featureStoreExpectation.setName(expectation.getName());
        featureStoreExpectation.setDescription(expectation.getDescription());
        featureStoreExpectation.setExpectation(expectation);
        return this.featureStoreExpectationFacade.merge(featureStoreExpectation);
    }

    public void deleteExpectation(Featurestore featurestore, String name) throws FeaturestoreException {
        this.featureStoreExpectationFacade.remove(this.getFeatureStoreExpectation(featurestore, name));
    }

    public FeatureGroupExpectation attachExpectation(Featuregroup featuregroup, String name, Project project, Users user) throws FeaturestoreException {
        FeatureStoreExpectation featureStoreExpectation = this.getFeatureStoreExpectation(featuregroup.getFeaturestore(), name);
        Optional<FeatureGroupExpectation> attachedExpectation = this.featureGroupExpectationFacade.findByFeaturegroupAndExpectation(featuregroup, featureStoreExpectation);
        this.checkFeaturesExist(featureStoreExpectation, featuregroup, name, project, user);
        if (!attachedExpectation.isPresent()) {
            FeatureGroupExpectation featureGroupExpectation = new FeatureGroupExpectation();
            featureGroupExpectation.setFeaturegroup(featuregroup);
            featureGroupExpectation.setFeatureStoreExpectation(featureStoreExpectation);
            return this.featureGroupExpectationFacade.merge(featureGroupExpectation);
        }
        return attachedExpectation.get();
    }

    public void checkFeaturesExist(FeatureStoreExpectation featureStoreExpectation, Featuregroup featuregroup, String name, Project project, Users user) throws FeaturestoreException {
        List expectationFeatures = featureStoreExpectation.getExpectation().getFeatures();
        List<FeatureGroupFeatureDTO> featuresDTOs = this.featuregroupController.getFeatures(featuregroup, project, user);
        ArrayList<String> features = new ArrayList<String>();
        for (FeatureGroupFeatureDTO featureGroupFeature : featuresDTOs) {
            features.add(featureGroupFeature.getName());
        }
        ArrayList<String> featuresNotFound = new ArrayList<String>();
        for (String expectationFeature : expectationFeatures) {
            if (features.contains(expectationFeature)) continue;
            featuresNotFound.add(expectationFeature);
        }
        if (!featuresNotFound.isEmpty()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_GROUP_EXPECTATION_FEATURE_NOT_FOUND, Level.FINE, "expectation: " + name + ", feature: " + featuresNotFound);
        }
    }

    public void detachExpectation(Featuregroup featuregroup, String name) throws FeaturestoreException {
        FeatureStoreExpectation featureStoreExpectation = this.getFeatureStoreExpectation(featuregroup.getFeaturestore(), name);
        Optional<FeatureGroupExpectation> e = this.featureGroupExpectationFacade.findByFeaturegroupAndExpectation(featuregroup, featureStoreExpectation);
        this.featureGroupExpectationFacade.remove(e.orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURE_GROUP_EXPECTATION_NOT_FOUND, Level.FINE, "expectation: " + name)));
    }

    public Long getCommitTime(FeatureGroupValidation featureGroupValidation) {
        return this.featureGroupCommitFacade.findByValidation(featureGroupValidation).map(FeatureGroupCommit::getCommittedOn).orElse(null);
    }
}

