package io.hops.hopsworks.common.featurestore.featuregroup.stream;

import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.datavalidationv2.suites.ExpectationSuiteController;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.hive.HiveTbls;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.stream.StreamFeatureGroup;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * Business logic for stream feature groups.
 *
 * <p>The controller converts stream feature group entities to DTOs, creates the backing Hive table
 * (HUDI format) together with its metadata row, serves previews, applies metadata/schema updates
 * and manages the Kafka topic used by stream feature groups that are not online enabled.
 *
 * <p>Most of the work is delegated to the injected cached/offline/online feature group controllers.
 */
@TransactionAttribute(TransactionAttributeType.NEVER)
@Stateless
public class StreamFeatureGroupController {

    @EJB
    private StreamFeatureGroupFacade streamFeatureGroupFacade;

    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;

    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;

    @EJB
    private FeaturegroupController featuregroupController;

    @EJB
    private FeaturestoreUtils featurestoreUtils;

    @EJB
    private CachedFeaturegroupController cachedFeaturegroupController;

    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;

    @EJB
    private ExpectationSuiteController expectationSuiteController;

    /**
     * Builds the DTO representation of a stream feature group.
     *
     * <p>If the feature group has an expectation suite, the suite is first replaced by the result of
     * {@code addAllExpectationIdToMetaField}. The topic name and the feature list depend on whether
     * the stream feature group is online enabled. The description is taken from the Hive table
     * parameter whose key equals {@code "COMMENT"} (case-insensitive).
     *
     * @param featuregroup the feature group entity; its stream feature group must be set
     * @param project      the project the feature group belongs to
     * @param users        the user on whose behalf the features are read
     * @return the populated DTO
     * @throws FeaturestoreException if reading the features fails
     * @throws ServiceException      propagated from the delegated controllers
     */
    public StreamFeatureGroupDTO convertStreamFeatureGroupToDTO(Featuregroup featuregroup, Project project, Users users)
        throws FeaturestoreException, ServiceException {
        if (featuregroup.getExpectationSuite() != null) {
            featuregroup.setExpectationSuite(
                expectationSuiteController.addAllExpectationIdToMetaField(featuregroup.getExpectationSuite()));
        }

        StreamFeatureGroupDTO streamFeatureGroupDTO = new StreamFeatureGroupDTO(featuregroup);
        StreamFeatureGroup streamFeatureGroup = featuregroup.getStreamFeatureGroup();
        boolean onlineEnabled = streamFeatureGroup.isOnlineEnabled();

        List<FeatureGroupFeatureDTO> featuresDTO;
        if (onlineEnabled) {
            streamFeatureGroupDTO.setOnlineTopicName(onlineFeaturegroupController.onlineFeatureGroupTopicName(
                project.getId(), featuregroup.getId(), Utils.getFeaturegroupName(featuregroup)));
            featuresDTO = onlineFeaturegroupController.getFeaturegroupFeatures(
                featuregroup, getStoredFeatures(featuregroup, project, users));
        } else {
            streamFeatureGroupDTO.setOnlineTopicName(offlineTopicName(project, featuregroup));
            featuresDTO = getStoredFeatures(featuregroup, project, users);
        }
        streamFeatureGroupDTO.setFeatures(featuresDTO);
        streamFeatureGroupDTO.setName(featuregroup.getName());

        // NOTE(review): the fallback is an unrelated Kafka constant - presumably an empty string that
        // was inlined at compile time. Confirm its value and replace it with a literal.
        String description = streamFeatureGroup.getHiveTbls().getHiveTableParamsCollection().stream()
            .filter(param -> param.getHiveTableParamsPK().getParamKey().equalsIgnoreCase("COMMENT"))
            .map(param -> param.getParamValue())
            .findFirst()
            .orElse(KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM);
        streamFeatureGroupDTO.setDescription(description);

        streamFeatureGroupDTO.setOnlineEnabled(Boolean.valueOf(onlineEnabled));
        streamFeatureGroupDTO.setLocation(
            featurestoreUtils.resolveLocationURI(streamFeatureGroup.getHiveTbls().getSdId().getLocation()));
        return streamFeatureGroupDTO;
    }

    /**
     * Creates and persists the metadata row of a stream feature group.
     *
     * <p>Only features that carry a description are stored as {@link CachedFeature} entries.
     *
     * @param hiveTbls      the Hive table backing the feature group
     * @param features      the features supplied by the client
     * @param onlineEnabled whether the feature group is online enabled; must not be {@code null}
     *                      because it is unboxed
     * @return the persisted entity
     */
    private StreamFeatureGroup persistStreamFeatureGroupMetadata(HiveTbls hiveTbls,
        List<FeatureGroupFeatureDTO> features, Boolean onlineEnabled) {
        StreamFeatureGroup streamFeatureGroup = new StreamFeatureGroup();
        streamFeatureGroup.setHiveTbls(hiveTbls);

        List<CachedFeature> describedFeatures = features.stream()
            .filter(feature -> feature.getDescription() != null)
            .map(feature -> new CachedFeature(streamFeatureGroup, feature.getName(), feature.getDescription()))
            .collect(Collectors.toList());
        streamFeatureGroup.setCachedFeatures(describedFeatures);

        streamFeatureGroup.setFeaturesExtraConstraints(
            cachedFeaturegroupController.buildFeatureExtraConstrains(features, null, streamFeatureGroup));
        streamFeatureGroup.setOnlineEnabled(onlineEnabled.booleanValue());
        streamFeatureGroupFacade.persist(streamFeatureGroup);
        return streamFeatureGroup;
    }

    /**
     * Creates the Hive table (HUDI format) for a new stream feature group and persists its metadata.
     *
     * @param featurestore          the feature store in which the table is created
     * @param streamFeatureGroupDTO the client supplied definition
     * @param project               the project performing the operation
     * @param users                 the user performing the operation
     * @return the persisted stream feature group entity
     * @throws FeaturestoreException if primary key verification or table creation fails, or if the
     *                               table cannot be found in the metastore after creation
     */
    public StreamFeatureGroup createStreamFeatureGroup(Featurestore featurestore,
        StreamFeatureGroupDTO streamFeatureGroupDTO, Project project, Users users) throws FeaturestoreException {
        cachedFeaturegroupController.verifyPrimaryKey(streamFeatureGroupDTO, TimeTravelFormat.HUDI);

        String tblName =
            featuregroupController.getTblName(streamFeatureGroupDTO.getName(), streamFeatureGroupDTO.getVersion());
        offlineFeatureGroupController.createHiveTable(featurestore, tblName, streamFeatureGroupDTO.getDescription(),
            cachedFeaturegroupController.addHudiSpecFeatures(streamFeatureGroupDTO.getFeatures()), project, users,
            OfflineFeatureGroupController.Formats.HUDI);

        // The table was just created, so a missing metastore entry is reported as a creation failure.
        HiveTbls hiveTbls = streamFeatureGroupFacade.getHiveTableByNameAndDB(tblName, featurestore.getHiveDbId())
            .orElseThrow(() -> new FeaturestoreException(
                RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.WARNING,
                "Table created correctly but not in the metastore"));

        return persistStreamFeatureGroupMetadata(hiveTbls, streamFeatureGroupDTO.getFeatures(),
            streamFeatureGroupDTO.getOnlineEnabled());
    }

    /**
     * Returns a preview of the feature group data.
     *
     * @param featuregroup the feature group to preview
     * @param project      the project performing the operation
     * @param users        the user performing the operation
     * @param str          forwarded unchanged to the offline preview (meaning not visible here;
     *                     presumably a partition filter - verify against the callee)
     * @param z            {@code true} to read from the online storage, {@code false} for offline
     * @param i            forwarded unchanged to the preview call (presumably the row limit)
     * @return the preview
     * @throws FeaturestoreException if an online preview is requested for a feature group that is
     *                               not online enabled
     * @throws SQLException          propagated from the preview query
     * @throws HopsSecurityException propagated from the preview query
     */
    public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users users,
        String str, boolean z, int i) throws SQLException, FeaturestoreException, HopsSecurityException {
        if (!z) {
            return cachedFeaturegroupController.getOfflineFeaturegroupPreview(featuregroup, project, users, str, i);
        }
        if (!featuregroup.getStreamFeatureGroup().isOnlineEnabled()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE);
        }
        return onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, users, i);
    }

    /**
     * Applies a metadata update: table description, feature descriptions and appended features.
     *
     * <p>When the DTO carries features, the previous schema is verified to be unchanged and the new
     * features are determined. New features are added to the Hive table and to the schema of the
     * matching Kafka topic, and the change is logged as an activity. A DTO without features
     * (description-only update) is supported.
     *
     * @param project         the project performing the operation
     * @param users           the user performing the operation
     * @param featuregroup    the feature group to update
     * @param featuregroupDTO the requested changes; {@code getFeatures()} and
     *                        {@code getDescription()} may each be {@code null}
     * @throws FeaturestoreException if schema verification or a table alteration fails
     * @throws SQLException          propagated from the delegated controllers
     * @throws SchemaException       propagated from the schema update
     * @throws KafkaException        propagated from the schema update
     */
    public void updateMetadata(Project project, Users users, Featuregroup featuregroup,
        FeaturegroupDTO featuregroupDTO) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        List<FeatureGroupFeatureDTO> previousSchema = getStoredFeatures(featuregroup, project, users);
        String tblName = featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());

        List<FeatureGroupFeatureDTO> newFeatures = new ArrayList<>();
        if (featuregroupDTO.getFeatures() != null) {
            cachedFeaturegroupController.verifyPreviousSchemaUnchanged(previousSchema, featuregroupDTO.getFeatures());
            newFeatures =
                cachedFeaturegroupController.verifyAndGetNewFeatures(previousSchema, featuregroupDTO.getFeatures());
        }

        if (featuregroupDTO.getDescription() != null) {
            offlineFeatureGroupController.alterHiveTableDescription(featuregroup.getFeaturestore(), tblName,
                featuregroupDTO.getDescription(), project, users);
        }

        updateCachedDescriptions(featuregroup.getStreamFeatureGroup(), featuregroupDTO.getFeatures());

        if (newFeatures.isEmpty()) {
            return;
        }

        offlineFeatureGroupController.alterHiveTableFeatures(featuregroup.getFeaturestore(), tblName, newFeatures,
            project, users);
        if (featuregroup.getStreamFeatureGroup().isOnlineEnabled()) {
            onlineFeaturegroupController.alterOnlineFeatureGroupSchema(featuregroup, newFeatures,
                featuregroupDTO.getFeatures(), project, users);
        } else {
            alterOfflineStreamFeatureGroupSchema(featuregroup, featuregroupDTO.getFeatures(), project);
        }

        String newFeatureNames = newFeatures.stream()
            .map(FeatureGroupFeatureDTO::getName)
            .collect(Collectors.joining(","));
        fsActivityFacade.logMetadataActivity(users, featuregroup, FeaturestoreActivityMeta.FG_ALTERED,
            "New features: " + newFeatureNames);
    }

    /**
     * Synchronises the stored feature descriptions with those supplied by the client and saves the
     * stream feature group through the facade.
     *
     * <p>Features without a description are skipped. An existing cached feature gets its description
     * overwritten; otherwise a new cached feature is added.
     *
     * @param streamFeatureGroup the entity whose cached features are updated
     * @param features           the client supplied features; {@code null} is treated as empty
     */
    private void updateCachedDescriptions(StreamFeatureGroup streamFeatureGroup,
        List<FeatureGroupFeatureDTO> features) {
        // The caller passes null when the update request carries no features.
        if (features != null) {
            for (FeatureGroupFeatureDTO feature : features) {
                String description = feature.getDescription();
                if (description == null) {
                    continue;
                }
                Optional<CachedFeature> existing = cachedFeaturegroupController.getCachedFeature(
                    streamFeatureGroup.getCachedFeatures(), feature.getName());
                if (existing.isPresent()) {
                    existing.get().setDescription(description);
                } else {
                    streamFeatureGroup.getCachedFeatures()
                        .add(new CachedFeature(streamFeatureGroup, feature.getName(), description));
                }
            }
        }
        streamFeatureGroupFacade.updateMetadata(streamFeatureGroup);
    }

    /**
     * Deletes the Kafka topic of a stream feature group that is not online enabled.
     *
     * @param project      the project owning the topic
     * @param featuregroup the feature group whose topic is deleted
     * @throws SchemaException propagated from the topic deletion
     * @throws KafkaException  propagated from the topic deletion
     */
    public void deleteOfflineStreamFeatureGroupTopic(Project project, Featuregroup featuregroup)
        throws SchemaException, KafkaException {
        onlineFeaturegroupController.deleteFeatureGroupKafkaTopic(project, offlineTopicName(project, featuregroup));
    }

    /**
     * Updates the schema registered for the offline stream feature group topic.
     *
     * @param featuregroup the feature group whose schema changed
     * @param features     the complete feature list after the change
     * @param project      the project owning the topic
     */
    private void alterOfflineStreamFeatureGroupSchema(Featuregroup featuregroup,
        List<FeatureGroupFeatureDTO> features, Project project)
        throws SchemaException, KafkaException, FeaturestoreException {
        onlineFeaturegroupController.alterFeatureGroupSchema(featuregroup, features,
            offlineTopicName(project, featuregroup), project);
    }

    /**
     * Creates the Kafka topic for a stream feature group that is not online enabled.
     *
     * @param project      the project owning the topic
     * @param featuregroup the feature group the topic is created for
     * @param list         the features passed on to the topic creation
     */
    public void setupOfflineStreamFeatureGroup(Project project, Featuregroup featuregroup,
        List<FeatureGroupFeatureDTO> list)
        throws ProjectException, SchemaException, KafkaException, UserException, FeaturestoreException {
        String featuregroupName = Utils.getFeaturegroupName(featuregroup);
        String topicName = offlineStreamFeatureGroupTopicName(project.getId(), featuregroup.getId(), featuregroupName);
        onlineFeaturegroupController.createFeatureGroupKafkaTopic(project, featuregroupName, topicName, list);
    }

    /**
     * Builds the topic name {@code <num>_<num2>_<str>}.
     *
     * @param num  first id component (callers in this class pass the project id); must not be null
     * @param num2 second id component (callers in this class pass the feature group id); must not be null
     * @param str  name component (callers in this class pass {@code Utils.getFeaturegroupName})
     * @return the underscore separated topic name
     */
    public String offlineStreamFeatureGroupTopicName(Integer num, Integer num2, String str) {
        return num.toString() + "_" + num2.toString() + "_" + str;
    }

    /** Reads the stored features of the given feature group via the cached feature group controller. */
    private List<FeatureGroupFeatureDTO> getStoredFeatures(Featuregroup featuregroup, Project project, Users users)
        throws FeaturestoreException {
        return cachedFeaturegroupController.getFeaturesDTO(featuregroup.getStreamFeatureGroup(),
            featuregroup.getId(), featuregroup.getFeaturestore(), project, users);
    }

    /** Resolves the offline topic name from the project id, the feature group id and the feature group name. */
    private String offlineTopicName(Project project, Featuregroup featuregroup) {
        return offlineStreamFeatureGroupTopicName(project.getId(), featuregroup.getId(),
            Utils.getFeaturegroupName(featuregroup));
    }
}
