/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.stream;

import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.datavalidationv2.suites.ExpectationSuiteController;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupFacade;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.hive.HiveTableParams;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.hive.HiveTbls;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.stream.StreamFeatureGroup;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class StreamFeatureGroupController {
    @EJB
    private StreamFeatureGroupFacade streamFeatureGroupFacade;
    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;
    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    @EJB
    private CachedFeaturegroupController cachedFeaturegroupController;
    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;
    @EJB
    private ExpectationSuiteController expectationSuiteController;
    @EJB
    private FeatureGroupInputValidation featureGroupInputValidation;

    public StreamFeatureGroupDTO convertStreamFeatureGroupToDTO(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, ServiceException {
        List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS;
        if (featuregroup.getExpectationSuite() != null) {
            featuregroup.setExpectationSuite(this.expectationSuiteController.addAllExpectationIdToMetaField(featuregroup.getExpectationSuite()));
        }
        StreamFeatureGroupDTO streamFeatureGroupDTO = new StreamFeatureGroupDTO(featuregroup);
        if (featuregroup.isOnlineEnabled()) {
            streamFeatureGroupDTO.setOnlineTopicName(this.onlineFeaturegroupController.onlineFeatureGroupTopicName(project.getId(), featuregroup.getId(), Utils.getFeaturegroupName(featuregroup)));
            featureGroupFeatureDTOS = this.onlineFeaturegroupController.getFeaturegroupFeatures(featuregroup, this.cachedFeaturegroupController.getFeaturesDTO(featuregroup.getStreamFeatureGroup(), featuregroup.getId(), featuregroup.getFeaturestore(), project, user));
        } else {
            streamFeatureGroupDTO.setOnlineTopicName(this.offlineStreamFeatureGroupTopicName(project.getId(), featuregroup.getId(), Utils.getFeaturegroupName(featuregroup)));
            featureGroupFeatureDTOS = this.cachedFeaturegroupController.getFeaturesDTO(featuregroup.getStreamFeatureGroup(), featuregroup.getId(), featuregroup.getFeaturestore(), project, user);
        }
        streamFeatureGroupDTO.setFeatures(featureGroupFeatureDTOS);
        streamFeatureGroupDTO.setName(featuregroup.getName());
        streamFeatureGroupDTO.setDescription(featuregroup.getStreamFeatureGroup().getHiveTbls().getHiveTableParamsCollection().stream().filter(p -> p.getHiveTableParamsPK().getParamKey().equalsIgnoreCase("COMMENT")).map(HiveTableParams::getParamValue).findFirst().orElse(""));
        streamFeatureGroupDTO.setOnlineEnabled(featuregroup.isOnlineEnabled());
        streamFeatureGroupDTO.setLocation(this.featurestoreUtils.resolveLocationURI(featuregroup.getStreamFeatureGroup().getHiveTbls().getSdId().getLocation()));
        return streamFeatureGroupDTO;
    }

    private StreamFeatureGroup persistStreamFeatureGroupMetadata(HiveTbls hiveTable, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS, Boolean onlineEnabled) {
        StreamFeatureGroup streamFeatureGroup = new StreamFeatureGroup();
        streamFeatureGroup.setHiveTbls(hiveTable);
        streamFeatureGroup.setCachedFeatures((Collection)featureGroupFeatureDTOS.stream().filter(feature -> feature.getDescription() != null).map(feature -> new CachedFeature(streamFeatureGroup, feature.getName(), feature.getDescription())).collect(Collectors.toList()));
        streamFeatureGroup.setFeaturesExtraConstraints(this.cachedFeaturegroupController.buildFeatureExtraConstrains(featureGroupFeatureDTOS, null, streamFeatureGroup));
        this.streamFeatureGroupFacade.persist(streamFeatureGroup);
        return streamFeatureGroup;
    }

    public StreamFeatureGroup createStreamFeatureGroup(Featurestore featurestore, StreamFeatureGroupDTO streamFeatureGroupDTO, Project project, Users user) throws FeaturestoreException {
        this.cachedFeaturegroupController.verifyPrimaryKey(streamFeatureGroupDTO, TimeTravelFormat.HUDI);
        String tableName = this.featuregroupController.getTblName(streamFeatureGroupDTO.getName(), streamFeatureGroupDTO.getVersion());
        this.offlineFeatureGroupController.createHiveTable(featurestore, tableName, streamFeatureGroupDTO.getDescription(), this.cachedFeaturegroupController.addHudiSpecFeatures(streamFeatureGroupDTO.getFeatures()), project, user, OfflineFeatureGroupController.Formats.HUDI);
        HiveTbls hiveTbls = this.streamFeatureGroupFacade.getHiveTableByNameAndDB(tableName, featurestore.getHiveDbId()).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.WARNING, "Table created correctly but not in the metastore"));
        return this.persistStreamFeatureGroupMetadata(hiveTbls, streamFeatureGroupDTO.getFeatures(), streamFeatureGroupDTO.getOnlineEnabled());
    }

    public void updateMetadata(Project project, Users user, Featuregroup featuregroup, FeaturegroupDTO featuregroupDTO) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        List<FeatureGroupFeatureDTO> previousSchema = this.cachedFeaturegroupController.getFeaturesDTO(featuregroup.getStreamFeatureGroup(), featuregroup.getId(), featuregroup.getFeaturestore(), project, user);
        String tableName = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        ArrayList<FeatureGroupFeatureDTO> newFeatures = new ArrayList();
        if (featuregroupDTO.getFeatures() != null) {
            this.cachedFeaturegroupController.verifyPreviousSchemaUnchanged(previousSchema, featuregroupDTO.getFeatures());
            newFeatures = this.featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures());
        }
        if (featuregroupDTO.getDescription() != null) {
            this.offlineFeatureGroupController.alterHiveTableDescription(featuregroup.getFeaturestore(), tableName, featuregroupDTO.getDescription(), project, user);
        }
        this.updateCachedDescriptions(featuregroup.getStreamFeatureGroup(), featuregroupDTO.getFeatures());
        if (!newFeatures.isEmpty()) {
            this.offlineFeatureGroupController.alterHiveTableFeatures(featuregroup.getFeaturestore(), tableName, newFeatures, project, user);
            if (featuregroup.isOnlineEnabled()) {
                this.onlineFeaturegroupController.alterOnlineFeatureGroupSchema(featuregroup, newFeatures, featuregroupDTO.getFeatures(), project, user);
            } else {
                this.alterOfflineStreamFeatureGroupSchema(featuregroup, featuregroupDTO.getFeatures(), project);
            }
            String newFeaturesStr = "New features: " + newFeatures.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.joining(","));
            this.fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, newFeaturesStr);
        }
    }

    private void updateCachedDescriptions(StreamFeatureGroup streamFeatureGroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOs) {
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOs) {
            Optional<CachedFeature> previousCachedFeature = this.cachedFeaturegroupController.getCachedFeature(streamFeatureGroup.getCachedFeatures(), feature.getName());
            if (feature.getDescription() == null) continue;
            if (previousCachedFeature.isPresent()) {
                previousCachedFeature.get().setDescription(feature.getDescription());
                continue;
            }
            streamFeatureGroup.getCachedFeatures().add(new CachedFeature(streamFeatureGroup, feature.getName(), feature.getDescription()));
        }
        this.streamFeatureGroupFacade.updateMetadata(streamFeatureGroup);
    }

    public void deleteOfflineStreamFeatureGroupTopic(Project project, Featuregroup featureGroup) throws SchemaException, KafkaException {
        String topicName = this.offlineStreamFeatureGroupTopicName(project.getId(), featureGroup.getId(), Utils.getFeaturegroupName(featureGroup));
        this.onlineFeaturegroupController.deleteFeatureGroupKafkaTopic(project, topicName);
    }

    private void alterOfflineStreamFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> fullNewSchema, Project project) throws SchemaException, KafkaException, FeaturestoreException {
        String topicName = this.offlineStreamFeatureGroupTopicName(project.getId(), featureGroup.getId(), Utils.getFeaturegroupName(featureGroup));
        this.onlineFeaturegroupController.alterFeatureGroupSchema(featureGroup, fullNewSchema, topicName, project);
    }

    public void setupOfflineStreamFeatureGroup(Project project, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features) throws ProjectException, SchemaException, KafkaException, UserException, FeaturestoreException {
        String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
        String topicName = this.offlineStreamFeatureGroupTopicName(project.getId(), featureGroup.getId(), featureGroupEntityName);
        this.onlineFeaturegroupController.createFeatureGroupKafkaTopic(project, featureGroupEntityName, topicName, features);
    }

    public String offlineStreamFeatureGroupTopicName(Integer projectId, Integer featureGroupId, String featureGroupEntityName) {
        return projectId.toString() + "_" + featureGroupId.toString() + "_" + featureGroupEntityName;
    }
}

