/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.cached;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeatureGroupInputValidation;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.common.featurestore.query.ConstructorController;
import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.activity.FeaturestoreActivityMeta;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeatureExtraConstraints;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.stream.StreamFeatureGroup;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.HiveSqlDialect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class CachedFeaturegroupController {
    // Collaborators injected via @EJB and used by the methods of this controller.
    @EJB
    private CachedFeaturegroupFacade cachedFeatureGroupFacade;
    @EJB
    private FeaturegroupFacade featureGroupFacade;
    @EJB
    private CertificateMaterializer certificateMaterializer;
    @EJB
    private Settings settings;
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;
    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;
    @EJB
    private HiveController hiveController;
    @EJB
    private ConstructorController constructorController;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    @EJB
    private FeaturestoreActivityFacade fsActivityFacade;
    @EJB
    private FeaturegroupController featuregroupController;
    @EJB
    private FeatureGroupInputValidation featureGroupInputValidation;
    // Class-wide logger, named after this class.
    private static final Logger LOGGER = Logger.getLogger(CachedFeaturegroupController.class.getName());
    // Names of the Hudi metadata columns: addHudiSpecFeatures appends them to a schema and the
    // dropHudiSpec* methods filter them out again.
    private static final List<String> HUDI_SPEC_FEATURE_NAMES = Arrays.asList("_hoodie_record_key", "_hoodie_partition_path", "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno");

    /**
     * Loads the Hive JDBC driver class after the bean has been constructed.
     * A missing driver class is logged at SEVERE level; no exception is propagated.
     */
    @PostConstruct
    public void init() {
        final String driverClass = "io.hops.hive.jdbc.HiveDriver";
        try {
            Class.forName(driverClass);
        }
        catch (ClassNotFoundException notFound) {
            LOGGER.log(Level.SEVERE, "Could not load the Hive driver: " + driverClass, notFound);
        }
    }

    /**
     * Opens a JDBC connection to the Hive server for the given database on behalf of a project user.
     * The user's certificates are materialized locally first; the transient truststore/keystore paths
     * and the certificate password are embedded in the JDBC URL.
     *
     * @param databaseName name of the Hive database to connect to
     * @param project project whose certificates are used
     * @param user user whose certificates are used
     * @return the connection returned by the driver manager
     * @throws FeaturestoreException with CERTIFICATES_NOT_FOUND when service discovery, the password
     *     lookup or a file lookup fails; with COULD_NOT_INITIATE_HIVE_CONNECTION on any other I/O or
     *     SQL failure, in which case the locally materialized certificates are removed first
     */
    private Connection initConnection(String databaseName, Project project, Users user) throws FeaturestoreException {
        try {
            String endpoint = this.hiveController.getHiveServerInternalEndpoint();
            this.certificateMaterializer.materializeCertificatesLocal(user.getUsername(), project.getName());
            String certPassword = String.copyValueOf(
                this.certificateMaterializer.getUserMaterial(user.getUsername(), project.getName()).getPassword());
            // The same password protects both the truststore and the keystore.
            String url = "jdbc:hopshive://" + endpoint + "/" + databaseName
                + ";auth=noSasl;ssl=true;twoWay=true"
                + ";sslTrustStore=" + this.certificateMaterializer.getUserTransientTruststorePath(project, user)
                + ";trustStorePassword=" + certPassword
                + ";sslKeyStore=" + this.certificateMaterializer.getUserTransientKeystorePath(project, user)
                + ";keyStorePassword=" + certPassword;
            return DriverManager.getConnection(url);
        }
        catch (ServiceDiscoveryException | CryptoPasswordNotFoundException | FileNotFoundException certFailure) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CERTIFICATES_NOT_FOUND, Level.SEVERE, "project: " + project.getName() + ", hive database: " + databaseName, certFailure.getMessage(), certFailure);
        }
        catch (IOException | SQLException connectFailure) {
            // Clean up the certificates materialized above before reporting the failure.
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_HIVE_CONNECTION, Level.SEVERE, "project: " + project.getName() + ", hive database: " + databaseName, connectFailure.getMessage(), connectFailure);
        }
    }

    /**
     * Builds and runs a Hive SELECT that previews rows of the offline table of a feature group.
     * Features without a default value are selected as {@code `table`.`feature`}; features with a
     * default value are selected through {@code constructorController.selectWithDefaultAs}.
     * The query is attempted a second time if the first attempt fails for any reason; the first
     * failure is logged instead of being discarded.
     *
     * @param featuregroup feature group to preview
     * @param project project used to open the Hive connection
     * @param user user used to open the Hive connection
     * @param partition optional partition specification, see {@link #getWhereCondition}
     * @param limit value passed to the SELECT as a numeric literal limiting the rows returned
     * @return the parsed preview of the query result
     * @throws FeaturestoreException if the partition is invalid or the second attempt fails
     * @throws HopsSecurityException if the second attempt is rejected with a permission error
     * @throws SQLException if the second attempt fails with an unhandled SQL error
     */
    public FeaturegroupPreview getOfflineFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, String partition, int limit) throws FeaturestoreException, HopsSecurityException, SQLException {
        String tbl = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureGroupFeatureDTO> features = this.featuregroupController.getFeatures(featuregroup, project, user);
        SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
        for (FeatureGroupFeatureDTO feature : features) {
            if (feature.getDefaultValue() == null) {
                selectList.add(new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"), SqlParserPos.ZERO));
                continue;
            }
            selectList.add(this.constructorController.selectWithDefaultAs(new Feature(feature, tbl), false));
        }
        SqlNode whereClause = this.getWhereCondition(partition, features);
        SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList, new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), whereClause, null, null, null, null, null, SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null);
        String db = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        // Render the SQL once; both attempts run exactly the same statement.
        String sql = select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
        try {
            return this.executeReadHiveQuery(sql, db, project, user);
        }
        catch (Exception firstFailure) {
            // Keep the single blind retry of the original code, but do not lose the first error.
            LOGGER.log(Level.WARNING, "Offline feature group preview query failed, retrying once. hive database: " + db, firstFailure);
            return this.executeReadHiveQuery(sql, db, project, user);
        }
    }

    /**
     * Translates a partition specification of the form {@code col1=val1/col2=val2} into a SQL
     * condition node.
     *
     * @param partition partition specification; {@code null} or empty means "no filter"
     * @param features features of the feature group; each named column must be one of the features
     *     flagged as partition
     * @return {@code null} when no partition was given; the node list holding the single equality
     *     when exactly one segment was given; otherwise an AND call over all equalities
     * @throws FeaturestoreException if a segment contains no {@code =} or names a column that is not
     *     a partition column of the feature group
     */
    public SqlNode getWhereCondition(String partition, List<FeatureGroupFeatureDTO> features) throws FeaturestoreException {
        if (Strings.isNullOrEmpty(partition)) {
            return null;
        }
        SqlNodeList whereClauses = new SqlNodeList(SqlParserPos.ZERO);
        for (String split : partition.split("/")) {
            int posEqual = split.indexOf('=');
            if (posEqual < 0) {
                // Without this check substring(0, -1) would fail with StringIndexOutOfBoundsException.
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE, "The partition specification: " + split + " is not of the form <column>=<value>.");
            }
            String column = split.substring(0, posEqual);
            String rawValue = split.substring(posEqual + 1);
            FeatureGroupFeatureDTO partitionFeature = features.stream().filter(FeatureGroupFeatureDTO::getPartition).filter(feature -> feature.getName().equals(column)).findFirst().orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE, "The selected partition column: " + column + " was not found among the partition columns of the feature group."));
            SqlNode value;
            if (partitionFeature.getType().equalsIgnoreCase("string")) {
                value = SqlLiteral.createCharString(rawValue, SqlParserPos.ZERO);
            } else {
                // NOTE(review): non-string values are emitted as a bare identifier built from the
                // caller-supplied text; confirm upstream validation prevents SQL injection here.
                value = new SqlIdentifier(rawValue, SqlParserPos.ZERO);
            }
            whereClauses.add(SqlStdOperatorTable.EQUALS.createCall(SqlParserPos.ZERO, new SqlNode[]{new SqlIdentifier("`" + column + "`", SqlParserPos.ZERO), value}));
        }
        if (whereClauses.size() == 1) {
            // A single condition is returned as the one-element node list, as before.
            return whereClauses;
        }
        return SqlStdOperatorTable.AND.createCall(whereClauses);
    }

    /**
     * Creates the Hive table for a new cached feature group and persists its metadata entity.
     * For the HUDI time travel format the table schema is the DTO's feature list after
     * {@link #addHudiSpecFeatures} has appended the Hudi metadata columns to it (that call modifies
     * the DTO's list in place).
     *
     * @param featurestore feature store the table is created in
     * @param cachedFeaturegroupDTO description of the feature group to create
     * @param project project on whose behalf the table is created
     * @param user user on whose behalf the table is created
     * @return the persisted cached feature group entity
     * @throws FeaturestoreException if primary key validation or table creation fails
     */
    public CachedFeaturegroup createCachedFeaturegroup(Featurestore featurestore, CachedFeaturegroupDTO cachedFeaturegroupDTO, Project project, Users user) throws FeaturestoreException {
        TimeTravelFormat format = cachedFeaturegroupDTO.getTimeTravelFormat();
        this.verifyPrimaryKey(cachedFeaturegroupDTO, format);

        String tableName = this.featuregroupController.getTblName(cachedFeaturegroupDTO.getName(), cachedFeaturegroupDTO.getVersion());
        List<FeatureGroupFeatureDTO> tableFeatures;
        if (format == TimeTravelFormat.HUDI) {
            tableFeatures = this.addHudiSpecFeatures(cachedFeaturegroupDTO.getFeatures());
        } else {
            tableFeatures = cachedFeaturegroupDTO.getFeatures();
        }
        OfflineFeatureGroupController.Formats tableFormat = this.getTableFormat(format);
        this.offlineFeatureGroupController.createHiveTable(featurestore, tableName, tableFeatures, project, user, tableFormat);

        return this.persistCachedFeaturegroupMetadata(format, cachedFeaturegroupDTO.getFeatures());
    }

    /**
     * Builds the DTO representation of a cached feature group.
     * Online flags and the online topic name are only set when the online feature store is enabled
     * in the settings and the feature group itself is online enabled.
     *
     * @param featuregroup feature group entity to convert
     * @param project not used by this method
     * @param user not used by this method
     * @return the populated DTO
     * @throws ServiceException declared for the location resolution
     */
    public CachedFeaturegroupDTO convertCachedFeaturegroupToDTO(Featuregroup featuregroup, Project project, Users user) throws ServiceException {
        CachedFeaturegroupDTO dto = new CachedFeaturegroupDTO(featuregroup);

        boolean onlineActive = this.settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled();
        if (onlineActive) {
            dto.setOnlineEnabled(true);
            dto.setOnlineTopicName(Utils.getFeatureGroupTopicName(featuregroup));
        }

        dto.setName(featuregroup.getName());
        dto.setTimeTravelFormat(featuregroup.getCachedFeaturegroup().getTimeTravelFormat());
        dto.setDescription(featuregroup.getDescription());
        dto.setLocation(this.featurestoreUtils.resolveLocation(this.featuregroupController.getFeatureGroupLocation(featuregroup)));
        return dto;
    }

    /**
     * Reads the schema of the feature group's offline table and enriches every feature with the
     * primary key flag, Hudi precombine key flag and description stored in the cached feature
     * group metadata, plus the feature group id. For the HUDI time travel format the Hudi metadata
     * columns are removed from the result.
     *
     * @param featuregroup feature group whose features are requested
     * @param project project on whose behalf the schema is read
     * @param user user on whose behalf the schema is read
     * @return the enriched feature list
     * @throws FeaturestoreException if the schema cannot be read
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTO(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        CachedFeaturegroup cached = featuregroup.getCachedFeaturegroup();
        Set<String> primaryKeys = cached.getFeaturesExtraConstraints().stream().filter(CachedFeatureExtraConstraints::getPrimary).map(CachedFeatureExtraConstraints::getName).collect(Collectors.toSet());
        Set<String> precombineKeys = cached.getFeaturesExtraConstraints().stream().filter(CachedFeatureExtraConstraints::getHudiPrecombineKey).map(CachedFeatureExtraConstraints::getName).collect(Collectors.toSet());
        // Features without a stored description simply have no entry in this map.
        Map<String, String> featureDescription = cached.getCachedFeatures().stream().collect(Collectors.toMap(CachedFeature::getName, CachedFeature::getDescription));
        List<FeatureGroupFeatureDTO> featureGroupFeatures = this.offlineFeatureGroupController.getSchema(featuregroup.getFeaturestore(), this.featuregroupController.getTblName(featuregroup), project, user);
        for (FeatureGroupFeatureDTO feature : featureGroupFeatures) {
            feature.setPrimary(primaryKeys.contains(feature.getName()));
            feature.setHudiPrecombineKey(precombineKeys.contains(feature.getName()));
            feature.setDescription(featureDescription.get(feature.getName()));
            feature.setFeatureGroupId(featuregroup.getId());
        }
        if (cached.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            featureGroupFeatures = this.dropHudiSpecFeatureGroupFeature(featureGroupFeatures);
        }
        return featureGroupFeatures;
    }

    /**
     * Returns the features of a feature group as produced by {@link #getFeaturesDTO}; when the
     * online feature store is enabled in the settings and the feature group is online enabled, the
     * list is additionally passed through the online feature group controller.
     *
     * @param featuregroup feature group whose features are requested
     * @param project project on whose behalf the schema is read
     * @param user user on whose behalf the schema is read
     * @return the feature list
     * @throws FeaturestoreException if the features cannot be read
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTOOnlineChecked(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> offlineFeatures = this.getFeaturesDTO(featuregroup, project, user);
        boolean onlineActive = this.settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled();
        if (!onlineActive) {
            return offlineFeatures;
        }
        return this.onlineFeaturegroupController.getFeaturegroupFeatures(featuregroup, offlineFeatures);
    }

    /**
     * Asks the offline feature group controller to drop the feature group's table and then removes
     * the cached feature group entity through the facade.
     *
     * @param featuregroup feature group to delete
     * @param project project on whose behalf the table is dropped
     * @param user user on whose behalf the table is dropped
     * @throws FeaturestoreException declared for the drop operation
     * @throws IOException declared for the drop operation
     * @throws ServiceException declared for the drop operation
     */
    public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, IOException, ServiceException {
        String offlineDb = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        String offlineTable = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());

        this.offlineFeatureGroupController.dropFeatureGroup(offlineDb, offlineTable, project, user);
        // The metadata entity is only removed once the drop above returned normally.
        this.cachedFeatureGroupFacade.remove(featuregroup.getCachedFeaturegroup());
    }

    /**
     * Runs a read query against Hive and parses the result set into a preview.
     * The statement and the result set are closed via try-with-resources, and the connection
     * clean-up in {@code finally} always runs, so a failing close can no longer leak the connection
     * or the materialized certificates.
     *
     * @param query SQL text to execute
     * @param databaseName Hive database to connect to
     * @param project project on whose behalf the query is executed
     * @param user user on whose behalf the query is executed
     * @return the parsed query result
     * @throws SQLException kept in the signature for callers; SQL failures raised while executing,
     *     parsing or closing the statement are translated into the exceptions below
     * @throws FeaturestoreException HIVE_READ_QUERY_ERROR for SQL failures, or when the connection
     *     cannot be opened
     * @throws HopsSecurityException HDFS_ACCESS_CONTROL when the SQL error message contains
     *     "permission denied" (case-insensitive)
     */
    private FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException {
        Connection conn = null;
        try {
            conn = this.initConnection(databaseName, project, user);
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(query)) {
                return this.featurestoreUtils.parseResultset(rs);
            }
        }
        catch (SQLException e) {
            String context = "project: " + project.getName() + ", hive database: " + databaseName + " hive query: " + query;
            // getMessage() may be null; treat that as "not a permission problem".
            String message = e.getMessage();
            if (message != null && message.toLowerCase().contains("permission denied")) {
                throw new HopsSecurityException(RESTCodes.SecurityErrorCode.HDFS_ACCESS_CONTROL, Level.FINE, context, message, e);
            }
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.HIVE_READ_QUERY_ERROR, Level.SEVERE, context, message, e);
        }
        finally {
            // Also runs when initConnection failed (conn == null) so certificates are cleaned up.
            this.closeConnection(conn, user, project);
        }
    }

    /**
     * Closes the given Hive connection, if any, and removes the user's locally materialized
     * certificates. A failure while closing is logged at WARNING level and not propagated; the
     * certificates are removed in every case.
     *
     * @param conn connection to close, may be {@code null}
     * @param user user whose certificates are removed
     * @param project project whose certificates are removed
     */
    private void closeConnection(Connection conn, Users user, Project project) {
        try {
            if (conn == null) {
                // Nothing to close; the finally block below still runs.
                return;
            }
            conn.close();
        }
        catch (SQLException closeFailure) {
            LOGGER.log(Level.WARNING, "Error closing Hive JDBC connection: " + closeFailure);
        }
        finally {
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
        }
    }

    /**
     * Creates a new cached feature group entity, fills in its time travel format, extra
     * constraints and cached features, and persists it through the facade.
     * Only features that carry a description get a cached feature entry.
     *
     * @param timeTravelFormat time travel format to store on the entity
     * @param featureGroupFeatureDTOS features of the feature group
     * @return the persisted entity
     */
    private CachedFeaturegroup persistCachedFeaturegroupMetadata(TimeTravelFormat timeTravelFormat, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS) {
        CachedFeaturegroup entity = new CachedFeaturegroup();
        entity.setTimeTravelFormat(timeTravelFormat);
        entity.setFeaturesExtraConstraints(this.buildFeatureExtraConstrains(featureGroupFeatureDTOS, entity, null));

        List<CachedFeature> describedFeatures = new ArrayList<>();
        for (FeatureGroupFeatureDTO dto : featureGroupFeatureDTOS) {
            if (dto.getDescription() != null) {
                describedFeatures.add(new CachedFeature(entity, dto.getName(), dto.getDescription()));
            }
        }
        entity.setCachedFeatures(describedFeatures);

        this.cachedFeatureGroupFacade.persist(entity);
        return entity;
    }

    /**
     * Enables online serving for a feature group. If the feature group is not yet online enabled,
     * the flag is set and the online feature group is set up with the current features (without
     * Hudi metadata columns for the HUDI format). The feature group metadata is updated through the
     * facade in every case.
     *
     * @param featurestore feature store of the feature group
     * @param featuregroup feature group to enable
     * @param project project on whose behalf the operation runs
     * @param user user on whose behalf the operation runs
     */
    public void enableFeaturegroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException, KafkaException, SchemaException, ProjectException, UserException, IOException, HopsSecurityException {
        CachedFeaturegroup cached = featuregroup.getCachedFeaturegroup();
        List<FeatureGroupFeatureDTO> onlineFeatures = this.getFeaturesDTO(featuregroup, project, user);
        boolean isHudi = cached.getTimeTravelFormat() == TimeTravelFormat.HUDI;
        if (isHudi) {
            onlineFeatures = this.dropHudiSpecFeatureGroupFeature(onlineFeatures);
        }

        boolean alreadyOnline = featuregroup.isOnlineEnabled();
        if (!alreadyOnline) {
            // The flag is set before the set-up call, exactly as in the original statement order.
            featuregroup.setOnlineEnabled(true);
            this.onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, onlineFeatures, project, user);
        }
        this.featureGroupFacade.updateFeaturegroupMetadata(featuregroup);
    }

    /**
     * Disables online serving for a feature group. Nothing happens unless the online feature store
     * is enabled in the settings and the feature group is currently online enabled.
     *
     * @param featuregroup feature group to disable
     * @param project project on whose behalf the operation runs
     * @param user user on whose behalf the operation runs
     */
    public void disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        boolean onlineActive = this.settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled();
        if (!onlineActive) {
            return;
        }
        this.onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user);
        featuregroup.setOnlineEnabled(false);
        this.featureGroupFacade.updateFeaturegroupMetadata(featuregroup);
    }

    /**
     * Applies a metadata update to a cached feature group: validates that the existing schema is
     * unchanged, updates cached feature descriptions, and appends any new features to the offline
     * table (and to the online feature group when online serving is active). Appended features are
     * recorded as an FG_ALTERED activity.
     * When the DTO carries no feature list, no schema or description change is attempted.
     *
     * @param project project on whose behalf the update runs
     * @param user user on whose behalf the update runs
     * @param featuregroup feature group to update
     * @param featuregroupDTO requested state; its feature list may be {@code null}
     */
    public void updateMetadata(Project project, Users user, Featuregroup featuregroup, FeaturegroupDTO featuregroupDTO) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        List<FeatureGroupFeatureDTO> previousSchema = this.getFeaturesDTO(featuregroup, project, user);
        String tableName = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureGroupFeatureDTO> requestedFeatures = featuregroupDTO.getFeatures();
        List<FeatureGroupFeatureDTO> newFeatures = new ArrayList<>();
        if (requestedFeatures != null) {
            this.verifyPreviousSchemaUnchanged(previousSchema, requestedFeatures);
            newFeatures = this.featureGroupInputValidation.verifyAndGetNewFeatures(previousSchema, requestedFeatures);
            // Only touch descriptions when a feature list was supplied; iterating a null list
            // previously failed with a NullPointerException.
            this.updateCachedDescriptions(featuregroup.getCachedFeaturegroup(), requestedFeatures);
        }
        if (!newFeatures.isEmpty()) {
            this.offlineFeatureGroupController.alterHiveTableFeatures(featuregroup.getFeaturestore(), tableName, newFeatures, project, user);
            if (this.settings.isOnlineFeaturestore().booleanValue() && featuregroup.isOnlineEnabled()) {
                this.onlineFeaturegroupController.alterOnlineFeatureGroupSchema(featuregroup, newFeatures, requestedFeatures, project, user);
            }
            String newFeaturesStr = "New features: " + newFeatures.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.joining(","));
            this.fsActivityFacade.logMetadataActivity(user, featuregroup, FeaturestoreActivityMeta.FG_ALTERED, newFeaturesStr);
        }
    }

    /**
     * Copies the non-null descriptions of the given features onto the cached features of the
     * feature group (matching by name, case-insensitively), adding a cached feature where none
     * exists yet, and then stores the entity through the facade.
     * A {@code null} feature list is treated as "nothing to update".
     *
     * @param cachedFeatureGroup entity whose cached features are updated
     * @param featureGroupFeatureDTOs features carrying the new descriptions, may be {@code null}
     */
    private void updateCachedDescriptions(CachedFeaturegroup cachedFeatureGroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOs) {
        if (featureGroupFeatureDTOs == null) {
            return;
        }
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOs) {
            if (feature.getDescription() == null) {
                // Features without a description never change the stored metadata.
                continue;
            }
            Optional<CachedFeature> previousCachedFeature = this.getCachedFeature(cachedFeatureGroup.getCachedFeatures(), feature.getName());
            if (previousCachedFeature.isPresent()) {
                previousCachedFeature.get().setDescription(feature.getDescription());
            } else {
                cachedFeatureGroup.getCachedFeatures().add(new CachedFeature(cachedFeatureGroup, feature.getName(), feature.getDescription()));
            }
        }
        this.cachedFeatureGroupFacade.updateMetadata(cachedFeatureGroup);
    }

    /**
     * Looks up a cached feature by name, ignoring case.
     *
     * @param cachedFeatures cached features to search
     * @param featureName name to look for
     * @return a matching cached feature, or an empty Optional when none matches
     */
    public Optional<CachedFeature> getCachedFeature(Collection<CachedFeature> cachedFeatures, String featureName) {
        for (CachedFeature candidate : cachedFeatures) {
            if (candidate.getName().equalsIgnoreCase(featureName)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Appends every Hudi metadata column that is not yet present (by exact name) to the given
     * feature list. The list passed in is modified in place and returned.
     *
     * @param features feature list to extend; must support {@code add}
     * @return the same list instance, including the Hudi metadata columns
     */
    public List<FeatureGroupFeatureDTO> addHudiSpecFeatures(List<FeatureGroupFeatureDTO> features) {
        for (String specName : HUDI_SPEC_FEATURE_NAMES) {
            boolean alreadyPresent = features.stream().anyMatch(existing -> existing.getName().equals(specName));
            if (!alreadyPresent) {
                features.add(new FeatureGroupFeatureDTO(specName, "string", "hudi spec metadata feature", Boolean.FALSE, Boolean.FALSE));
            }
        }
        return features;
    }

    /**
     * Returns a new list containing the given features without the Hudi metadata columns.
     * The input list is not modified.
     *
     * @param features features to filter
     * @return the features whose name is not a Hudi metadata column name
     */
    public List<FeatureGroupFeatureDTO> dropHudiSpecFeatureGroupFeature(List<FeatureGroupFeatureDTO> features) {
        List<FeatureGroupFeatureDTO> kept = new ArrayList<>(features);
        kept.removeIf(candidate -> HUDI_SPEC_FEATURE_NAMES.contains(candidate.getName()));
        return kept;
    }

    /**
     * Returns a new list containing the given query features without the Hudi metadata columns.
     * The input list is not modified.
     *
     * @param features features to filter
     * @return the features whose name is not a Hudi metadata column name
     */
    public List<Feature> dropHudiSpecFeatures(List<Feature> features) {
        List<Feature> kept = new ArrayList<>(features);
        kept.removeIf(candidate -> HUDI_SPEC_FEATURE_NAMES.contains(candidate.getName()));
        return kept;
    }

    /**
     * Maps a time travel format to the storage format of the offline table: HUDI maps to the HUDI
     * format, anything else to the default storage format named in the settings.
     *
     * @param timeTravelFormat time travel format of the feature group
     * @return the storage format to create the table with
     */
    private OfflineFeatureGroupController.Formats getTableFormat(TimeTravelFormat timeTravelFormat) {
        OfflineFeatureGroupController.Formats tableFormat;
        switch (timeTravelFormat) {
            case HUDI: {
                tableFormat = OfflineFeatureGroupController.Formats.HUDI;
                break;
            }
            default: {
                tableFormat = OfflineFeatureGroupController.Formats.valueOf(this.settings.getFeaturestoreDbDefaultStorageFormat());
                break;
            }
        }
        return tableFormat;
    }

    /**
     * Verifies that every feature of the previous schema is still present in the new schema (matched
     * by exact name) with the same partition flag, primary key flag and type (type compared
     * case-insensitively).
     *
     * @param previousSchema features currently defined for the feature group
     * @param newSchema features requested by the update
     * @throws FeaturestoreException ILLEGAL_FEATUREGROUP_UPDATE if a previous feature is missing or
     *     one of its compared attributes changed
     */
    public void verifyPreviousSchemaUnchanged(List<FeatureGroupFeatureDTO> previousSchema, List<FeatureGroupFeatureDTO> newSchema) throws FeaturestoreException {
        for (FeatureGroupFeatureDTO feature : previousSchema) {
            FeatureGroupFeatureDTO newFeature = newSchema.stream().filter(newFeat -> feature.getName().equals(newFeat.getName())).findAny().orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Feature " + feature.getName() + " was not found in new schema. It is only possible to append features."));
            // Compare the flags by value: `==` on boxed Boolean values compares references and can
            // report a change for equal values. Objects is fully qualified so that this block does
            // not depend on an additional import.
            boolean unchanged = java.util.Objects.equals(newFeature.getPartition(), feature.getPartition())
                && java.util.Objects.equals(newFeature.getPrimary(), feature.getPrimary())
                && newFeature.getType().equalsIgnoreCase(feature.getType());
            if (!unchanged) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Primary key, partition key or type information of feature " + feature.getName() + " changed. Primary key, partition key and type cannot be changed when appending features.");
            }
        }
    }

    /**
     * Ensures that a feature group using the HUDI time travel format declares at least one primary
     * key feature. Other time travel formats are not checked.
     *
     * @param featuregroupDTO feature group description to validate
     * @param timeTravelFormat time travel format of the feature group
     * @throws FeaturestoreException PRIMARY_KEY_REQUIRED when HUDI is used without a primary key;
     *     the message differs for stream feature group DTOs
     */
    public void verifyPrimaryKey(FeaturegroupDTO featuregroupDTO, TimeTravelFormat timeTravelFormat) throws FeaturestoreException {
        if (timeTravelFormat != TimeTravelFormat.HUDI) {
            return;
        }
        boolean hasPrimaryKey = featuregroupDTO.getFeatures().stream().anyMatch(FeatureGroupFeatureDTO::getPrimary);
        if (hasPrimaryKey) {
            return;
        }
        String reason;
        if (featuregroupDTO instanceof StreamFeatureGroupDTO) {
            reason = "Stream enabled feature groups only support `HUDI` time travel format, which requires a primary key to be set.";
        } else {
            reason = "Time travel format `HUDI` requires a primary key to be set.";
        }
        throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.PRIMARY_KEY_REQUIRED, Level.FINE, reason);
    }

    /**
     * Builds the extra-constraint entities (primary key and Hudi precombine key markers) for a
     * cached or stream feature group.
     * <p>
     * A precombine key is required when a stream feature group is given or the cached feature group
     * uses the HUDI time travel format. If none of the features is flagged as precombine key, the
     * first primary key is used; this requires at least one primary key feature, otherwise
     * {@code pkNames.get(0)} fails with an IndexOutOfBoundsException. A precombine key that is not
     * also a primary key gets its own, non-primary constraint.
     *
     * @param featureGroupFeatureDTOS features of the feature group
     * @param cachedFeaturegroup owning cached feature group; only dereferenced when
     *     {@code streamFeatureGroup} is {@code null}
     * @param streamFeatureGroup owning stream feature group, or {@code null} for a cached one
     * @return the constraints, primary keys first
     */
    public List<CachedFeatureExtraConstraints> buildFeatureExtraConstrains(List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS, CachedFeaturegroup cachedFeaturegroup, StreamFeatureGroup streamFeatureGroup) {
        List<CachedFeatureExtraConstraints> cachedFeatureExtraConstraints = new ArrayList<>();
        List<String> pkNames = featureGroupFeatureDTOS.stream().filter(FeatureGroupFeatureDTO::getPrimary).map(FeatureGroupFeatureDTO::getName).collect(Collectors.toList());
        String hudiPrecombineKeyName = featureGroupFeatureDTOS.stream().filter(FeatureGroupFeatureDTO::getHudiPrecombineKey).map(FeatureGroupFeatureDTO::getName).findFirst().orElse(null);
        // Evaluated once, with the stream check first, so cachedFeaturegroup is never dereferenced
        // for stream feature groups (the original final condition could hit a null there).
        boolean needsPrecombineKey = streamFeatureGroup != null || cachedFeaturegroup.getTimeTravelFormat() == TimeTravelFormat.HUDI;
        boolean primaryKeyIsHudiPrecombineKey = false;
        if (needsPrecombineKey) {
            if (hudiPrecombineKeyName == null) {
                hudiPrecombineKeyName = pkNames.get(0);
                primaryKeyIsHudiPrecombineKey = true;
            } else {
                primaryKeyIsHudiPrecombineKey = pkNames.contains(hudiPrecombineKeyName);
            }
        }
        for (String pkName : pkNames) {
            cachedFeatureExtraConstraints.add(this.newExtraConstraint(cachedFeaturegroup, streamFeatureGroup, pkName, true, pkName.equals(hudiPrecombineKeyName)));
        }
        if (needsPrecombineKey && !primaryKeyIsHudiPrecombineKey) {
            cachedFeatureExtraConstraints.add(this.newExtraConstraint(cachedFeaturegroup, streamFeatureGroup, hudiPrecombineKeyName, false, true));
        }
        return cachedFeatureExtraConstraints;
    }

    /** Creates one constraint owned by the stream feature group if given, else by the cached one. */
    private CachedFeatureExtraConstraints newExtraConstraint(CachedFeaturegroup cachedFeaturegroup, StreamFeatureGroup streamFeatureGroup, String featureName, boolean primary, boolean hudiPrecombineKey) {
        if (streamFeatureGroup == null) {
            return new CachedFeatureExtraConstraints(cachedFeaturegroup, featureName, Boolean.valueOf(primary), Boolean.valueOf(hudiPrecombineKey));
        }
        return new CachedFeatureExtraConstraints(streamFeatureGroup, featureName, Boolean.valueOf(primary), Boolean.valueOf(hudiPrecombineKey));
    }
}

