/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.stream;

import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupFacade;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeatureExtraConstraints;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.stream.StreamFeatureGroup;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * Stateless EJB that manages the metadata and offline (Hive/Hudi) table of stream feature groups.
 *
 * <p>The bean runs with {@link TransactionAttributeType#NEVER}; persistence work is delegated to
 * the injected facades and controllers.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class StreamFeatureGroupController {
    @EJB
    private StreamFeatureGroupFacade streamFeatureGroupFacade;
    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;
    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    @EJB
    private CachedFeaturegroupController cachedFeaturegroupController;
    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;
    @EJB
    private FeatureGroupInputValidation featureGroupInputValidation;
    @EJB
    private FeaturestoreController featurestoreController;

    /**
     * Builds the DTO representation of a stream feature group.
     *
     * @param featuregroup the feature group entity to convert
     * @return a DTO carrying the topic name, name, description, online flag and resolved location
     * @throws ServiceException if thrown by one of the delegated lookups
     */
    public StreamFeatureGroupDTO convertStreamFeatureGroupToDTO(Featuregroup featuregroup) throws ServiceException {
        StreamFeatureGroupDTO streamFeatureGroupDTO = new StreamFeatureGroupDTO(featuregroup);
        streamFeatureGroupDTO.setOnlineTopicName(Utils.getFeatureGroupTopicName(featuregroup));
        streamFeatureGroupDTO.setName(featuregroup.getName());
        streamFeatureGroupDTO.setDescription(featuregroup.getDescription());
        streamFeatureGroupDTO.setOnlineEnabled(featuregroup.isOnlineEnabled());
        String location = this.featuregroupController.getFeatureGroupLocation(featuregroup);
        streamFeatureGroupDTO.setLocation(this.featurestoreUtils.resolveLocation(location));
        return streamFeatureGroupDTO;
    }

    /**
     * Reads the offline schema of the feature group and enriches every feature with the
     * primary-key flag, the Hudi precombine-key flag, the stored description and the
     * feature group id. Hudi-specific features are removed from the result.
     *
     * @param featuregroup the stream feature group whose features are requested
     * @param project the project on whose behalf the offline schema is read
     * @param user the user on whose behalf the offline schema is read
     * @return the enriched feature list
     * @throws FeaturestoreException if thrown while reading the offline schema
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTO(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        StreamFeatureGroup streamFeatureGroup = featuregroup.getStreamFeatureGroup();
        Set<String> primaryKeys = streamFeatureGroup.getFeaturesExtraConstraints().stream()
            .filter(CachedFeatureExtraConstraints::getPrimary)
            .map(CachedFeatureExtraConstraints::getName)
            .collect(Collectors.toSet());
        Set<String> precombineKeys = streamFeatureGroup.getFeaturesExtraConstraints().stream()
            .filter(CachedFeatureExtraConstraints::getHudiPrecombineKey)
            .map(CachedFeatureExtraConstraints::getName)
            .collect(Collectors.toSet());
        // Collectors.toMap rejects null values. This class only ever stores cached features
        // with a non-null description; NOTE(review): confirm no other writer stores nulls.
        Map<String, String> featureDescription = streamFeatureGroup.getCachedFeatures().stream()
            .collect(Collectors.toMap(CachedFeature::getName, CachedFeature::getDescription));

        String tableName = this.featuregroupController.getTblName(featuregroup);
        List<FeatureGroupFeatureDTO> featureGroupFeatures =
            this.offlineFeatureGroupController.getSchema(featuregroup.getFeaturestore(), tableName, project, user);
        for (FeatureGroupFeatureDTO feature : featureGroupFeatures) {
            String featureName = feature.getName();
            feature.setPrimary(primaryKeys.contains(featureName));
            feature.setHudiPrecombineKey(precombineKeys.contains(featureName));
            // Features without a stored description get null here.
            feature.setDescription(featureDescription.get(featureName));
            feature.setFeatureGroupId(featuregroup.getId());
        }
        return this.cachedFeaturegroupController.dropHudiSpecFeatureGroupFeature(featureGroupFeatures);
    }

    /**
     * Same as {@link #getFeaturesDTO(Featuregroup, Project, Users)}, but additionally passes the
     * features through the online feature group controller when the feature group is online enabled.
     *
     * @param featuregroup the stream feature group whose features are requested
     * @param project the project on whose behalf the schema is read
     * @param user the user on whose behalf the schema is read
     * @return the feature list, post-processed by the online controller if online enabled
     * @throws FeaturestoreException if thrown while reading the schema
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTOOnlineChecked(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS = this.getFeaturesDTO(featuregroup, project, user);
        if (featuregroup.isOnlineEnabled()) {
            featureGroupFeatureDTOS = this.onlineFeaturegroupController.getFeaturegroupFeatures(featuregroup, featureGroupFeatureDTOS);
        }
        return featureGroupFeatureDTOS;
    }

    /**
     * Drops the offline table of the feature group and then removes its stream feature group
     * metadata.
     *
     * @param featuregroup the feature group to delete
     * @param project the project on whose behalf the table is dropped
     * @param user the user on whose behalf the table is dropped
     * @throws FeaturestoreException if thrown by the delegated operations
     * @throws IOException if thrown by the delegated operations
     * @throws ServiceException if thrown by the delegated operations
     */
    public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, IOException, ServiceException {
        // The database name is derived from the project that owns the feature store,
        // while the drop itself is executed on behalf of the passed-in project and user.
        String db = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        String tableName = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        this.offlineFeatureGroupController.dropFeatureGroup(db, tableName, project, user);
        this.streamFeatureGroupFacade.remove(featuregroup.getStreamFeatureGroup());
    }

    /**
     * Creates and persists a new {@link StreamFeatureGroup} entity for the given features.
     * Only features that carry a description are stored as cached features.
     *
     * @param featureGroupFeatureDTOS the features of the new feature group
     * @return the persisted entity
     */
    private StreamFeatureGroup persistStreamFeatureGroupMetadata(List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS) {
        StreamFeatureGroup streamFeatureGroup = new StreamFeatureGroup();
        List<CachedFeature> cachedFeatures = featureGroupFeatureDTOS.stream()
            .filter(feature -> feature.getDescription() != null)
            .map(feature -> new CachedFeature(streamFeatureGroup, feature.getName(), feature.getDescription()))
            .collect(Collectors.toList());
        streamFeatureGroup.setCachedFeatures(cachedFeatures);
        streamFeatureGroup.setFeaturesExtraConstraints(
            this.cachedFeaturegroupController.buildFeatureExtraConstrains(featureGroupFeatureDTOS, null, streamFeatureGroup));
        this.streamFeatureGroupFacade.persist(streamFeatureGroup);
        return streamFeatureGroup;
    }

    /**
     * Creates a stream feature group: verifies the primary key for the HUDI time travel format,
     * creates the HUDI-formatted Hive table (with the Hudi-specific features added) and persists
     * the metadata.
     *
     * @param featurestore the feature store in which the table is created
     * @param streamFeatureGroupDTO the requested feature group definition
     * @param project the project on whose behalf the table is created
     * @param user the user on whose behalf the table is created
     * @return the persisted stream feature group entity
     * @throws FeaturestoreException if verification or table creation fails
     */
    public StreamFeatureGroup createStreamFeatureGroup(Featurestore featurestore, StreamFeatureGroupDTO streamFeatureGroupDTO, Project project, Users user) throws FeaturestoreException {
        this.cachedFeaturegroupController.verifyPrimaryKey(streamFeatureGroupDTO, TimeTravelFormat.HUDI);
        String tableName = this.featuregroupController.getTblName(streamFeatureGroupDTO.getName(), streamFeatureGroupDTO.getVersion());
        List<FeatureGroupFeatureDTO> tableFeatures =
            this.cachedFeaturegroupController.addHudiSpecFeatures(streamFeatureGroupDTO.getFeatures());
        this.offlineFeatureGroupController.createHiveTable(featurestore, tableName, tableFeatures, project, user, OfflineFeatureGroupController.Formats.HUDI);
        return this.persistStreamFeatureGroupMetadata(streamFeatureGroupDTO.getFeatures());
    }

    /**
     * Applies a metadata update to an existing stream feature group.
     *
     * <p>If the DTO carries a feature list, the previous schema is verified to be unchanged and
     * the newly appended features are determined. Feature descriptions are updated, and when there
     * are new features the Hive table and the online/topic schema are altered and the change is
     * logged as an activity. A DTO without a feature list results in no schema or description
     * change.
     *
     * @param project the project on whose behalf the update is executed
     * @param user the user on whose behalf the update is executed
     * @param featuregroup the feature group to update
     * @param featuregroupDTO the requested changes; its feature list may be null
     * @throws FeaturestoreException if thrown by verification or the delegated alter operations
     * @throws SQLException if thrown by the delegated alter operations
     * @throws SchemaException if thrown by the delegated alter operations
     * @throws KafkaException if thrown by the delegated alter operations
     */
    public void updateMetadata(Project project, Users user, Featuregroup featuregroup, FeaturegroupDTO featuregroupDTO) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        List<FeatureGroupFeatureDTO> previousSchema = this.getFeaturesDTO(featuregroup, project, user);
        String tableName = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureGroupFeatureDTO> newFeatures = new ArrayList<>();
        if (featuregroupDTO.getFeatures() != null) {
            this.cachedFeaturegroupController.verifyPreviousSchemaUnchanged(previousSchema, featuregroupDTO.getFeatures());
            newFeatures = this.featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures());
        }
        // Tolerates a null feature list (handled inside the helper).
        this.updateCachedDescriptions(featuregroup.getStreamFeatureGroup(), featuregroupDTO.getFeatures());
        if (!newFeatures.isEmpty()) {
            this.offlineFeatureGroupController.alterHiveTableFeatures(featuregroup.getFeaturestore(), tableName, newFeatures, project, user);
            if (featuregroup.isOnlineEnabled()) {
                this.onlineFeaturegroupController.alterOnlineFeatureGroupSchema(featuregroup, newFeatures, featuregroupDTO.getFeatures(), project, user);
            } else {
                this.onlineFeaturegroupController.alterFeatureGroupSchema(featuregroup, featuregroupDTO.getFeatures(), project);
            }
            String newFeaturesStr = "New features: " + newFeatures.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.joining(","));
            this.fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, newFeaturesStr);
        }
    }

    /**
     * Stores the descriptions of the given features on the stream feature group: an existing
     * cached feature gets its description overwritten, otherwise a new cached feature is added.
     * Features without a description are skipped. Nothing is written when the list is null.
     *
     * @param streamFeatureGroup the entity whose cached features are updated
     * @param featureGroupFeatureDTOs the features carrying the new descriptions; may be null
     */
    private void updateCachedDescriptions(StreamFeatureGroup streamFeatureGroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOs) {
        if (featureGroupFeatureDTOs == null) {
            // Update request without a feature list: there are no descriptions to apply.
            return;
        }
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOs) {
            if (feature.getDescription() == null) {
                continue;
            }
            Optional<CachedFeature> previousCachedFeature = this.cachedFeaturegroupController.getCachedFeature(streamFeatureGroup.getCachedFeatures(), feature.getName());
            if (previousCachedFeature.isPresent()) {
                previousCachedFeature.get().setDescription(feature.getDescription());
            } else {
                streamFeatureGroup.getCachedFeatures().add(new CachedFeature(streamFeatureGroup, feature.getName(), feature.getDescription()));
            }
        }
        this.streamFeatureGroupFacade.updateMetadata(streamFeatureGroup);
    }
}

