/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.featurestore.storageconnectors.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorSecrets;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.FeatureStoreKafkaConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SSLEndpointIdentificationAlgorithm;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SecurityProtocol;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.transaction.Transactional;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NEVER)
public class FeatureStoreKafkaConnectorController {
    private static final Logger LOGGER = Logger.getLogger(FeatureStoreKafkaConnectorController.class.getName());
    @EJB
    private StorageConnectorUtil storageConnectorUtil;
    @EJB
    private DistributedFsService dfs;

    public FeatureStoreKafkaConnectorDTO getConnector(FeaturestoreConnector featurestoreConnector) throws FeaturestoreException {
        FeatureStoreKafkaConnectorDTO kafkaConnectorDTO = new FeatureStoreKafkaConnectorDTO(featurestoreConnector);
        if (featurestoreConnector.getKafkaConnector().getSslSecret() != null) {
            FeatureStoreKafkaConnectorSecrets kafkaConnectorSecrets = this.storageConnectorUtil.getSecret(featurestoreConnector.getKafkaConnector().getSslSecret(), FeatureStoreKafkaConnectorSecrets.class);
            kafkaConnectorDTO.setSslTruststorePassword(kafkaConnectorSecrets.getSslTrustStorePassword());
            kafkaConnectorDTO.setSslKeystorePassword(kafkaConnectorSecrets.getSslKeyStorePassword());
            kafkaConnectorDTO.setSslKeyPassword(kafkaConnectorSecrets.getSslKeyPassword());
        }
        kafkaConnectorDTO.setSslTruststoreLocation(featurestoreConnector.getKafkaConnector().getTrustStorePath());
        kafkaConnectorDTO.setSslKeystoreLocation(featurestoreConnector.getKafkaConnector().getKeyStorePath());
        kafkaConnectorDTO.setOptions(this.storageConnectorUtil.toOptions(featurestoreConnector.getKafkaConnector().getOptions()));
        return kafkaConnectorDTO;
    }

    public FeatureStoreKafkaConnector createConnector(Project project, Users user, Featurestore featureStore, FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws ProjectException, UserException, FeaturestoreException {
        this.verifyUserInput(kafkaConnectorDTO);
        if (kafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            this.verifySSLSecurityProtocolProperties(project, user, kafkaConnectorDTO);
        }
        FeatureStoreKafkaConnector kafkaConnector = new FeatureStoreKafkaConnector();
        this.setGeneralAttributes(kafkaConnectorDTO, kafkaConnector);
        String secretName = this.storageConnectorUtil.createSecretName(featureStore.getId(), kafkaConnectorDTO.getName(), kafkaConnectorDTO.getStorageConnectorType());
        FeatureStoreKafkaConnectorSecrets secretsClass = new FeatureStoreKafkaConnectorSecrets(kafkaConnectorDTO.getSslTruststorePassword(), kafkaConnectorDTO.getSslKeystorePassword(), kafkaConnectorDTO.getSslKeyPassword());
        kafkaConnector.setSslSecret(this.storageConnectorUtil.createProjectSecret(user, secretName, featureStore, secretsClass));
        return kafkaConnector;
    }

    private void setGeneralAttributes(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO, FeatureStoreKafkaConnector kafkaConnector) {
        kafkaConnector.setBootstrapServers(kafkaConnectorDTO.getBootstrapServers());
        kafkaConnector.setSecurityProtocol(kafkaConnectorDTO.getSecurityProtocol());
        kafkaConnector.setOptions(this.storageConnectorUtil.fromOptions(kafkaConnectorDTO.getOptions()));
        kafkaConnector.setSslEndpointIdentificationAlgorithm(SSLEndpointIdentificationAlgorithm.fromString((String)kafkaConnectorDTO.getSslEndpointIdentificationAlgorithm()));
        kafkaConnector.setKeyStorePath(kafkaConnectorDTO.getSslKeystoreLocation());
        kafkaConnector.setTrustStorePath(kafkaConnectorDTO.getSslTruststoreLocation());
    }

    @TransactionAttribute(value=TransactionAttributeType.REQUIRED)
    @Transactional(rollbackOn={FeaturestoreException.class})
    public FeatureStoreKafkaConnector updateConnector(Project project, Users user, Featurestore featureStore, FeatureStoreKafkaConnectorDTO kafkaConnectorDTO, FeatureStoreKafkaConnector kafkaConnector) throws FeaturestoreException, ProjectException, UserException {
        this.verifyUserInput(kafkaConnectorDTO);
        if (kafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            this.verifySSLSecurityProtocolProperties(project, user, kafkaConnectorDTO);
        }
        this.setGeneralAttributes(kafkaConnectorDTO, kafkaConnector);
        FeatureStoreKafkaConnectorSecrets secretsClass = new FeatureStoreKafkaConnectorSecrets(kafkaConnectorDTO.getSslTruststorePassword(), kafkaConnectorDTO.getSslKeystorePassword(), kafkaConnectorDTO.getSslKeyPassword());
        kafkaConnector.setSslSecret(this.storageConnectorUtil.updateProjectSecret(user, kafkaConnector.getSslSecret(), kafkaConnector.getSslSecret().getId().getName(), featureStore, secretsClass));
        return kafkaConnector;
    }

    private void verifyUserInput(FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws FeaturestoreException {
        if (Strings.isNullOrEmpty((String)kafkaConnectorDTO.getBootstrapServers())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string cannot be null or empty.");
        }
        if (kafkaConnectorDTO.getSecurityProtocol() == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Kafka security protocol cannot be null or empty.");
        }
        if (kafkaConnectorDTO.getBootstrapServers().length() > 1000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string is too long, can be maximum 1000 characters.");
        }
        String optionString = this.storageConnectorUtil.fromOptions(kafkaConnectorDTO.getOptions());
        if (!Strings.isNullOrEmpty((String)optionString) && optionString.length() > 2000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Additional Kafka Option list is too long, please raise an Issue with the Hopsworks team.");
        }
    }

    private void verifySSLSecurityProtocolProperties(Project project, Users user, FeatureStoreKafkaConnectorDTO kafkaConnectorDTO) throws FeaturestoreException {
        DistributedFileSystemOps dfso = null;
        try {
            dfso = this.dfs.getDfsOps(project, user);
            this.storageConnectorUtil.validatePath(dfso, kafkaConnectorDTO.getSslTruststoreLocation(), "Truststore location");
            this.storageConnectorUtil.validatePath(dfso, kafkaConnectorDTO.getSslKeystoreLocation(), "Keystore location");
        }
        catch (IOException e) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.INFO, "Error validating keystore/truststore path", e.getMessage(), (Throwable)e);
        }
        finally {
            this.dfs.closeDfsClient(dfso);
        }
        if (Strings.isNullOrEmpty((String)kafkaConnectorDTO.getSslKeyPassword())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Key password cannot be null or empty for Kafka SSL Security Policy.");
        }
    }
}

