/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.storageconnectors;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
import io.hops.hopsworks.common.featurestore.storageconnectors.adls.FeaturestoreADLSConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.adls.FeaturestoreADLSConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.bigquery.FeaturestoreBigqueryConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.bigquery.FeaturestoreBigqueryConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.gcs.FeatureStoreGcsConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.gcs.FeatureStoreGcsConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.hopsfs.FeaturestoreHopsfsConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.hopsfs.FeaturestoreHopsfsConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.redshift.FeaturestoreRedshiftConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.redshift.FeaturestoreRedshiftConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.s3.FeaturestoreS3ConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.s3.FeaturestoreS3ConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.snowflake.FeaturestoreSnowflakeConnectorDTO;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SecurityProtocol;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.transaction.Transactional;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FeaturestoreStorageConnectorController {
    @EJB
    private FeaturestoreHopsfsConnectorController hopsfsConnectorController;
    @EJB
    private FeaturestoreJdbcConnectorController jdbcConnectorController;
    @EJB
    private FeaturestoreRedshiftConnectorController redshiftConnectorController;
    @EJB
    private FeaturestoreS3ConnectorController s3ConnectorController;
    @EJB
    private FeaturestoreConnectorFacade featurestoreConnectorFacade;
    @EJB
    private OnlineFeaturestoreController onlineFeaturestoreController;
    @EJB
    private FeaturestoreADLSConnectorController adlsConnectorController;
    @EJB
    private FeaturestoreSnowflakeConnectorController snowflakeConnectorController;
    @EJB
    private FeatureStoreKafkaConnectorController kafkaConnectorController;
    @EJB
    private FeaturestoreBigqueryConnectorController bigqueryConnectorController;
    @EJB
    private ActivityFacade activityFacade;
    @EJB
    private FeatureStoreGcsConnectorController gcsConnectorController;
    @EJB
    private FeaturestoreUtils featurestoreUtils;
    @EJB
    private StorageConnectorUtil storageConnectorUtil;
    @EJB
    private KafkaBrokers kafkaBrokers;
    @EJB
    private FeaturestoreController featurestoreController;
    @EJB
    private Settings settings;
    private static final String KAFKA_STORAGE_CONNECTOR_NAME = "kafka_connector";

    public List<FeaturestoreStorageConnectorDTO> getConnectorsForFeaturestore(Users user, Project project, Featurestore featurestore) throws FeaturestoreException {
        Set<FeaturestoreConnectorType> enabledScTypes = this.storageConnectorUtil.getEnabledStorageConnectorTypes();
        List<FeaturestoreConnector> connectors = this.featurestoreConnectorFacade.findByType(featurestore, enabledScTypes);
        return this.convertToConnectorDTOs(user, project, connectors);
    }

    public FeaturestoreStorageConnectorDTO getConnectorWithName(Users user, Project project, Featurestore featurestore, String connectorName) throws FeaturestoreException {
        FeaturestoreConnector featurestoreConnector = this.featurestoreConnectorFacade.findByFeaturestoreName(featurestore, connectorName).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND, Level.FINE, "Cannot find storage connector with name: " + connectorName));
        if (!this.storageConnectorUtil.isStorageConnectorTypeEnabled(featurestoreConnector.getConnectorType())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_TYPE_NOT_ENABLED, Level.FINE, "Storage connector type '" + featurestoreConnector.getConnectorType() + "' is not enabled");
        }
        return this.convertToConnectorDTO(user, project, featurestoreConnector);
    }

    public List<FeaturestoreStorageConnectorDTO> convertToConnectorDTOs(Users user, Project project, List<FeaturestoreConnector> featurestoreConnectors) throws FeaturestoreException {
        ArrayList<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOS = new ArrayList<FeaturestoreStorageConnectorDTO>();
        for (FeaturestoreConnector featurestoreConnector : featurestoreConnectors) {
            featurestoreStorageConnectorDTOS.add(this.convertToConnectorDTO(user, project, featurestoreConnector));
        }
        return featurestoreStorageConnectorDTOS;
    }

    public FeatureStoreKafkaConnectorDTO getKafkaConnector(Project project) throws FeaturestoreException {
        Featurestore featureStore = this.featurestoreController.getProjectFeaturestore(project);
        return this.getKafkaConnector(featureStore, KafkaBrokers.BrokerProtocol.INTERNAL);
    }

    public FeatureStoreKafkaConnectorDTO getKafkaConnector(Featurestore featureStore, KafkaBrokers.BrokerProtocol brokerProtocol) throws FeaturestoreException {
        FeatureStoreKafkaConnectorDTO kafkaConnectorDTO;
        Optional<FeaturestoreConnector> featurestoreConnector = this.featurestoreConnectorFacade.findByFeaturestoreName(featureStore, KAFKA_STORAGE_CONNECTOR_NAME);
        if (featurestoreConnector.isPresent() && this.settings.isBringYourOwnKafkaEnabled()) {
            FeaturestoreConnector connector = featurestoreConnector.get();
            if (!connector.getConnectorType().equals((Object)FeaturestoreConnectorType.KAFKA)) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.FINE, "Storage connector type should be KAFKA");
            }
            kafkaConnectorDTO = (FeatureStoreKafkaConnectorDTO)this.convertToConnectorDTO(null, featureStore.getProject(), connector);
            kafkaConnectorDTO.setExternalKafka(Boolean.TRUE);
        } else {
            kafkaConnectorDTO = new FeatureStoreKafkaConnectorDTO();
            kafkaConnectorDTO.setId(-1);
            kafkaConnectorDTO.setName(KAFKA_STORAGE_CONNECTOR_NAME);
            kafkaConnectorDTO.setDescription("Connector used for exchanging information within hopsworks");
            kafkaConnectorDTO.setFeaturestoreId(featureStore.getId());
            kafkaConnectorDTO.setStorageConnectorType(FeaturestoreConnectorType.KAFKA);
            kafkaConnectorDTO.setBootstrapServers(this.kafkaBrokers.getBrokerEndpointsString(brokerProtocol));
            kafkaConnectorDTO.setSecurityProtocol(SecurityProtocol.SSL);
            kafkaConnectorDTO.setSslEndpointIdentificationAlgorithm("");
            kafkaConnectorDTO.setExternalKafka(Boolean.FALSE);
        }
        return kafkaConnectorDTO;
    }

    public FeaturestoreStorageConnectorDTO convertToConnectorDTO(Users user, Project project, FeaturestoreConnector featurestoreConnector) throws FeaturestoreException {
        switch (featurestoreConnector.getConnectorType()) {
            case S3: {
                return this.s3ConnectorController.getS3ConnectorDTO(featurestoreConnector);
            }
            case JDBC: {
                return this.jdbcConnectorController.getJdbcConnectorDTO(user, project, featurestoreConnector);
            }
            case HOPSFS: {
                return this.hopsfsConnectorController.getHopsfsConnectorDTO(featurestoreConnector);
            }
            case REDSHIFT: {
                return this.redshiftConnectorController.getRedshiftConnectorDTO(featurestoreConnector);
            }
            case ADLS: {
                return this.adlsConnectorController.getADLConnectorDTO(featurestoreConnector);
            }
            case SNOWFLAKE: {
                return this.snowflakeConnectorController.getConnector(featurestoreConnector);
            }
            case KAFKA: {
                return this.kafkaConnectorController.getConnector(featurestoreConnector);
            }
            case GCS: {
                return this.gcsConnectorController.getConnector(featurestoreConnector);
            }
            case BIGQUERY: {
                return this.bigqueryConnectorController.getBigqueryConnectorDTO(featurestoreConnector);
            }
        }
        throw new IllegalArgumentException("Feature Store connector type not recognized");
    }

    public FeaturestoreStorageConnectorDTO createStorageConnector(Users user, Project project, Featurestore featurestore, FeaturestoreStorageConnectorDTO featurestoreStorageConnectorDTO) throws FeaturestoreException, UserException, ProjectException {
        this.featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featurestore, FeaturestoreUtils.ActionMessage.CREATE_STORAGE_CONNECTOR);
        if (!this.storageConnectorUtil.isStorageConnectorTypeEnabled(featurestoreStorageConnectorDTO.getStorageConnectorType())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_TYPE_NOT_ENABLED, Level.FINE, "Storage connector type '" + featurestoreStorageConnectorDTO.getStorageConnectorType() + "' is not enabled");
        }
        if (this.featurestoreConnectorFacade.findByFeaturestoreName(featurestore, featurestoreStorageConnectorDTO.getName()).isPresent()) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_NAME, Level.FINE, "Storage connector with the same name already exists. Name=" + featurestoreStorageConnectorDTO.getName());
        }
        if (KAFKA_STORAGE_CONNECTOR_NAME.equals(featurestoreStorageConnectorDTO.getName()) && !featurestoreStorageConnectorDTO.getStorageConnectorType().equals((Object)FeaturestoreConnectorType.KAFKA)) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_NAME, Level.FINE, "The provided storage connector name is reserved exclusively for KAFKA connector.");
        }
        FeaturestoreConnector featurestoreConnector = new FeaturestoreConnector();
        this.verifyName(featurestoreStorageConnectorDTO);
        featurestoreConnector.setName(featurestoreStorageConnectorDTO.getName());
        this.verifyDescription(featurestoreStorageConnectorDTO);
        featurestoreConnector.setDescription(featurestoreStorageConnectorDTO.getDescription());
        featurestoreConnector.setFeaturestore(featurestore);
        switch (featurestoreStorageConnectorDTO.getStorageConnectorType()) {
            case HOPSFS: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.HOPSFS);
                featurestoreConnector.setHopsfsConnector(this.hopsfsConnectorController.createFeaturestoreHopsfsConnector(featurestore, (FeaturestoreHopsfsConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case S3: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.S3);
                featurestoreConnector.setS3Connector(this.s3ConnectorController.createFeaturestoreS3Connector(user, featurestore, (FeaturestoreS3ConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case JDBC: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.JDBC);
                featurestoreConnector.setJdbcConnector(this.jdbcConnectorController.createFeaturestoreJdbcConnector((FeaturestoreJdbcConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case REDSHIFT: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.REDSHIFT);
                featurestoreConnector.setRedshiftConnector(this.redshiftConnectorController.createFeaturestoreRedshiftConnector(user, featurestore, (FeaturestoreRedshiftConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case ADLS: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.ADLS);
                featurestoreConnector.setAdlsConnector(this.adlsConnectorController.createADLConnector(user, project, featurestore, (FeaturestoreADLSConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case SNOWFLAKE: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.SNOWFLAKE);
                featurestoreConnector.setSnowflakeConnector(this.snowflakeConnectorController.createConnector(user, featurestore, (FeaturestoreSnowflakeConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case KAFKA: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.KAFKA);
                featurestoreConnector.setKafkaConnector(this.kafkaConnectorController.createConnector(project, user, featurestore, (FeatureStoreKafkaConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case GCS: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.GCS);
                featurestoreConnector.setGcsConnector(this.gcsConnectorController.createConnector(project, user, featurestore, (FeatureStoreGcsConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            case BIGQUERY: {
                featurestoreConnector.setConnectorType(FeaturestoreConnectorType.BIGQUERY);
                featurestoreConnector.setBigqueryConnector(this.bigqueryConnectorController.createBigqueryConnector(project, user, (FeaturestoreBigqueryConnectorDTO)featurestoreStorageConnectorDTO));
                break;
            }
            default: {
                throw new IllegalArgumentException("Feature Store connector type not recognized");
            }
        }
        featurestoreConnector = (FeaturestoreConnector)this.featurestoreConnectorFacade.update(featurestoreConnector);
        this.activityFacade.persistActivity(" added a storage connector for the featurestore with name: " + featurestoreConnector.getName(), project, user, ActivityFlag.SERVICE);
        return this.convertToConnectorDTO(user, project, featurestoreConnector);
    }

    @TransactionAttribute(value=TransactionAttributeType.REQUIRED)
    @Transactional(rollbackOn={FeaturestoreException.class})
    public void updateStorageConnector(Users user, Project project, Featurestore featurestore, FeaturestoreStorageConnectorDTO featurestoreStorageConnectorDTO, String connectorName) throws FeaturestoreException, UserException, ProjectException {
        if (!this.storageConnectorUtil.isStorageConnectorTypeEnabled(featurestoreStorageConnectorDTO.getStorageConnectorType())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_TYPE_NOT_ENABLED, Level.FINE, "Storage connector type '" + featurestoreStorageConnectorDTO.getStorageConnectorType() + "' is not enabled");
        }
        if (!connectorName.equalsIgnoreCase(featurestoreStorageConnectorDTO.getName())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Can not update connector name.");
        }
        FeaturestoreConnector featurestoreConnector = this.featurestoreConnectorFacade.findByFeaturestoreName(featurestore, connectorName).orElseThrow(() -> new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND, Level.FINE, "Cannot find storage connector with name: " + connectorName));
        this.featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featurestoreConnector.getFeaturestore(), FeaturestoreUtils.ActionMessage.UPDATE_STORAGE_CONNECTOR);
        this.verifyDescription(featurestoreStorageConnectorDTO);
        featurestoreConnector.setDescription(featurestoreStorageConnectorDTO.getDescription());
        switch (featurestoreConnector.getConnectorType()) {
            case HOPSFS: {
                featurestoreConnector.setHopsfsConnector(this.hopsfsConnectorController.updateFeaturestoreHopsfsConnector(featurestore, (FeaturestoreHopsfsConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getHopsfsConnector()));
                break;
            }
            case S3: {
                featurestoreConnector.setS3Connector(this.s3ConnectorController.updateFeaturestoreS3Connector(user, featurestore, (FeaturestoreS3ConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getS3Connector()));
                break;
            }
            case JDBC: {
                featurestoreConnector.setJdbcConnector(this.jdbcConnectorController.updateFeaturestoreJdbcConnector((FeaturestoreJdbcConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getJdbcConnector()));
                break;
            }
            case REDSHIFT: {
                featurestoreConnector.setRedshiftConnector(this.redshiftConnectorController.updateFeaturestoreRedshiftConnector(user, featurestore, (FeaturestoreRedshiftConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getRedshiftConnector()));
                break;
            }
            case ADLS: {
                featurestoreConnector.setAdlsConnector(this.adlsConnectorController.updateAdlConnector(user, featurestore, (FeaturestoreADLSConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getAdlsConnector()));
                break;
            }
            case SNOWFLAKE: {
                featurestoreConnector.setSnowflakeConnector(this.snowflakeConnectorController.updateConnector(user, (FeaturestoreSnowflakeConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getSnowflakeConnector()));
                break;
            }
            case KAFKA: {
                featurestoreConnector.setKafkaConnector(this.kafkaConnectorController.updateConnector(project, user, featurestore, (FeatureStoreKafkaConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getKafkaConnector()));
                break;
            }
            case GCS: {
                featurestoreConnector.setGcsConnector(this.gcsConnectorController.updateConnector(project, user, featurestore, (FeatureStoreGcsConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getGcsConnector()));
                break;
            }
            case BIGQUERY: {
                featurestoreConnector.setBigqueryConnector(this.bigqueryConnectorController.updateBigqueryConnector(project, user, (FeaturestoreBigqueryConnectorDTO)featurestoreStorageConnectorDTO, featurestoreConnector.getBigqueryConnector()));
                break;
            }
            default: {
                throw new IllegalArgumentException("Feature Store connector type not recognized");
            }
        }
        featurestoreConnector = (FeaturestoreConnector)this.featurestoreConnectorFacade.update(featurestoreConnector);
        this.activityFacade.persistActivity(" updated a storage connector for the featurestore with name: " + featurestoreConnector.getName(), project, user, ActivityFlag.SERVICE);
    }

    @TransactionAttribute(value=TransactionAttributeType.REQUIRED)
    public void deleteConnectorWithName(Users user, Project project, String connectorName, Featurestore featurestore) throws UserException, FeaturestoreException {
        Optional<FeaturestoreConnector> featurestoreConnectorOptional = this.featurestoreConnectorFacade.findByFeaturestoreName(featurestore, connectorName);
        if (!featurestoreConnectorOptional.isPresent()) {
            return;
        }
        FeaturestoreConnector featurestoreConnector = featurestoreConnectorOptional.get();
        if (!this.storageConnectorUtil.isStorageConnectorTypeEnabled(featurestoreConnector.getConnectorType())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STORAGE_CONNECTOR_TYPE_NOT_ENABLED, Level.FINE, "Storage connector type '" + featurestoreConnector.getConnectorType() + "' is not enabled");
        }
        this.featurestoreUtils.verifyUserProjectEqualsFsProjectAndDataOwner(user, project, featurestoreConnector.getFeaturestore(), FeaturestoreUtils.ActionMessage.DELETE_STORAGE_CONNECTOR);
        this.cleanKeyFile(project, user, featurestoreConnector);
        this.featurestoreConnectorFacade.remove(featurestoreConnector);
        this.activityFacade.persistActivity(" added a storage connector for the featurestore with name: " + featurestoreConnector.getName(), project, user, ActivityFlag.SERVICE);
    }

    public FeaturestoreStorageConnectorDTO getOnlineFeaturestoreConnector(Users user, Project userProject) throws FeaturestoreException {
        Featurestore userFeatureStore = this.featurestoreController.getProjectFeaturestore(userProject);
        String dbUsername = this.onlineFeaturestoreController.onlineDbUsername(userProject, user);
        this.onlineFeaturestoreController.setupOnlineFeaturestore(userProject, userFeatureStore, user);
        Optional<FeaturestoreConnector> featurestoreConnector = this.featurestoreConnectorFacade.findByFeaturestoreName(userFeatureStore, dbUsername + "_onlinefeaturestore");
        if (featurestoreConnector.isPresent()) {
            return this.convertToConnectorDTO(user, userProject, featurestoreConnector.get());
        }
        return null;
    }

    private void verifyName(FeaturestoreStorageConnectorDTO connectorDTO) throws FeaturestoreException {
        if (Strings.isNullOrEmpty((String)connectorDTO.getName())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME + ", the storage connector name cannot be empty");
        }
        if (connectorDTO.getName().length() > 1000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME + ", the name should be less than " + 1000 + " characters, the provided name was: " + connectorDTO.getName());
        }
    }

    private void verifyDescription(FeaturestoreStorageConnectorDTO connectorDTO) throws FeaturestoreException {
        if (connectorDTO.getDescription() != null && connectorDTO.getDescription().length() > 1000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_DESCRIPTION + ", the description should be less than: " + 1000);
        }
    }

    private void cleanKeyFile(Project project, Users user, FeaturestoreConnector featurestoreConnector) throws FeaturestoreException {
        switch (featurestoreConnector.getConnectorType()) {
            case GCS: {
                this.storageConnectorUtil.removeHdfsFile(project, user, featurestoreConnector.getGcsConnector().getKeyPath());
                break;
            }
            case BIGQUERY: {
                this.storageConnectorUtil.removeHdfsFile(project, user, featurestoreConnector.getBigqueryConnector().getKeyPath());
                break;
            }
            case KAFKA: {
                if (!Strings.isNullOrEmpty((String)featurestoreConnector.getKafkaConnector().getKeyStorePath())) {
                    this.storageConnectorUtil.removeHdfsFile(project, user, featurestoreConnector.getKafkaConnector().getKeyStorePath());
                }
                if (Strings.isNullOrEmpty((String)featurestoreConnector.getKafkaConnector().getTrustStorePath())) break;
                this.storageConnectorUtil.removeHdfsFile(project, user, featurestoreConnector.getKafkaConnector().getTrustStorePath());
            }
        }
    }
}

