/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.cached;

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.OfflineFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.query.ConstructorController;
import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeatureExtraConstraints;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.CachedFeaturegroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveColumns;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HivePartitionKeys;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveTableParams;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.HiveTbls;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.HiveSqlDialect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
import org.javatuples.Pair;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class CachedFeaturegroupController {
    // Container-injected collaborators used by the methods of this controller.
    @EJB
    private CachedFeaturegroupFacade cachedFeaturegroupFacade;
    @EJB
    private CertificateMaterializer certificateMaterializer;
    @EJB
    private Settings settings;
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private OnlineFeaturegroupController onlineFeaturegroupController;
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private OfflineFeatureGroupController offlineFeatureGroupController;
    @EJB
    private HiveController hiveController;
    @EJB
    private ConstructorController constructorController;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    private static final Logger LOGGER = Logger.getLogger(CachedFeaturegroupController.class.getName());
    // Fully qualified class name of the Hive JDBC driver loaded in init().
    private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
    // Column names that are added to HUDI tables on creation and filtered out of feature lists on read.
    private static final List<String> HUDI_SPEC_FEATURE_NAMES = Arrays.asList("_hoodie_record_key", "_hoodie_partition_path", "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno");

    /**
     * Loads the Hive JDBC driver class after the bean has been constructed.
     * A missing driver is logged at SEVERE level and not propagated.
     */
    @PostConstruct
    public void init() {
        try {
            Class.forName(HIVE_DRIVER);
        } catch (ClassNotFoundException driverMissing) {
            // HIVE_DRIVER is a compile-time constant, so the message is identical to the spelled-out form.
            LOGGER.log(Level.SEVERE, "Could not load the Hive driver: " + HIVE_DRIVER, driverMissing);
        }
    }

    /**
     * Opens a JDBC connection to the given Hive database on behalf of a project user.
     * The user's certificates are materialized locally and referenced from the JDBC URL
     * as trust store and key store.
     *
     * @param databaseName name of the Hive database to connect to
     * @param project the project the user is acting in
     * @param user the user whose certificates are used
     * @return an open JDBC connection
     * @throws FeaturestoreException with CERTIFICATES_NOT_FOUND if the endpoint or the crypto material
     *     cannot be resolved, or COULD_NOT_INITIATE_HIVE_CONNECTION on I/O or SQL failures
     */
    private Connection initConnection(String databaseName, Project project, Users user) throws FeaturestoreException {
        try {
            final String endpoint = this.hiveController.getHiveServerInternalEndpoint();
            this.certificateMaterializer.materializeCertificatesLocal(user.getUsername(), project.getName());
            final char[] rawPassword = this.certificateMaterializer.getUserMaterial(user.getUsername(), project.getName()).getPassword();
            final String storePassword = String.copyValueOf(rawPassword);

            // NOTE(review): the store password ends up inside the JDBC URL; avoid logging this string.
            final StringBuilder url = new StringBuilder("jdbc:hive2://");
            url.append(endpoint).append("/").append(databaseName);
            url.append(";auth=noSasl;ssl=true;twoWay=true;sslTrustStore=");
            url.append(this.certificateMaterializer.getUserTransientTruststorePath(project, user));
            url.append(";trustStorePassword=").append(storePassword);
            url.append(";sslKeyStore=");
            url.append(this.certificateMaterializer.getUserTransientKeystorePath(project, user));
            url.append(";keyStorePassword=").append(storePassword);
            return DriverManager.getConnection(url.toString());
        } catch (ServiceDiscoveryException | CryptoPasswordNotFoundException | FileNotFoundException e) {
            final String context = "project: " + project.getName() + ", hive database: " + databaseName;
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CERTIFICATES_NOT_FOUND, Level.SEVERE, context, e.getMessage(), e);
        } catch (IOException | SQLException e) {
            // Only this branch cleans up the materialized certificates before failing.
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
            final String context = "project: " + project.getName() + ", hive database: " + databaseName;
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_HIVE_CONNECTION, Level.SEVERE, context, e.getMessage(), e);
        }
    }

    /**
     * Returns the {@code SHOW CREATE TABLE} output of the feature group's Hive table as one string.
     *
     * @param featuregroup the feature group whose schema is requested
     * @param project the project the user is acting in
     * @param user the user running the query
     * @return the DDL, one result row per line
     * @throws FeaturestoreException if the schema query fails
     * @throws HopsSecurityException if the query is rejected for lack of permissions
     */
    public String getDDLSchema(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, HopsSecurityException {
        final FeaturegroupPreview schemaRows;
        try {
            schemaRows = this.getSQLSchemaForFeaturegroup(featuregroup, project, user);
        } catch (SQLException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_FETCH_FEATUREGROUP_SHOW_CREATE_SCHEMA, Level.SEVERE, "Internal error fetching the schema of the feature group", e.getMessage(), e);
        }
        return this.parseSqlSchemaResult(schemaRows);
    }

    /**
     * Joins the second tuple element of the first column of every preview row with newlines.
     *
     * @param preview result of a schema query
     * @return the joined text
     */
    private String parseSqlSchemaResult(FeaturegroupPreview preview) {
        List<String> ddlLines = new ArrayList<>();
        for (FeaturegroupPreview.Row row : preview.getPreview()) {
            ddlLines.add(row.getValues().get(0).getValue1());
        }
        return StringUtils.join(ddlLines, "\n");
    }

    /**
     * Builds the table name of a feature group as {@code <name>_<version>}.
     *
     * @param featuregroupName name of the feature group
     * @param version version of the feature group; must not be null
     * @return the table name
     */
    private String getTblName(String featuregroupName, Integer version) {
        return String.join("_", featuregroupName, version.toString());
    }

    /**
     * Returns a preview of the feature group's rows from either the online or the offline storage.
     *
     * @param featuregroup the feature group to preview
     * @param project the project the user is acting in
     * @param user the user running the query
     * @param partition optional partition filter, only used for the offline preview
     * @param online whether to read from the online storage
     * @param limit maximum number of rows
     * @return the preview rows
     * @throws FeaturestoreException with FEATUREGROUP_NOT_ONLINE if an online preview is requested
     *     for a feature group that is not online enabled
     */
    public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, String partition, boolean online, int limit) throws SQLException, FeaturestoreException, HopsSecurityException {
        if (!online) {
            return this.getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit);
        }
        if (!featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE);
        }
        return this.onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit);
    }

    /**
     * Runs a {@code SELECT ... LIMIT n} against the feature group's Hive table and returns the rows.
     * Features with a default value are selected through
     * {@code ConstructorController.selectWithDefaultAs}; all others are selected by name.
     * If the query fails it is retried exactly once; the first failure is logged.
     *
     * @param featuregroup the feature group to preview
     * @param project the project the user is acting in
     * @param user the user running the query
     * @param partition optional partition filter of the form {@code col=value/col2=value2}
     * @param limit maximum number of rows
     * @return the preview rows
     * @throws FeaturestoreException if the partition filter is invalid or the query fails
     * @throws HopsSecurityException if the query is rejected for lack of permissions
     * @throws SQLException declared for callers; see executeReadHiveQuery
     */
    public FeaturegroupPreview getOfflineFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, String partition, int limit) throws FeaturestoreException, HopsSecurityException, SQLException {
        String tbl = this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureGroupFeatureDTO> features = this.getFeaturesDTO(featuregroup, project, user);
        SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
        for (FeatureGroupFeatureDTO feature : features) {
            if (feature.getDefaultValue() == null) {
                selectList.add((SqlNode)new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"), SqlParserPos.ZERO));
                continue;
            }
            selectList.add(this.constructorController.selectWithDefaultAs(new Feature(feature, tbl)));
        }
        SqlNode whereClause = this.getWhereCondition(partition, features);
        SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList, (SqlNode)new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), whereClause, null, null, null, null, null, (SqlNode)SqlLiteral.createExactNumeric((String)String.valueOf(limit), (SqlParserPos)SqlParserPos.ZERO));
        String db = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        // Render the statement once; both attempts run exactly the same SQL.
        String sql = select.toSqlString((SqlDialect)new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
        try {
            return this.executeReadHiveQuery(sql, db, project, user);
        }
        catch (Exception e) {
            // Keep the single blind retry, but do not lose the first failure silently.
            LOGGER.log(Level.WARNING, "Offline feature group preview query failed, retrying once", e);
            return this.executeReadHiveQuery(sql, db, project, user);
        }
    }

    /**
     * Translates a partition specification of the form {@code col=value/col2=value2} into a
     * SQL filter node.
     *
     * @param partition the partition specification; null or empty means "no filter"
     * @param features the features of the feature group, used to validate the partition columns
     *     and to decide whether a value is quoted as a string literal
     * @return null if no partition was given, otherwise the filter node
     * @throws FeaturestoreException with ILLEGAL_FEATURE_NAME if a segment is not of the form
     *     {@code column=value} or names a column that is not a partition column
     */
    public SqlNode getWhereCondition(String partition, List<FeatureGroupFeatureDTO> features) throws FeaturestoreException {
        if (Strings.isNullOrEmpty(partition)) {
            return null;
        }
        SqlNodeList whereClauses = new SqlNodeList(SqlParserPos.ZERO);
        for (String split : partition.split("/")) {
            int posEqual = split.indexOf('=');
            if (posEqual < 0) {
                // Without this guard substring(0, -1) fails with StringIndexOutOfBoundsException.
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE, "The partition specification: " + split + " is not of the form column=value.");
            }
            final String column = split.substring(0, posEqual);
            final String rawValue = split.substring(posEqual + 1);
            FeatureGroupFeatureDTO partitionFeature = features.stream().filter(FeatureGroupFeatureDTO::getPartition).filter(feature -> feature.getName().equals(column)).findFirst().orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE, "The selected partition column: " + column + " was not found among the partition columns of the feature group."));
            // String partitions become quoted literals; every other type is emitted verbatim.
            // NOTE(review): the verbatim branch places caller-supplied text into the query unescaped.
            SqlNode value = partitionFeature.getType().equalsIgnoreCase("string") ? SqlLiteral.createCharString(rawValue, SqlParserPos.ZERO) : new SqlIdentifier(rawValue, SqlParserPos.ZERO);
            whereClauses.add(SqlStdOperatorTable.EQUALS.createCall(SqlParserPos.ZERO, new SqlNode[]{new SqlIdentifier("`" + column + "`", SqlParserPos.ZERO), value}));
        }
        if (whereClauses.size() == 1) {
            // A single condition is returned as the one-element node list itself.
            return whereClauses;
        }
        return SqlStdOperatorTable.AND.createCall(whereClauses);
    }

    /**
     * Creates the Hive table of a cached feature group, optionally the online MySQL table,
     * and persists the feature group metadata.
     *
     * @param featurestore the feature store the feature group belongs to
     * @param cachedFeaturegroupDTO the requested feature group
     * @param project the project the user is acting in
     * @param user the user creating the feature group
     * @return the persisted cached feature group entity
     * @throws FeaturestoreException if validation fails or the created table cannot be found
     *     in the metastore
     * @throws SQLException if creating a table fails
     */
    public CachedFeaturegroup createCachedFeaturegroup(Featurestore featurestore, CachedFeaturegroupDTO cachedFeaturegroupDTO, Project project, Users user) throws FeaturestoreException, SQLException {
        final TimeTravelFormat timeTravelFormat = cachedFeaturegroupDTO.getTimeTravelFormat();
        this.verifyPrimaryAndPartitionKey(cachedFeaturegroupDTO.getFeatures(), timeTravelFormat);

        final String tableName = this.getTblName(cachedFeaturegroupDTO.getName(), cachedFeaturegroupDTO.getVersion());

        // addHudiSpecFeatures appends to the list it is given, so for HUDI the DTO's own
        // feature list carries the extra columns from here on.
        final List<FeatureGroupFeatureDTO> hiveFeatures;
        if (timeTravelFormat == TimeTravelFormat.HUDI) {
            hiveFeatures = this.addHudiSpecFeatures(cachedFeaturegroupDTO.getFeatures());
        } else {
            hiveFeatures = cachedFeaturegroupDTO.getFeatures();
        }
        this.offlineFeatureGroupController.createHiveTable(featurestore, tableName, cachedFeaturegroupDTO.getDescription(), hiveFeatures, project, user, this.getTableFormat(timeTravelFormat));

        final boolean onlineEnabled = this.settings.isOnlineFeaturestore().booleanValue() && cachedFeaturegroupDTO.getOnlineEnabled().booleanValue();
        if (onlineEnabled) {
            this.onlineFeaturegroupController.createMySQLTable(featurestore, tableName, cachedFeaturegroupDTO.getFeatures(), project, user);
        }

        final HiveTbls hiveTbls = this.cachedFeaturegroupFacade.getHiveTableByNameAndDB(tableName, featurestore.getHiveDbId()).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP, Level.WARNING, "", "Table created correctly but not in the metastore"));
        return this.persistCachedFeaturegroupMetadata(hiveTbls, onlineEnabled, cachedFeaturegroupDTO.getTimeTravelFormat(), cachedFeaturegroupDTO.getFeatures());
    }

    /**
     * Converts a cached feature group entity into its DTO, including features, description,
     * location and, when online enabled, the online type of each feature.
     *
     * @param featuregroup the feature group entity
     * @param project the project the user is acting in
     * @param user the user requesting the conversion
     * @return the populated DTO
     * @throws FeaturestoreException if the features cannot be read
     * @throws ServiceException if the location cannot be resolved
     */
    public CachedFeaturegroupDTO convertCachedFeaturegroupToDTO(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, ServiceException {
        final CachedFeaturegroupDTO dto = new CachedFeaturegroupDTO(featuregroup);
        final List<FeatureGroupFeatureDTO> offlineFeatures = this.getFeaturesDTO(featuregroup, project, user);

        if (this.settings.isOnlineFeaturestore().booleanValue() && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
            dto.setOnlineEnabled(true);
            final List<FeatureGroupFeatureDTO> onlineFeatures = this.onlineFeaturegroupController.getFeaturegroupFeatures(featuregroup);
            // Copy the online type onto every offline feature with the same (case-insensitive) name.
            for (FeatureGroupFeatureDTO offlineFeature : offlineFeatures) {
                onlineFeatures.stream()
                    .filter(onlineFeature -> offlineFeature.getName().equalsIgnoreCase(onlineFeature.getName()))
                    .forEach(onlineFeature -> offlineFeature.setOnlineType(onlineFeature.getType()));
            }
        }

        dto.setFeatures(offlineFeatures);
        dto.setName(featuregroup.getName());
        dto.setTimeTravelFormat(featuregroup.getCachedFeaturegroup().getTimeTravelFormat());

        final boolean hudiInputFormat = featuregroup.getCachedFeaturegroup().getHiveTbls().getSdId().getInputFormat().equals(OfflineFeatureGroupController.Formats.HUDI.getInputFormat());
        dto.setHudiEnabled(hudiInputFormat);

        // The description is stored as the table parameter named COMMENT; default to empty.
        final String description = featuregroup.getCachedFeaturegroup().getHiveTbls().getHiveTableParamsCollection().stream()
            .filter(param -> param.getHiveTableParamsPK().getParamKey().equalsIgnoreCase("COMMENT"))
            .map(HiveTableParams::getParamValue)
            .findFirst()
            .orElse("");
        dto.setDescription(description);

        dto.setLocation(this.featurestoreUtils.resolveLocationURI(featuregroup.getCachedFeaturegroup().getHiveTbls().getSdId().getLocation()));
        return dto;
    }

    /**
     * Builds the feature DTOs of a cached feature group from its Hive columns and partition keys.
     * Regular columns come first, followed by partition keys. For HUDI feature groups the
     * HUDI-specific columns are filtered out of the result.
     *
     * @param featureGroup the feature group entity
     * @param project the project the user is acting in
     * @param user the user requesting the features
     * @return the feature DTOs
     * @throws FeaturestoreException if the default constraints cannot be read
     */
    public List<FeatureGroupFeatureDTO> getFeaturesDTO(Featuregroup featureGroup, Project project, Users user) throws FeaturestoreException {
        final Collection<CachedFeatureExtraConstraints> extraConstraints = featureGroup.getCachedFeaturegroup().getFeaturesExtraConstraints();
        final HiveTbls hiveTable = featureGroup.getCachedFeaturegroup().getHiveTbls();
        final List<SQLDefaultConstraint> defaults = this.offlineFeatureGroupController.getDefaultConstraints(featureGroup.getFeaturestore(), hiveTable.getTblName(), project, user);

        List<FeatureGroupFeatureDTO> result = new ArrayList<>();

        for (HiveColumns column : hiveTable.getSdId().getCdId().getHiveColumnsCollection()) {
            final String columnName = column.getHiveColumnsPK().getColumnName();
            final boolean isPrimary = this.getPrimaryFlag(extraConstraints, columnName);
            final String columnDefault = this.getDefaultValue(defaults, columnName);
            result.add(new FeatureGroupFeatureDTO(column.getHiveColumnsPK().getColumnName(), column.getTypeName(), column.getComment(), isPrimary, columnDefault, featureGroup.getId()));
        }

        for (HivePartitionKeys partitionKey : hiveTable.getHivePartitionKeysCollection()) {
            final String keyName = partitionKey.getHivePartitionKeysPK().getPkeyName();
            final boolean isPrimary = this.getPrimaryFlag(extraConstraints, keyName);
            final String keyDefault = this.getDefaultValue(defaults, keyName);
            // The extra "true" marks the feature as a partition key.
            result.add(new FeatureGroupFeatureDTO(partitionKey.getHivePartitionKeysPK().getPkeyName(), partitionKey.getPkeyType(), partitionKey.getPkeyComment(), isPrimary, true, keyDefault, featureGroup.getId()));
        }

        if (featureGroup.getCachedFeaturegroup().getTimeTravelFormat() != TimeTravelFormat.HUDI) {
            return result;
        }
        return this.dropHudiSpecFeatureGroupFeature(result);
    }

    /**
     * Looks up the default value declared for a column.
     *
     * @param defaultConstraints the default constraints of the table
     * @param columnName the column to look up
     * @return the default value, or null if no constraint exists for the column
     */
    private String getDefaultValue(List<SQLDefaultConstraint> defaultConstraints, String columnName) {
        return defaultConstraints.stream()
            .filter(candidate -> candidate.getColumn_name().equals(columnName))
            .map(candidate -> candidate.getDefault_value())
            .findAny()
            .orElse(null);
    }

    /**
     * Tells whether a column is listed among the feature group's extra constraints.
     *
     * @param featureExtraConstraints the extra constraints of the feature group
     * @param columnName the column to check
     * @return true if a constraint with exactly this name exists
     */
    private boolean getPrimaryFlag(Collection<CachedFeatureExtraConstraints> featureExtraConstraints, String columnName) {
        for (CachedFeatureExtraConstraints constraint : featureExtraConstraints) {
            if (constraint.getName().equals(columnName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs {@code SHOW CREATE TABLE} for the feature group's Hive table.
     *
     * @param featuregroup the feature group whose table is inspected
     * @param project the project the user is acting in
     * @param user the user running the query
     * @return the raw query result
     */
    private FeaturegroupPreview getSQLSchemaForFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException {
        final String showCreate = "SHOW CREATE TABLE " + this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        final String offlineDb = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        return this.executeReadHiveQuery(showCreate, offlineDb, project, user);
    }

    /**
     * Drops the Hive table backing the given feature group.
     *
     * @param featuregroup the feature group to drop
     * @param project the project the user is acting in
     * @param user the user performing the drop
     */
    public void dropHiveFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, IOException, ServiceException {
        final String offlineDb = this.featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        this.offlineFeatureGroupController.dropFeatureGroup(offlineDb, this.getTblName(featuregroup.getName(), featuregroup.getVersion()), project, user);
    }

    /**
     * Drops the online MySQL table of the feature group. Does nothing when the online feature
     * store is disabled in the settings or the feature group is not online enabled.
     *
     * @param featuregroup the feature group whose online table is dropped
     * @param project the project the user is acting in
     * @param user the user performing the drop
     */
    public void dropMySQLFeaturegroup(Featuregroup featuregroup, Project project, Users user) throws SQLException, FeaturestoreException {
        if (!this.settings.isOnlineFeaturestore().booleanValue()) {
            return;
        }
        if (!featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
            return;
        }
        this.onlineFeaturegroupController.dropMySQLTable(featuregroup, project, user);
    }

    /**
     * Reads every remaining row of a result set into a preview. Each cell becomes a pair of
     * (column label without table prefix, value as string or null).
     *
     * @param rs the result set to consume; it is not closed by this method
     * @return the preview holding all rows
     * @throws SQLException if reading from the result set fails
     */
    public FeaturegroupPreview parseResultset(ResultSet rs) throws SQLException {
        final ResultSetMetaData metaData = rs.getMetaData();
        final FeaturegroupPreview preview = new FeaturegroupPreview();
        while (rs.next()) {
            final FeaturegroupPreview.Row row = new FeaturegroupPreview.Row();
            final int columnCount = metaData.getColumnCount();
            int column = 1;
            while (column <= columnCount) {
                final Object cell = rs.getObject(column);
                final String label = this.parseColumnLabel(metaData.getColumnLabel(column));
                final String text = cell != null ? cell.toString() : null;
                row.addValue(new Pair<String, String>(label, text));
                column++;
            }
            preview.addRow(row);
        }
        return preview;
    }

    /**
     * Strips a leading {@code table.} prefix from a column label.
     * For {@code "tbl.col"} this returns {@code "col"}; labels without a dot are returned as is.
     * Labels that contain a dot but have no second segment (for example {@code "tbl."} or
     * {@code "."}) are also returned unchanged instead of failing.
     *
     * @param columnLabel the label reported by the JDBC driver
     * @return the label without table prefix
     */
    private String parseColumnLabel(String columnLabel) {
        if (!columnLabel.contains(".")) {
            return columnLabel;
        }
        // String.split drops trailing empty strings, so "tbl." yields a single element.
        String[] segments = columnLabel.split("\\.");
        return segments.length > 1 ? segments[1] : columnLabel;
    }

    /**
     * Executes a read-only query against Hive and returns all rows.
     * The statement and result set are closed through try-with-resources, and the connection
     * is always handed to {@code closeConnection} (which also removes the materialized
     * certificates), even if closing the statement fails.
     *
     * @param query the SQL to run
     * @param databaseName the Hive database to connect to
     * @param project the project the user is acting in
     * @param user the user running the query
     * @return the parsed rows
     * @throws HopsSecurityException if the driver reports a "permission denied" error
     * @throws FeaturestoreException with HIVE_READ_QUERY_ERROR for any other SQL failure, or if
     *     the connection cannot be opened
     * @throws SQLException kept in the signature for callers; SQL failures are translated
     */
    private FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException {
        Connection conn = null;
        try {
            conn = this.initConnection(databaseName, project, user);
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(query)) {
                return this.parseResultset(rs);
            }
        }
        catch (SQLException e) {
            String context = "project: " + project.getName() + ", hive database: " + databaseName + " hive query: " + query;
            // getMessage() may be null; guard so the real failure is not masked by an NPE.
            String message = e.getMessage();
            if (message != null && message.toLowerCase(Locale.ROOT).contains("permission denied")) {
                throw new HopsSecurityException(RESTCodes.SecurityErrorCode.HDFS_ACCESS_CONTROL, Level.FINE, context, message, e);
            }
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.HIVE_READ_QUERY_ERROR, Level.SEVERE, context, message, e);
        }
        finally {
            // Also runs when initConnection failed (conn == null) so certificates are removed.
            this.closeConnection(conn, user, project);
        }
    }

    /**
     * Closes a Hive JDBC connection, if any, and removes the user's locally materialized
     * certificates. A failure to close the connection is logged and not propagated; the
     * certificates are removed in every case.
     *
     * @param conn the connection to close; may be null
     * @param user the user whose certificates are removed
     * @param project the project the certificates belong to
     */
    private void closeConnection(Connection conn, Users user, Project project) {
        try {
            if (conn != null) {
                conn.close();
            }
        }
        catch (SQLException e) {
            // Pass the exception as the log record's throwable so the stack trace is kept.
            LOGGER.log(Level.WARNING, "Error closing Hive JDBC connection: " + e, e);
        }
        finally {
            this.certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
        }
    }

    /**
     * Registers an already existing Hive table as a cached feature group. The resulting
     * feature group is persisted as not online enabled.
     *
     * @param featurestore the feature store containing the table
     * @param cachedFeaturegroupDTO the feature group description (name, version, features)
     * @return the persisted cached feature group entity
     * @throws FeaturestoreException with SYNC_TABLE_NOT_FOUND if the table is not in the metastore
     */
    public CachedFeaturegroup syncHiveTableWithFeaturestore(Featurestore featurestore, CachedFeaturegroupDTO cachedFeaturegroupDTO) throws FeaturestoreException {
        final String tableName = this.getTblName(cachedFeaturegroupDTO.getName(), cachedFeaturegroupDTO.getVersion());
        final HiveTbls existingTable = this.cachedFeaturegroupFacade
            .getHiveTableByNameAndDB(tableName, featurestore.getHiveDbId())
            .orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.SYNC_TABLE_NOT_FOUND, Level.FINE, ", tried to sync hive table with name: " + tableName + " with the feature store, but the table was not found in the Hive metastore"));
        final boolean onlineEnabled = false;
        return this.persistCachedFeaturegroupMetadata(existingTable, onlineEnabled, cachedFeaturegroupDTO.getTimeTravelFormat(), cachedFeaturegroupDTO.getFeatures());
    }

    /**
     * Creates a new cached feature group entity from the given values and persists it.
     *
     * @param hiveTable the Hive table backing the feature group
     * @param onlineEnabled whether the feature group is online enabled
     * @param timeTravelFormat the time travel format of the feature group
     * @param featureGroupFeatureDTOS the features, used to derive the extra constraints
     * @return the persisted entity
     */
    private CachedFeaturegroup persistCachedFeaturegroupMetadata(HiveTbls hiveTable, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS) {
        final CachedFeaturegroup entity = new CachedFeaturegroup();
        entity.setTimeTravelFormat(timeTravelFormat);
        entity.setOnlineEnabled(onlineEnabled);
        entity.setHiveTbls(hiveTable);
        final List<CachedFeatureExtraConstraints> extraConstraints = this.buildFeatureExtraConstrains(featureGroupFeatureDTOS, entity);
        entity.setFeaturesExtraConstraints(extraConstraints);
        this.cachedFeaturegroupFacade.persist(entity);
        return entity;
    }

    /**
     * Enables online serving for a cached feature group: creates the MySQL table if the
     * feature group is not yet online enabled and marks the feature group as online enabled.
     *
     * @param featurestore the feature store of the feature group
     * @param featuregroup the feature group to enable
     * @param project the project the user is acting in
     * @param user the user performing the operation
     * @return the updated feature group DTO
     * @throws FeaturestoreException with FEATURESTORE_ONLINE_NOT_ENABLED if the online feature
     *     store is disabled for the cluster or the project's online database does not exist
     */
    public FeaturegroupDTO enableFeaturegroupOnline(Featurestore featurestore, Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException {
        if (!this.settings.isOnlineFeaturestore().booleanValue()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster.");
        }
        final String onlineDbName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
        final boolean onlineDbExists = this.onlineFeaturestoreController.checkIfDatabaseExists(onlineDbName).booleanValue();
        if (!onlineDbExists) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an administrator.");
        }

        final CachedFeaturegroup cached = featuregroup.getCachedFeaturegroup();
        final String tableName = this.getTblName(featuregroup.getName(), featuregroup.getVersion());

        List<FeatureGroupFeatureDTO> onlineFeatures = this.getFeaturesDTO(featuregroup, project, user);
        if (cached.getTimeTravelFormat() == TimeTravelFormat.HUDI) {
            // The online table must not contain the HUDI-specific columns.
            onlineFeatures = this.dropHudiSpecFeatureGroupFeature(onlineFeatures);
        }

        final boolean alreadyOnline = cached.isOnlineEnabled();
        if (!alreadyOnline) {
            this.onlineFeaturegroupController.createMySQLTable(featurestore, tableName, onlineFeatures, project, user);
        }
        cached.setOnlineEnabled(true);
        this.cachedFeaturegroupFacade.updateMetadata(cached);
        return this.convertCachedFeaturegroupToDTO(featuregroup, project, user);
    }

    /**
     * Disables online serving for a cached feature group: drops the MySQL table and marks the
     * feature group as not online enabled. Does nothing to the storage if the feature group is
     * already offline-only.
     *
     * @param featuregroup the feature group to disable
     * @param project the project the user is acting in
     * @param user the user performing the operation
     * @return the updated feature group DTO
     * @throws FeaturestoreException with FEATURESTORE_ONLINE_NOT_ENABLED if the online feature
     *     store is disabled for the cluster or the project's online database does not exist
     */
    public FeaturegroupDTO disableFeaturegroupOnline(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException, SQLException, ServiceException {
        if (!this.settings.isOnlineFeaturestore().booleanValue()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this Hopsworks cluster.");
        }
        if (!this.onlineFeaturestoreController.checkIfDatabaseExists(this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject())).booleanValue()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATURESTORE_ONLINE_NOT_ENABLED, Level.FINE, "Online Featurestore is not enabled for this project. To enable online feature store, talk to an administrator.");
        }
        CachedFeaturegroup cachedFeaturegroup = featuregroup.getCachedFeaturegroup();
        if (cachedFeaturegroup.isOnlineEnabled()) {
            this.dropMySQLFeaturegroup(featuregroup, project, user);
            cachedFeaturegroup.setOnlineEnabled(false);
            // The entity already exists, so update it the same way enableFeaturegroupOnline does
            // instead of persisting it as if it were new.
            this.cachedFeaturegroupFacade.updateMetadata(cachedFeaturegroup);
        }
        return this.convertCachedFeaturegroupToDTO(featuregroup, project, user);
    }

    /**
     * Applies metadata changes to an existing cached feature group: an updated description
     * and/or appended features. Existing features must be resubmitted unchanged.
     *
     * @param project the project the user is acting in
     * @param user the user performing the update
     * @param featuregroup the feature group to update
     * @param cachedFeaturegroupDTO the requested state; a null feature list means "no schema
     *     change", a null or empty description means "keep the description"
     * @throws FeaturestoreException if the new schema alters existing features or an appended
     *     feature is invalid
     * @throws SQLException if altering a table fails
     */
    public void updateMetadata(Project project, Users user, Featuregroup featuregroup, CachedFeaturegroupDTO cachedFeaturegroupDTO) throws FeaturestoreException, SQLException {
        List<FeatureGroupFeatureDTO> previousSchema = this.getFeaturesDTO(featuregroup, project, user);
        String tableName = this.getTblName(featuregroup.getName(), featuregroup.getVersion());
        // Declared as List: verifyAndGetNewFeatures returns a List, not an ArrayList.
        List<FeatureGroupFeatureDTO> newFeatures = new ArrayList<>();
        if (cachedFeaturegroupDTO.getFeatures() != null) {
            this.verifyPreviousSchemaUnchanged(previousSchema, cachedFeaturegroupDTO.getFeatures());
            newFeatures = this.verifyAndGetNewFeatures(previousSchema, cachedFeaturegroupDTO.getFeatures());
        }
        if (!Strings.isNullOrEmpty(cachedFeaturegroupDTO.getDescription())) {
            this.offlineFeatureGroupController.alterHiveTableDescription(featuregroup.getFeaturestore(), tableName, cachedFeaturegroupDTO.getDescription(), project, user);
        }
        if (!newFeatures.isEmpty()) {
            this.offlineFeatureGroupController.alterHiveTableFeatures(featuregroup.getFeaturestore(), tableName, newFeatures, project, user);
            // Keep the online table in step with the offline one when online serving is active.
            if (this.settings.isOnlineFeaturestore().booleanValue() && featuregroup.getCachedFeaturegroup().isOnlineEnabled()) {
                this.onlineFeaturegroupController.alterMySQLTableColumns(featuregroup.getFeaturestore(), tableName, newFeatures, project, user);
            }
        }
    }

    /**
     * Appends every HUDI-specific feature that is not yet present to the given list.
     * The list is modified in place and also returned.
     *
     * @param features the feature list to extend; must be modifiable
     * @return the same list instance
     */
    private List<FeatureGroupFeatureDTO> addHudiSpecFeatures(List<FeatureGroupFeatureDTO> features) {
        for (String hudiName : HUDI_SPEC_FEATURE_NAMES) {
            boolean missing = features.stream().noneMatch(existing -> existing.getName().equals(hudiName));
            if (missing) {
                features.add(new FeatureGroupFeatureDTO(hudiName, "string", "hudi spec metadata feature", (Boolean)false, false));
            }
        }
        return features;
    }

    /**
     * Returns a new list containing the given features without the HUDI-specific ones.
     *
     * @param features the features to filter; not modified
     * @return the filtered copy
     */
    private List<FeatureGroupFeatureDTO> dropHudiSpecFeatureGroupFeature(List<FeatureGroupFeatureDTO> features) {
        List<FeatureGroupFeatureDTO> kept = new ArrayList<>();
        for (FeatureGroupFeatureDTO candidate : features) {
            if (HUDI_SPEC_FEATURE_NAMES.contains(candidate.getName())) {
                continue;
            }
            kept.add(candidate);
        }
        return kept;
    }

    /**
     * Returns a new list containing the given query features without the HUDI-specific ones.
     *
     * @param features the features to filter; not modified
     * @return the filtered copy
     */
    public List<Feature> dropHudiSpecFeatures(List<Feature> features) {
        List<Feature> kept = new ArrayList<>();
        for (Feature candidate : features) {
            if (HUDI_SPEC_FEATURE_NAMES.contains(candidate.getName())) {
                continue;
            }
            kept.add(candidate);
        }
        return kept;
    }

    /**
     * Maps a time travel format to the storage format of the Hive table. HUDI maps to the
     * HUDI format; everything else uses the default storage format from the settings.
     *
     * @param timeTravelFormat the time travel format; must not be null
     * @return the storage format to create the table with
     */
    private OfflineFeatureGroupController.Formats getTableFormat(TimeTravelFormat timeTravelFormat) {
        final OfflineFeatureGroupController.Formats format;
        switch (timeTravelFormat) {
            case HUDI:
                format = OfflineFeatureGroupController.Formats.HUDI;
                break;
            default:
                format = OfflineFeatureGroupController.Formats.valueOf(this.settings.getFeaturestoreDbDefaultStorageFormat());
                break;
        }
        return format;
    }

    /**
     * Verifies that every feature of the previous schema is present in the new schema with the
     * same partition flag, primary flag and (case-insensitive) type.
     *
     * @param previousSchema the features currently stored for the feature group
     * @param newSchema the features submitted by the caller
     * @throws FeaturestoreException with ILLEGAL_FEATUREGROUP_UPDATE if a feature is missing from
     *     the new schema, or its primary key, partition key or type information differs
     *     (a missing type in the new schema counts as a difference)
     */
    public void verifyPreviousSchemaUnchanged(List<FeatureGroupFeatureDTO> previousSchema, List<FeatureGroupFeatureDTO> newSchema) throws FeaturestoreException {
        for (FeatureGroupFeatureDTO feature : previousSchema) {
            FeatureGroupFeatureDTO newFeature = newSchema.stream().filter(newFeat -> feature.getName().equals(newFeat.getName())).findAny().orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Feature " + feature.getName() + " was not found in new schema. It is only possible to append features."));
            // The flags are boxed Booleans: compare by value, not by reference.
            boolean keysUnchanged = Objects.equals(newFeature.getPartition(), feature.getPartition())
                && Objects.equals(newFeature.getPrimary(), feature.getPrimary());
            // A null type cannot match; report it as a change instead of failing with an NPE.
            boolean typeUnchanged = newFeature.getType() != null
                && newFeature.getType().equalsIgnoreCase(feature.getType());
            if (keysUnchanged && typeUnchanged) {
                continue;
            }
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Primary key, partition key or type information of feature " + feature.getName() + " changed. Primary key, partition key and type cannot be changed when appending features.");
        }
    }

    /**
     * Collects the features of the new schema that do not exist in the previous schema and
     * validates them: an appended feature must not be a primary or partition key and must
     * carry type information.
     *
     * @param previousSchema the features currently stored for the feature group
     * @param newSchema the features submitted by the caller
     * @return the appended features, in the order they appear in the new schema
     * @throws FeaturestoreException with ILLEGAL_FEATUREGROUP_UPDATE if an appended feature is invalid
     */
    public List<FeatureGroupFeatureDTO> verifyAndGetNewFeatures(List<FeatureGroupFeatureDTO> previousSchema, List<FeatureGroupFeatureDTO> newSchema) throws FeaturestoreException {
        final List<FeatureGroupFeatureDTO> appended = new ArrayList<>();
        for (FeatureGroupFeatureDTO candidate : newSchema) {
            final boolean alreadyKnown = previousSchema.stream().anyMatch(known -> known.getName().equals(candidate.getName()));
            if (alreadyKnown) {
                continue;
            }
            appended.add(candidate);
            if (candidate.getPrimary().booleanValue() || candidate.getPartition().booleanValue()) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Appended feature `" + candidate.getName() + "` is specified as primary or partition key. Primary key and partition key cannot be changed when appending features.");
            }
            if (candidate.getType() == null) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATUREGROUP_UPDATE, Level.FINE, "Appended feature `" + candidate.getName() + "` is missing type information. Type information is mandatory when appending features to a feature group.");
            }
        }
        return appended;
    }

    /**
     * Verifies that a HUDI feature group declares at least one primary key and at least one
     * partition key. Other time travel formats are not checked.
     *
     * @param features the features of the feature group
     * @param timeTravelFormat the time travel format of the feature group
     * @throws FeaturestoreException with PRIMARY_KEY_PARTITION_KEY_REQUIRED if a key is missing
     */
    public void verifyPrimaryAndPartitionKey(List<FeatureGroupFeatureDTO> features, TimeTravelFormat timeTravelFormat) throws FeaturestoreException {
        if (timeTravelFormat != TimeTravelFormat.HUDI) {
            return;
        }
        // Checked in this order so the partition check is skipped when no primary key exists.
        final boolean hasPrimaryKey = features.stream().anyMatch(FeatureGroupFeatureDTO::getPrimary);
        if (hasPrimaryKey && features.stream().anyMatch(FeatureGroupFeatureDTO::getPartition)) {
            return;
        }
        throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.PRIMARY_KEY_PARTITION_KEY_REQUIRED, Level.FINE);
    }

    /**
     * Builds one extra-constraint entity per primary key feature.
     *
     * @param featureGroupFeatureDTOS the features of the feature group
     * @param cachedFeaturegroup the feature group the constraints belong to
     * @return the constraint entities, in feature order
     */
    private List<CachedFeatureExtraConstraints> buildFeatureExtraConstrains(List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS, CachedFeaturegroup cachedFeaturegroup) {
        List<CachedFeatureExtraConstraints> cachedFeatureExtraConstraints = new ArrayList<>();
        // Typed list: a raw List cannot be iterated as String.
        List<String> pkNames = featureGroupFeatureDTOS.stream().filter(FeatureGroupFeatureDTO::getPrimary).map(FeatureGroupFeatureDTO::getName).collect(Collectors.toList());
        for (String pkName : pkNames) {
            cachedFeatureExtraConstraints.add(new CachedFeatureExtraConstraints(cachedFeaturegroup, pkName, Boolean.TRUE, Boolean.FALSE));
        }
        return cachedFeatureExtraConstraints;
    }
}

