/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.featuregroup.online;

import com.google.common.base.Strings;
import com.logicalclocks.shaded.org.apache.commons.lang3.StringUtils;
import io.hops.hopsworks.common.dao.kafka.TopicDTO;
import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
import io.hops.hopsworks.common.featurestore.featuregroup.online.AvroSchemaConstructorController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.kafka.KafkaController;
import io.hops.hopsworks.common.kafka.SchemasController;
import io.hops.hopsworks.common.kafka.SubjectsCompatibilityController;
import io.hops.hopsworks.common.kafka.SubjectsController;
import io.hops.hopsworks.common.project.ProjectController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.SchemaException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.kafka.schemas.SchemaCompatibility;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

/**
 * Manages the online representation of a feature group: the backing MySQL
 * ({@code ENGINE=ndbcluster}) table and the Kafka topic / schema-registry
 * subject that is created alongside it.
 *
 * <p>All SQL statements are assembled as strings and executed through
 * {@link OnlineFeaturestoreController}; Kafka and schema operations are
 * delegated to the injected Kafka controllers.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class OnlineFeaturegroupController {
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private OnlineFeaturestoreFacade onlineFeaturestoreFacade;
    @EJB
    private Settings settings;
    @EJB
    private SubjectsController subjectsController;
    @EJB
    private KafkaController kafkaController;
    @EJB
    private AvroSchemaConstructorController avroSchemaConstructorController;
    @EJB
    private SchemasController schemasController;
    @EJB
    private SubjectsCompatibilityController subjectsCompatibilityController;
    @EJB
    private ProjectController projectController;
    // Offline types whose upper-cased form starts with one of these prefixes are
    // passed through unchanged as the online column type (see getOnlineType).
    private static final List<String> SUPPORTED_MYSQL_TYPES = Arrays.asList("INT", "TINYINT", "SMALLINT", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL", "DATE", "TIMESTAMP");
    // Fallback column type for every offline type without a more specific mapping.
    private static final String VARBINARY_DEFAULT = "VARBINARY(100)";
    // Column type used for the offline type "string".
    private static final String VARCHAR_DEFAULT = "VARCHAR(100)";
    private static final String KAFKA_TOPIC_SUFFIX = "_onlinefs";
    // Username of the project member that must exist before an online feature group is set up.
    private static final String ONLINEFS_USERNAME = "onlinefs";

    /** No-arg constructor; collaborators are expected to be injected via {@code @EJB}. */
    public OnlineFeaturegroupController() {
    }

    /**
     * Creates a controller with an explicitly supplied {@link Settings} instance
     * (all other collaborators stay unset unless injected).
     *
     * @param settings settings instance to use
     */
    protected OnlineFeaturegroupController(Settings settings) {
        this.settings = settings;
    }

    /**
     * Drops the online table {@code <name>_<version>} of the given feature group.
     *
     * @param featuregroup feature group whose online table is dropped
     * @param project project passed through to the JDBC execution
     * @param user user passed through to the JDBC execution
     * @throws SQLException if executing the statement fails
     * @throws FeaturestoreException if the online feature store reports an error
     */
    public void dropMySQLTable(Featuregroup featuregroup, Project project, Users user) throws SQLException, FeaturestoreException {
        String query = "DROP TABLE " + featuregroup.getName() + "_" + featuregroup.getVersion() + ";";
        this.onlineFeaturestoreController.executeUpdateJDBCQuery(query, this.onlineDbNameOf(featuregroup), project, user);
    }

    /**
     * Creates the online table for a feature group using the statement produced by
     * {@link #buildCreateStatement(String, String, List)}.
     *
     * @param featurestore feature store whose project determines the online database name
     * @param tableName name of the table to create
     * @param features features that become the table columns
     * @param project project passed through to the JDBC execution
     * @param user user passed through to the JDBC execution
     * @throws FeaturestoreException if the online feature store reports an error
     * @throws SQLException if executing the statement fails
     */
    public void createMySQLTable(Featurestore featurestore, String tableName, List<FeatureGroupFeatureDTO> features, Project project, Users user) throws FeaturestoreException, SQLException {
        String dbName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
        String createStatement = this.buildCreateStatement(dbName, tableName, features);
        this.onlineFeaturestoreController.executeUpdateJDBCQuery(createStatement, dbName, project, user);
    }

    /**
     * Convenience overload that takes id, name and version from {@code featureGroup}.
     *
     * @see #setupOnlineFeatureGroup(Featurestore, Integer, String, Integer, List, Project, Users)
     */
    public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features, Project project, Users user) throws FeaturestoreException, SQLException, SchemaException, KafkaException, ProjectException, UserException, IOException, HopsSecurityException, ServiceException {
        this.setupOnlineFeatureGroup(featureStore, featureGroup.getId(), featureGroup.getName(), featureGroup.getVersion(), features, project, user);
    }

    /**
     * Sets up everything needed to serve a feature group online: makes sure the
     * project has a member with username {@code onlinefs}, creates the MySQL table,
     * and creates the Kafka topic together with its schema subject.
     *
     * @param featureStore feature store the feature group belongs to
     * @param featureGroupId id of the feature group (part of the topic name)
     * @param featureGroupName name of the feature group
     * @param featureGroupVersion version of the feature group
     * @param features features of the feature group
     * @param project project owning the feature group
     * @param user user on whose behalf the table is created
     * @throws ServiceException if adding the {@code onlinefs} user fails or is interrupted
     */
    public void setupOnlineFeatureGroup(Featurestore featureStore, Integer featureGroupId, String featureGroupName, Integer featureGroupVersion, List<FeatureGroupFeatureDTO> features, Project project, Users user) throws KafkaException, SchemaException, ProjectException, UserException, FeaturestoreException, SQLException, IOException, HopsSecurityException, ServiceException {
        boolean hasOnlineFsUser = project.getProjectTeamCollection().stream().anyMatch(pt -> ONLINEFS_USERNAME.equals(pt.getUser().getUsername()));
        if (!hasOnlineFsUser) {
            try {
                this.projectController.addOnlineFsUser(project).get();
            }
            catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so callers further up can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                throw new ServiceException(RESTCodes.ServiceErrorCode.SERVICE_GENERIC_ERROR, Level.SEVERE, "failed to add onlinefs user to project: " + project.getName(), e.getMessage(), e);
            }
        }
        String featureGroupEntityName = Utils.getFeatureStoreEntityName(featureGroupName, featureGroupVersion);
        this.createMySQLTable(featureStore, featureGroupEntityName, features, project, user);
        String topicName = this.onlineFeatureGroupTopicName(project.getId(), featureGroupId, featureGroupEntityName);
        this.createFeatureGroupKafkaTopic(project, featureGroupEntityName, topicName, features);
    }

    /**
     * Builds and validates the Avro schema for the feature group, registers it as
     * a subject named after the topic, sets the subject compatibility to
     * {@link SchemaCompatibility#NONE} and creates the Kafka topic.
     *
     * @param project project owning the topic
     * @param featureGroupEntityName entity name passed to the Avro schema constructor
     * @param topicName name of the topic and of the schema subject
     * @param features features used to build the schema
     */
    public void createFeatureGroupKafkaTopic(Project project, String featureGroupEntityName, String topicName, List<FeatureGroupFeatureDTO> features) throws KafkaException, SchemaException, ProjectException, UserException, FeaturestoreException {
        String avroSchema = this.avroSchemaConstructorController.constructSchema(featureGroupEntityName, Utils.getFeaturestoreName(project), features);
        this.schemasController.validateSchema(project, avroSchema);
        SubjectDTO topicSubject = this.subjectsController.registerNewSubject(project, topicName, avroSchema, false);
        this.subjectsCompatibilityController.setSubjectCompatibility(project, topicName, SchemaCompatibility.NONE);
        // NOTE(review): the literal 1 and the online-fs thread number are presumably the
        // replication factor and partition count - confirm against the TopicDTO constructor.
        TopicDTO topicDTO = new TopicDTO(topicName, 1, this.settings.getOnlineFsThreadNumber(), topicSubject.getSubject(), topicSubject.getVersion());
        this.kafkaController.createTopic(project, topicDTO);
    }

    /**
     * Composes the topic name {@code <projectId>_<featureGroupId>_<entityName>_onlinefs}.
     *
     * @param projectId id of the project (must not be {@code null})
     * @param featureGroupId id of the feature group (must not be {@code null})
     * @param featureGroupEntityName entity name of the feature group
     * @return the topic name
     */
    public String onlineFeatureGroupTopicName(Integer projectId, Integer featureGroupId, String featureGroupEntityName) {
        return projectId.toString() + "_" + featureGroupId.toString() + "_" + featureGroupEntityName + KAFKA_TOPIC_SUFFIX;
    }

    /**
     * Removes the topic from the project if it exists and deletes the subject of
     * the same name if it has any registered versions.
     *
     * @param project project owning the topic
     * @param topicName name of the topic and subject
     */
    public void deleteFeatureGroupKafkaTopic(Project project, String topicName) throws KafkaException, SchemaException {
        if (this.kafkaController.projectTopicExists(project, topicName)) {
            this.kafkaController.removeTopicFromProject(project, topicName);
        }
        if (!this.subjectsController.getSubjectVersions(project, topicName).isEmpty()) {
            this.subjectsController.deleteSubject(project, topicName);
        }
    }

    /**
     * Appends new columns to the online table and then publishes the full new
     * schema for the feature group's topic.
     *
     * @param featureGroup feature group being altered
     * @param newFeatures features to add as new table columns
     * @param fullNewSchema complete feature list used to build the new Avro schema
     * @param project project owning the feature group
     * @param user user on whose behalf the table is altered
     */
    public void alterOnlineFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> newFeatures, List<FeatureGroupFeatureDTO> fullNewSchema, Project project, Users user) throws FeaturestoreException, SchemaException, SQLException, KafkaException {
        String tableName = tableNameOf(featureGroup);
        String topicName = this.onlineFeatureGroupTopicName(project.getId(), featureGroup.getId(), tableName);
        this.alterMySQLTableColumns(featureGroup.getFeaturestore(), tableName, newFeatures, project, user);
        this.alterFeatureGroupSchema(featureGroup, fullNewSchema, topicName, project);
    }

    /**
     * Builds and validates a new Avro schema, registers it under the topic's
     * subject and points the topic at the resulting schema version.
     *
     * @param featureGroup feature group whose schema changes
     * @param fullNewSchema complete feature list used to build the schema
     * @param topicName name of the topic and subject
     * @param project project owning the topic
     */
    public void alterFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> fullNewSchema, String topicName, Project project) throws FeaturestoreException, SchemaException, KafkaException {
        // NOTE(review): this passes the bare feature group name, whereas
        // createFeatureGroupKafkaTopic passes the versioned entity name - confirm
        // that the difference is intended.
        String avroSchema = this.avroSchemaConstructorController.constructSchema(featureGroup.getName(), Utils.getFeaturestoreName(project), fullNewSchema);
        this.schemasController.validateSchema(project, avroSchema);
        SubjectDTO topicSubject = this.subjectsController.registerNewSubject(project, topicName, avroSchema, false);
        this.kafkaController.updateTopicSchemaVersion(project, topicName, topicSubject.getVersion());
    }

    /**
     * Disables online serving for a feature group by dropping its MySQL table.
     *
     * <p>NOTE(review): the Kafka topic and schema subject are not removed here
     * (no call to {@link #deleteFeatureGroupKafkaTopic(Project, String)}); the
     * {@code throws} clause is kept for caller compatibility. Confirm that
     * retaining the topic is intentional.
     *
     * @param featureGroup feature group to disable
     * @param project project owning the feature group
     * @param user user on whose behalf the table is dropped
     */
    public void disableOnlineFeatureGroup(Featuregroup featureGroup, Project project, Users user) throws FeaturestoreException, SQLException, SchemaException, KafkaException {
        this.dropMySQLTable(featureGroup, project, user);
    }

    /**
     * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for an online table.
     * Every feature becomes a column typed via {@link #getOnlineType}; features
     * with a default value get {@code NOT NULL DEFAULT <value>}; features flagged
     * as primary form the primary key. The tablespace clause is appended only
     * when the configured tablespace is non-empty.
     *
     * @param dbName database name (back-tick quoted in the statement)
     * @param tableName table name (back-tick quoted in the statement)
     * @param features features that become the table columns
     * @return the complete DDL statement
     */
    public String buildCreateStatement(String dbName, String tableName, List<FeatureGroupFeatureDTO> features) {
        StringBuilder createStatement = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        createStatement.append("`").append(dbName).append("`.`").append(tableName).append("`(");
        List<String> columns = new ArrayList<>();
        for (FeatureGroupFeatureDTO feature : features) {
            columns.add("`" + feature.getName() + "` " + this.getOnlineType(feature) + notNullDefaultClause(feature));
        }
        createStatement.append(StringUtils.join(columns, ", "));
        // Boolean.TRUE.equals treats a missing (null) primary flag as "not part of the key".
        List<String> pkNames = features.stream()
            .filter(feature -> Boolean.TRUE.equals(feature.getPrimary()))
            .map(FeatureGroupFeatureDTO::getName)
            .collect(Collectors.toList());
        if (!pkNames.isEmpty()) {
            createStatement.append(", PRIMARY KEY (`");
            createStatement.append(StringUtils.join(pkNames, "`,`"));
            createStatement.append("`)");
        }
        createStatement.append(")");
        createStatement.append("ENGINE=ndbcluster ").append("COMMENT='NDB_TABLE=READ_BACKUP=1'");
        String tableSpace = this.settings.getOnlineFeatureStoreTableSpace();
        if (!Strings.isNullOrEmpty(tableSpace)) {
            createStatement.append("/*!50100 TABLESPACE `").append(tableSpace).append("` STORAGE DISK */");
        }
        return createStatement.toString();
    }

    /**
     * Builds an {@code ALTER TABLE ... ADD COLUMN} statement that adds one column
     * per feature. Columns with a default value get {@code NOT NULL DEFAULT <value>},
     * all others get {@code DEFAULT NULL}.
     *
     * @param tableName table name (back-tick quoted in the statement)
     * @param dbName database name (back-tick quoted in the statement)
     * @param featureDTOs features to add as columns
     * @return the complete DDL statement, terminated by {@code ;}
     */
    public String buildAlterStatement(String tableName, String dbName, List<FeatureGroupFeatureDTO> featureDTOs) {
        List<String> addColumn = new ArrayList<>();
        for (FeatureGroupFeatureDTO featureDTO : featureDTOs) {
            String defaultClause = notNullDefaultClause(featureDTO);
            if (defaultClause.isEmpty()) {
                defaultClause = " DEFAULT NULL";
            }
            addColumn.add("ADD COLUMN `" + featureDTO.getName() + "` " + this.getOnlineType(featureDTO) + defaultClause);
        }
        return "ALTER TABLE `" + dbName + "`.`" + tableName + "` " + StringUtils.join(addColumn, ", ") + ";";
    }

    /**
     * Adds the given features as new columns to an existing online table.
     *
     * @param featurestore feature store whose project determines the online database name
     * @param tableName table to alter
     * @param featureDTOs features to add as columns
     * @param project project passed through to the JDBC execution
     * @param user user passed through to the JDBC execution
     */
    public void alterMySQLTableColumns(Featurestore featurestore, String tableName, List<FeatureGroupFeatureDTO> featureDTOs, Project project, Users user) throws FeaturestoreException, SQLException {
        String dbName = this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject());
        this.onlineFeaturestoreController.executeUpdateJDBCQuery(this.buildAlterStatement(tableName, dbName, featureDTOs), dbName, project, user);
    }

    /**
     * Resolves the MySQL column type for a feature.
     *
     * <ol>
     *   <li>An explicit, non-empty online type wins and is returned lower-cased.</li>
     *   <li>An offline type starting (case-insensitively) with one of the supported
     *       MySQL type names is returned unchanged.</li>
     *   <li>{@code boolean} maps to {@code tinyint}, {@code string} to
     *       {@code VARCHAR(100)}.</li>
     *   <li>Everything else maps to {@code VARBINARY(100)}.</li>
     * </ol>
     *
     * Case conversion uses {@link Locale#ROOT} so the result does not depend on
     * the JVM default locale.
     *
     * @param featureGroupFeatureDTO feature to resolve the type for
     * @return the column type to use in DDL
     */
    public String getOnlineType(FeatureGroupFeatureDTO featureGroupFeatureDTO) {
        String onlineType = featureGroupFeatureDTO.getOnlineType();
        if (!Strings.isNullOrEmpty(onlineType)) {
            return onlineType.toLowerCase(Locale.ROOT);
        }
        String offlineType = featureGroupFeatureDTO.getType();
        String upperCasedType = offlineType.toUpperCase(Locale.ROOT);
        for (String mysqlType : SUPPORTED_MYSQL_TYPES) {
            if (upperCasedType.startsWith(mysqlType)) {
                return offlineType;
            }
        }
        if (offlineType.equalsIgnoreCase("boolean")) {
            return "tinyint";
        }
        if (offlineType.equalsIgnoreCase("string")) {
            return VARCHAR_DEFAULT;
        }
        return VARBINARY_DEFAULT;
    }

    /**
     * Reads up to {@code limit} rows from the feature group's online table.
     * If the first attempt throws, the identical query is executed once more;
     * should the retry fail as well, its exception is thrown with the first
     * failure attached as a suppressed exception.
     *
     * @param featuregroup feature group to preview
     * @param project project passed through to the JDBC execution
     * @param user user passed through to the JDBC execution
     * @param limit maximum number of rows to read
     * @return the preview returned by the online feature store
     * @throws FeaturestoreException if the retry fails with this exception
     * @throws SQLException if the retry fails with this exception
     */
    public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, Users user, int limit) throws FeaturestoreException, SQLException {
        String tblName = featuregroup.getName() + "_" + featuregroup.getVersion();
        String query = "SELECT * FROM " + tblName + " LIMIT " + limit;
        String db = this.onlineDbNameOf(featuregroup);
        try {
            return this.onlineFeaturestoreController.executeReadJDBCQuery(query, db, project, user);
        }
        catch (Exception firstFailure) {
            try {
                return this.onlineFeaturestoreController.executeReadJDBCQuery(query, db, project, user);
            }
            catch (Exception retryFailure) {
                // Keep the first failure visible; self-suppression is rejected by Throwable.
                if (retryFailure != firstFailure) {
                    retryFailure.addSuppressed(firstFailure);
                }
                throw retryFailure;
            }
        }
    }

    /**
     * Returns the MySQL schema of the feature group's online table as reported
     * by {@link OnlineFeaturestoreFacade#getMySQLSchema}.
     *
     * @param featuregroup feature group to look up
     * @return the schema string
     */
    public String getFeaturegroupSchema(Featuregroup featuregroup) throws FeaturestoreException {
        return this.onlineFeaturestoreFacade.getMySQLSchema(tableNameOf(featuregroup), this.onlineDbNameOf(featuregroup));
    }

    /**
     * Sets the online type on each given feature from the column of the same
     * name (case-insensitive) in the online table. Features without a matching
     * column are left untouched. The input list is modified in place.
     *
     * @param featuregroup feature group whose online table is inspected
     * @param featureGroupFeatureDTOS features to enrich
     * @return the same list instance that was passed in
     */
    public List<FeatureGroupFeatureDTO> getFeaturegroupFeatures(Featuregroup featuregroup, List<FeatureGroupFeatureDTO> featureGroupFeatureDTOS) throws FeaturestoreException {
        List<FeatureGroupFeatureDTO> onlineFeatures = this.onlineFeaturestoreFacade.getMySQLFeatures(tableNameOf(featuregroup), this.onlineDbNameOf(featuregroup));
        for (FeatureGroupFeatureDTO feature : featureGroupFeatureDTOS) {
            for (FeatureGroupFeatureDTO onlineFeature : onlineFeatures) {
                if (feature.getName().equalsIgnoreCase(onlineFeature.getName())) {
                    feature.setOnlineType(onlineFeature.getType());
                }
            }
        }
        return featureGroupFeatureDTOS;
    }

    /**
     * Returns the size of the feature group's online table as reported by
     * {@link OnlineFeaturestoreFacade#getTblSize}.
     *
     * @param featuregroup feature group to look up
     * @return the table size
     */
    public Long getFeaturegroupSize(Featuregroup featuregroup) {
        return this.onlineFeaturestoreFacade.getTblSize(tableNameOf(featuregroup), this.onlineDbNameOf(featuregroup));
    }

    /** Returns the feature store entity name built from the feature group's name and version. */
    private static String tableNameOf(Featuregroup featuregroup) {
        return Utils.getFeatureStoreEntityName(featuregroup.getName(), featuregroup.getVersion());
    }

    /** Returns the online database name of the project owning the feature group's feature store. */
    private String onlineDbNameOf(Featuregroup featuregroup) {
        return this.onlineFeaturestoreController.getOnlineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
    }

    /**
     * Returns {@code " NOT NULL DEFAULT <value>"} for a feature with a default
     * value (single-quoted when the feature type is {@code string}), or an empty
     * string when the feature has no default value.
     */
    private static String notNullDefaultClause(FeatureGroupFeatureDTO feature) {
        if (feature.getDefaultValue() == null) {
            return "";
        }
        StringBuilder clause = new StringBuilder(" NOT NULL DEFAULT ");
        if (feature.getType().equalsIgnoreCase("string")) {
            // NOTE(review): the value is inlined without escaping; confirm default
            // values are validated before they reach this point.
            clause.append("'").append(feature.getDefaultValue()).append("'");
        } else {
            clause.append(feature.getDefaultValue());
        }
        return clause.toString();
    }
}

