/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.online;

import com.google.common.base.Strings;
import com.logicalclocks.shaded.org.apache.commons.lang3.StringUtils;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.online.AvroSchemaConstructorController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
import io.hops.hopsworks.common.featurestore.query.ConstructorController;
import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.kafka.KafkaController;
import io.hops.hopsworks.common.kafka.SchemasController;
import io.hops.hopsworks.common.kafka.SubjectsCompatibilityController;
import io.hops.hopsworks.common.kafka.SubjectsController;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.kafka.schemas.SchemaCompatibility;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParserPos;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class OnlineFeaturegroupController {
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private OnlineFeaturestoreFacade onlineFeaturestoreFacade;
    @EJB
    private Settings settings;
    @EJB
    private SubjectsController subjectsController;
    @EJB
    private KafkaController kafkaController;
    @EJB
    private AvroSchemaConstructorController avroSchemaConstructorController;
    @EJB
    private SchemasController schemasController;
    @EJB
    private SubjectsCompatibilityController subjectsCompatibilityController;
    @EJB
    private ProjectController projectController;
    @EJB
    private ConstructorController constructorController;
    @EJB
    private FeaturegroupController featuregroupController;
    private static final List<String> SUPPORTED_MYSQL_TYPES = Arrays.asList("INT", "TINYINT", "SMALLINT", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL", "DATE", "TIMESTAMP");
    private static final String VARBINARY_DEFAULT = "VARBINARY(100)";
    private static final String VARCHAR_DEFAULT = "VARCHAR(100)";

    public OnlineFeaturegroupController() {
    }

    protected OnlineFeaturegroupController(Settings settings) {
        this.settings = settings;
    }

    public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws FeaturestoreException {
        String query = "DROP TABLE " + featuregroup.getName() + "_" + featuregroup.getVersion() + ";";
        this.onlineFeaturestoreFacade.executeUpdateJDBCQuery(query, this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()), project, user);
    }

    public void createMySQLTable(Featurestore featurestore, String tableName, List<FeatureGroupFeatureDTO> features, Project project, Users user) throws FeaturestoreException {
        String dbName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
        String createStatement = this.buildCreateStatement(dbName, tableName, features);
        this.onlineFeaturestoreFacade.executeUpdateJDBCQuery(createStatement, dbName, project, user);
    }

    public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features, Project project, Users user) throws KafkaException, SchemaException, ProjectException, FeaturestoreException, IOException, HopsSecurityException, ServiceException {
        if (project.getProjectTeamCollection().stream().noneMatch(pt -> pt.getUser().getUsername().equals("onlinefs"))) {
            try {
                this.projectController.addOnlineFsUser(project).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new ServiceException(RESTCodes.ServiceErrorCode.SERVICE_GENERIC_ERROR, Level.SEVERE, "failed to add onlinefs user to project: " + project.getName(), e.getMessage(), (Throwable)e);
            }
        }
        String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
        this.createMySQLTable(featureStore, featureGroupEntityName, features, project, user);
        this.createFeatureGroupKafkaTopic(project, featureGroup, features);
    }

    public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features) throws KafkaException, SchemaException, FeaturestoreException {
        block2: {
            String avroSchema = this.avroSchemaConstructorController.constructSchema(featureGroup, features);
            this.schemasController.validateSchema(project, avroSchema);
            String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
            this.subjectsController.registerNewSubject(project, featureGroupEntityName, avroSchema, false);
            this.subjectsCompatibilityController.setSubjectCompatibility(project, featureGroupEntityName, SchemaCompatibility.NONE);
            String topicName = Utils.getFeatureGroupTopicName(featureGroup);
            try {
                TopicDTO topicDTO = new TopicDTO(topicName, this.settings.getKafkaDefaultNumReplicas(), this.settings.getOnlineFsThreadNumber());
                this.kafkaController.createTopic(project, topicDTO);
            }
            catch (KafkaException e) {
                if (e.getErrorCode() == RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS) break block2;
                throw e;
            }
        }
    }

    public void deleteFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup) throws KafkaException, SchemaException {
        String topicName = Utils.getFeatureGroupTopicName(featureGroup);
        String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
        if (topicName.equals(featureGroup.getTopicName())) {
            this.kafkaController.removeTopicFromProject(project, topicName);
        }
        if (!this.subjectsController.getSubjectVersions(project, featureGroupEntityName).isEmpty()) {
            this.subjectsController.deleteSubject(project, featureGroupEntityName);
        }
    }

    public void alterOnlineFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> newFeatures, List<FeatureGroupFeatureDTO> fullNewSchema, Project project, Users user) throws FeaturestoreException, SchemaException, SQLException, KafkaException {
        String tableName = Utils.getFeaturegroupName(featureGroup);
        this.alterMySQLTableColumns(featureGroup.getFeaturestore(), tableName, newFeatures, project, user);
        this.alterFeatureGroupSchema(featureGroup, fullNewSchema, project);
    }

    public void alterFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> fullNewSchema, Project project) throws FeaturestoreException, SchemaException, KafkaException {
        String avroSchema = this.avroSchemaConstructorController.constructSchema(featureGroup, fullNewSchema);
        this.schemasController.validateSchema(project, avroSchema);
        String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
        this.subjectsController.registerNewSubject(project, featureGroupEntityName, avroSchema, false);
    }

    public void disableOnlineFeatureGroup(Featuregroup featureGroup, Project project, Users user) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        this.dropMySQLTable(featureGroup, project, user);
        String topicName = Utils.getFeatureGroupTopicName(featureGroup);
        String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup);
        if (!this.subjectsController.getSubjectVersions(project, featureGroupEntityName).isEmpty()) {
            this.subjectsController.deleteSubject(project, featureGroupEntityName);
        }
    }

    public String buildCreateStatement(String dbName, String tableName, List<FeatureGroupFeatureDTO> features) {
        StringBuilder createStatement = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        createStatement.append("`" + dbName + "`").append(".").append("`" + tableName + "`").append("(");
        ArrayList<String> columns = new ArrayList<String>();
        for (FeatureGroupFeatureDTO feature : features) {
            StringBuilder column = new StringBuilder();
            column.append("`" + feature.getName() + "`").append(" ").append(this.getOnlineType(feature));
            if (feature.getDefaultValue() != null) {
                column.append(" NOT NULL DEFAULT ");
                if (feature.getType().equalsIgnoreCase("string")) {
                    column.append("'").append(feature.getDefaultValue()).append("'");
                } else {
                    column.append(feature.getDefaultValue());
                }
            }
            columns.add(column.toString());
        }
        createStatement.append(StringUtils.join(columns, (String)", "));
        List pkFeatures = features.stream().filter(FeatureGroupFeatureDTO::getPrimary).collect(Collectors.toList());
        if (!pkFeatures.isEmpty()) {
            createStatement.append(", PRIMARY KEY (`");
            createStatement.append(StringUtils.join((Iterable)pkFeatures.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.toList()), (String)"`,`"));
            createStatement.append("`)");
        }
        createStatement.append(")");
        createStatement.append("ENGINE=ndbcluster ").append("COMMENT='NDB_TABLE=READ_BACKUP=1'");
        if (!Strings.isNullOrEmpty((String)this.settings.getOnlineFeatureStoreTableSpace())) {
            createStatement.append("/*!50100 TABLESPACE `").append(this.settings.getOnlineFeatureStoreTableSpace()).append("` STORAGE DISK */");
        }
        return createStatement.toString();
    }

    public String buildAlterStatement(String tableName, String dbName, List<FeatureGroupFeatureDTO> featureDTOs) {
        StringBuilder alterTableStatement = new StringBuilder("ALTER TABLE `" + dbName + "`.`" + tableName + "` ");
        ArrayList<String> addColumn = new ArrayList<String>();
        for (FeatureGroupFeatureDTO featureDTO : featureDTOs) {
            StringBuilder add = new StringBuilder("ADD COLUMN `" + featureDTO.getName() + "` " + this.getOnlineType(featureDTO));
            if (featureDTO.getDefaultValue() == null) {
                add.append(" DEFAULT NULL");
            }
            addColumn.add(add.toString());
        }
        addColumn.add("ALGORITHM=INPLACE");
        alterTableStatement.append(StringUtils.join(addColumn, (String)", ") + ";");
        return alterTableStatement.toString();
    }

    public void alterMySQLTableColumns(Featurestore featurestore, String tableName, List<FeatureGroupFeatureDTO> featureDTOs, Project project, Users user) throws FeaturestoreException {
        String dbName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
        this.onlineFeaturestoreFacade.executeUpdateJDBCQuery(this.buildAlterStatement(tableName, dbName, featureDTOs), dbName, project, user);
    }

    public String getOnlineType(FeatureGroupFeatureDTO featureGroupFeatureDTO) {
        if (!Strings.isNullOrEmpty((String)featureGroupFeatureDTO.getOnlineType())) {
            return featureGroupFeatureDTO.getOnlineType().toLowerCase();
        }
        for (String mysqlType : SUPPORTED_MYSQL_TYPES) {
            if (!featureGroupFeatureDTO.getType().toUpperCase().startsWith(mysqlType)) continue;
            return featureGroupFeatureDTO.getType();
        }
        if (featureGroupFeatureDTO.getType().equalsIgnoreCase("boolean")) {
            return "tinyint";
        }
        if (featureGroupFeatureDTO.getType().equalsIgnoreCase("string")) {
            return VARCHAR_DEFAULT;
        }
        return VARBINARY_DEFAULT;
    }

    public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, int limit) throws FeaturestoreException {
        String tbl = this.featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
        List<FeatureGroupFeatureDTO> features = this.featuregroupController.getFeatures(featuregroup, project, user);
        SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
        for (FeatureGroupFeatureDTO feature : features) {
            if (feature.getDefaultValue() == null) {
                selectList.add((SqlNode)new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"), SqlParserPos.ZERO));
                continue;
            }
            selectList.add(this.constructorController.selectWithDefaultAs(new Feature(feature, tbl), false));
        }
        SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList, (SqlNode)new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), null, null, null, null, null, null, (SqlNode)SqlLiteral.createExactNumeric((String)String.valueOf(limit), (SqlParserPos)SqlParserPos.ZERO), null);
        String db = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
        try {
            return this.onlineFeaturestoreFacade.executeReadJDBCQuery(select.toSqlString((SqlDialect)new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
        }
        catch (Exception e) {
            return this.onlineFeaturestoreFacade.executeReadJDBCQuery(select.toSqlString((SqlDialect)new MysqlSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
        }
    }

    public List<FeatureGroupFeatureDTO> getFeaturegroupFeatures(Featuregroup featuregroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> onlineFeatureGroupFeatureDTOS = this.onlineFeaturestoreFacade.getMySQLFeatures(Utils.getFeatureStoreEntityName(featuregroup.getName(), featuregroup.getVersion()), this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()));
        for (FeatureGroupFeatureDTO featureGroupFeatureDTO : featureGroupFeatureDTOS) {
            for (FeatureGroupFeatureDTO onlineFeatureGroupFeatureDTO : onlineFeatureGroupFeatureDTOS) {
                if (!featureGroupFeatureDTO.getName().equalsIgnoreCase(onlineFeatureGroupFeatureDTO.getName())) continue;
                featureGroupFeatureDTO.setOnlineType(onlineFeatureGroupFeatureDTO.getType());
            }
        }
        return featureGroupFeatureDTOS;
    }
}

