/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.storageconnectors.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorSecrets;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.FeatureStoreKafkaConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SSLEndpointIdentificationAlgorithm;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SecurityProtocol;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.transaction.Transactional;

/**
 * Controller for Kafka feature store storage connectors.
 *
 * <p>Converts persisted Kafka connectors into DTOs and validates/applies DTO input when a
 * connector entity is created or updated. The SSL passwords are not kept on the connector
 * entity itself; they are bundled into a {@link FeatureStoreKafkaConnectorSecrets} object and
 * stored/read through {@link StorageConnectorUtil}.
 *
 * <p>The bean is declared with {@link TransactionAttributeType#NEVER}; only
 * {@link #updateConnector} overrides this with {@link TransactionAttributeType#REQUIRED}.
 */
@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FeatureStoreKafkaConnectorController {
    private static final Logger LOGGER = Logger.getLogger(FeatureStoreKafkaConnectorController.class.getName());
    /** Maximum accepted length of the bootstrap servers string. */
    private static final int MAX_BOOTSTRAP_SERVERS_LENGTH = 1000;
    /** Maximum accepted length of the serialized additional-options string. */
    private static final int MAX_OPTIONS_LENGTH = 2000;

    @EJB
    private StorageConnectorUtil storageConnectorUtil;
    @EJB
    private InodeController inodeController;

    /**
     * Builds the DTO representation of a persisted Kafka connector.
     *
     * <p>SSL passwords are only populated when the connector references a secret; the
     * truststore/keystore locations are {@code null} when the corresponding inode is not set.
     *
     * @param featurestoreConnector the generic connector entity wrapping the Kafka connector
     * @return the populated DTO
     * @throws FeaturestoreException propagated from reading the secret
     */
    public FeatureStoreKafkaConnectorDTO getConnector(FeaturestoreConnector featurestoreConnector) throws FeaturestoreException {
        FeatureStoreKafkaConnectorDTO kafkaConnectorDTO = new FeatureStoreKafkaConnectorDTO(featurestoreConnector);
        FeatureStoreKafkaConnector kafkaConnector = featurestoreConnector.getKafkaConnector();
        if (kafkaConnector.getSslSecret() != null) {
            FeatureStoreKafkaConnectorSecrets kafkaConnectorSecrets = this.storageConnectorUtil.getSecret(kafkaConnector.getSslSecret(), FeatureStoreKafkaConnectorSecrets.class);
            kafkaConnectorDTO.setSslTruststorePassword(kafkaConnectorSecrets.getSslTrustStorePassword());
            kafkaConnectorDTO.setSslKeystorePassword(kafkaConnectorSecrets.getSslKeyStorePassword());
            kafkaConnectorDTO.setSslKeyPassword(kafkaConnectorSecrets.getSslKeyPassword());
        }
        kafkaConnectorDTO.setSslTruststoreLocation(this.pathOrNull(kafkaConnector.getTruststoreInode()));
        kafkaConnectorDTO.setSslKeystoreLocation(this.pathOrNull(kafkaConnector.getKeystoreInode()));
        kafkaConnectorDTO.setOptions(this.storageConnectorUtil.toOptions(kafkaConnector.getOptions()));
        return kafkaConnectorDTO;
    }

    /**
     * Validates the DTO and builds a new Kafka connector entity from it.
     *
     * <p>A project secret holding the three SSL passwords is created for every connector,
     * regardless of the selected security protocol.
     *
     * @param user the user on whose behalf the secret is created
     * @param featureStore the feature store the connector belongs to
     * @param kafkaConnectorDTO the user-supplied connector description
     * @return the new connector entity with attributes, store inodes and secret set
     * @throws FeaturestoreException if the input is invalid or a store file cannot be found
     * @throws ProjectException propagated from secret creation
     * @throws UserException propagated from secret creation
     */
    public FeatureStoreKafkaConnector createConnector(Users user, Featurestore featureStore, FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws ProjectException, UserException, FeaturestoreException {
        this.verifyUserInput(kafkaConnectorDTO);
        if (kafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            this.verifySSLSecurityProtocolProperties(kafkaConnectorDTO);
        }
        FeatureStoreKafkaConnector kafkaConnector = new FeatureStoreKafkaConnector();
        this.setGeneralAttributes(kafkaConnectorDTO, kafkaConnector);
        this.setTrustStoreKeyStoreInodes(kafkaConnectorDTO, kafkaConnector);
        String secretName = this.storageConnectorUtil.createSecretName(featureStore.getId(), kafkaConnectorDTO.getName(), kafkaConnectorDTO.getStorageConnectorType());
        FeatureStoreKafkaConnectorSecrets secretsClass = this.secretsFrom(kafkaConnectorDTO);
        kafkaConnector.setSslSecret(this.storageConnectorUtil.createProjectSecret(user, secretName, featureStore, secretsClass));
        return kafkaConnector;
    }

    /**
     * Copies the attributes that do not need lookups or secrets from the DTO onto the entity.
     */
    private void setGeneralAttributes(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO, FeatureStoreKafkaConnector kafkaConnector) {
        kafkaConnector.setBootstrapServers(kafkaConnectorDTO.getBootstrapServers());
        kafkaConnector.setSecurityProtocol(kafkaConnectorDTO.getSecurityProtocol());
        kafkaConnector.setOptions(this.storageConnectorUtil.fromOptions(kafkaConnectorDTO.getOptions()));
        kafkaConnector.setSslEndpointIdentificationAlgorithm(SSLEndpointIdentificationAlgorithm.fromString(kafkaConnectorDTO.getSslEndpointIdentificationAlgorithm()));
    }

    /**
     * Resolves the truststore and keystore locations of the DTO to inodes and sets them on
     * the entity. A {@code null} location yields a {@code null} inode.
     *
     * @throws FeaturestoreException if a location is given but no inode exists at that path
     */
    private void setTrustStoreKeyStoreInodes(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO, FeatureStoreKafkaConnector kafkaConnector) throws FeaturestoreException {
        String truststoreLocation = kafkaConnectorDTO.getSslTruststoreLocation();
        String keystoreLocation = kafkaConnectorDTO.getSslKeystoreLocation();
        // Both lookups run before either check, so the lookup order stays truststore, keystore.
        Inode truststoreInode = this.inodeOrNull(truststoreLocation);
        Inode keystoreInode = this.inodeOrNull(keystoreLocation);
        if (truststoreLocation != null && truststoreInode == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.KAFKA_STORAGE_CONNECTOR_STORE_NOT_EXISTING, Level.FINE, "Could not find trust store in provided location: " + truststoreLocation);
        }
        if (keystoreLocation != null && keystoreInode == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.KAFKA_STORAGE_CONNECTOR_STORE_NOT_EXISTING, Level.FINE, "Could not find key store in provided location: " + keystoreLocation);
        }
        kafkaConnector.setTruststoreInode(truststoreInode);
        kafkaConnector.setKeystoreInode(keystoreInode);
    }

    /**
     * Validates the DTO and applies it to an existing Kafka connector entity.
     *
     * <p>If the connector already references a secret, that secret is updated under its
     * existing name. If it has none (a state {@link #getConnector} also tolerates), a new
     * secret is created with the same naming scheme as {@link #createConnector} instead of
     * failing with a {@link NullPointerException}.
     *
     * @param user the user on whose behalf the secret is written
     * @param featureStore the feature store the connector belongs to
     * @param kafkaConnectorDTO the user-supplied connector description
     * @param kafkaConnector the existing entity to modify
     * @return the same {@code kafkaConnector} instance, updated
     * @throws FeaturestoreException if the input is invalid or a store file cannot be found
     * @throws ProjectException propagated from writing the secret
     * @throws UserException propagated from writing the secret
     */
    @TransactionAttribute(value=TransactionAttributeType.REQUIRED)
    @Transactional(rollbackOn={FeaturestoreException.class})
    public FeatureStoreKafkaConnector updateConnector(Users user, Featurestore featureStore, FeatureStoreKafkaConnectorDTO kafkaConnectorDTO, FeatureStoreKafkaConnector kafkaConnector) throws FeaturestoreException, ProjectException, UserException {
        this.verifyUserInput(kafkaConnectorDTO);
        if (kafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            this.verifySSLSecurityProtocolProperties(kafkaConnectorDTO);
        }
        this.setGeneralAttributes(kafkaConnectorDTO, kafkaConnector);
        this.setTrustStoreKeyStoreInodes(kafkaConnectorDTO, kafkaConnector);
        FeatureStoreKafkaConnectorSecrets secretsClass = this.secretsFrom(kafkaConnectorDTO);
        if (kafkaConnector.getSslSecret() == null) {
            // No secret to update: create one, named the same way createConnector names it.
            String secretName = this.storageConnectorUtil.createSecretName(featureStore.getId(), kafkaConnectorDTO.getName(), kafkaConnectorDTO.getStorageConnectorType());
            kafkaConnector.setSslSecret(this.storageConnectorUtil.createProjectSecret(user, secretName, featureStore, secretsClass));
        } else {
            kafkaConnector.setSslSecret(this.storageConnectorUtil.updateProjectSecret(user, kafkaConnector.getSslSecret(), kafkaConnector.getSslSecret().getId().getName(), featureStore, secretsClass));
        }
        return kafkaConnector;
    }

    /**
     * Checks the protocol-independent fields of the DTO: bootstrap servers must be present
     * and within the length limit, a security protocol must be set, and the serialized
     * options must fit the length limit.
     *
     * @throws FeaturestoreException with {@code ILLEGAL_STORAGE_CONNECTOR_ARG} on the first violation
     */
    private void verifyUserInput(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws FeaturestoreException {
        String bootstrapServers = kafkaConnectorDTO.getBootstrapServers();
        if (Strings.isNullOrEmpty(bootstrapServers)) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string cannot be null or empty.");
        }
        if (kafkaConnectorDTO.getSecurityProtocol() == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Kafka security protocol cannot be null or empty.");
        }
        if (bootstrapServers.length() > MAX_BOOTSTRAP_SERVERS_LENGTH) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string is too long, can be maximum 1000 characters.");
        }
        String optionString = this.storageConnectorUtil.fromOptions(kafkaConnectorDTO.getOptions());
        if (!Strings.isNullOrEmpty(optionString) && optionString.length() > MAX_OPTIONS_LENGTH) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Additional Kafka Option list is too long, please raise an Issue with the Hopsworks team.");
        }
    }

    /**
     * Checks the fields that are mandatory when the security protocol is SSL: truststore and
     * keystore locations plus the three passwords. Checks run in that order and stop at the
     * first missing value.
     *
     * @throws FeaturestoreException with {@code ILLEGAL_STORAGE_CONNECTOR_ARG} on the first missing value
     */
    private void verifySSLSecurityProtocolProperties(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws FeaturestoreException {
        this.requireNonEmpty(kafkaConnectorDTO.getSslTruststoreLocation(), "Truststore location cannot be null or empty for Kafka SSL Security Policy.");
        this.requireNonEmpty(kafkaConnectorDTO.getSslKeystoreLocation(), "Keystore location cannot be null or empty for Kafka SSL Security Policy.");
        this.requireNonEmpty(kafkaConnectorDTO.getSslTruststorePassword(), "Truststore password cannot be null or empty for Kafka SSL Security Policy.");
        this.requireNonEmpty(kafkaConnectorDTO.getSslKeystorePassword(), "Keystore password cannot be null or empty for Kafka SSL Security Policy.");
        this.requireNonEmpty(kafkaConnectorDTO.getSslKeyPassword(), "Key password cannot be null or empty for Kafka SSL Security Policy.");
    }

    /** Throws an {@code ILLEGAL_STORAGE_CONNECTOR_ARG} exception with {@code message} if {@code value} is null or empty. */
    private void requireNonEmpty(String value, String message) throws FeaturestoreException {
        if (Strings.isNullOrEmpty(value)) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, message);
        }
    }

    /** Bundles the three SSL passwords of the DTO into a secrets object. */
    private FeatureStoreKafkaConnectorSecrets secretsFrom(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) {
        return new FeatureStoreKafkaConnectorSecrets(kafkaConnectorDTO.getSslTruststorePassword(), kafkaConnectorDTO.getSslKeystorePassword(), kafkaConnectorDTO.getSslKeyPassword());
    }

    /** Returns the path of {@code inode}, or {@code null} when no inode is set. */
    private String pathOrNull(Inode inode) {
        return inode == null ? null : this.inodeController.getPath(inode);
    }

    /** Looks up the inode at {@code location}, or returns {@code null} when no location is given. */
    private Inode inodeOrNull(String location) {
        return location == null ? null : this.inodeController.getInodeAtPath(location);
    }
}

