package io.hops.hopsworks.common.featurestore.storageconnectors.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ProjectException;
import io.hops.hopsworks.exceptions.UserException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.FeatureStoreKafkaConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SSLEndpointIdentificationAlgorithm;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.kafka.SecurityProtocol;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.transaction.Transactional;

@TransactionAttribute(TransactionAttributeType.NEVER)
@Stateless
/* loaded from: input_file:io/hops/hopsworks/common/featurestore/storageconnectors/kafka/FeatureStoreKafkaConnectorController.class */
public class FeatureStoreKafkaConnectorController {
    private static final Logger LOGGER = Logger.getLogger(FeatureStoreKafkaConnectorController.class.getName());

    @EJB
    private StorageConnectorUtil storageConnectorUtil;

    @EJB
    private DistributedFsService dfs;

    public FeatureStoreKafkaConnectorDTO getConnector(FeaturestoreConnector featurestoreConnector) throws FeaturestoreException {
        FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO = new FeatureStoreKafkaConnectorDTO(featurestoreConnector);
        if (featurestoreConnector.getKafkaConnector().getSslSecret() != null) {
            FeatureStoreKafkaConnectorSecrets featureStoreKafkaConnectorSecrets = (FeatureStoreKafkaConnectorSecrets) this.storageConnectorUtil.getSecret(featurestoreConnector.getKafkaConnector().getSslSecret(), FeatureStoreKafkaConnectorSecrets.class);
            featureStoreKafkaConnectorDTO.setSslTruststorePassword(featureStoreKafkaConnectorSecrets.getSslTrustStorePassword());
            featureStoreKafkaConnectorDTO.setSslKeystorePassword(featureStoreKafkaConnectorSecrets.getSslKeyStorePassword());
            featureStoreKafkaConnectorDTO.setSslKeyPassword(featureStoreKafkaConnectorSecrets.getSslKeyPassword());
        }
        featureStoreKafkaConnectorDTO.setSslTruststoreLocation(featurestoreConnector.getKafkaConnector().getTrustStorePath());
        featureStoreKafkaConnectorDTO.setSslKeystoreLocation(featurestoreConnector.getKafkaConnector().getKeyStorePath());
        featureStoreKafkaConnectorDTO.setOptions(this.storageConnectorUtil.toOptions(featurestoreConnector.getKafkaConnector().getOptions()));
        return featureStoreKafkaConnectorDTO;
    }

    public FeatureStoreKafkaConnector createConnector(Project project, Users users, Featurestore featurestore, FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO) throws ProjectException, UserException, FeaturestoreException {
        verifyUserInput(featureStoreKafkaConnectorDTO);
        if (featureStoreKafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            verifySSLSecurityProtocolProperties(project, users, featureStoreKafkaConnectorDTO);
        }
        FeatureStoreKafkaConnector featureStoreKafkaConnector = new FeatureStoreKafkaConnector();
        setGeneralAttributes(featureStoreKafkaConnectorDTO, featureStoreKafkaConnector);
        featureStoreKafkaConnector.setSslSecret(this.storageConnectorUtil.createProjectSecret(users, this.storageConnectorUtil.createSecretName(featurestore.getId(), featureStoreKafkaConnectorDTO.getName(), featureStoreKafkaConnectorDTO.getStorageConnectorType()), featurestore, new FeatureStoreKafkaConnectorSecrets(featureStoreKafkaConnectorDTO.getSslTruststorePassword(), featureStoreKafkaConnectorDTO.getSslKeystorePassword(), featureStoreKafkaConnectorDTO.getSslKeyPassword())));
        return featureStoreKafkaConnector;
    }

    private void setGeneralAttributes(FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO, FeatureStoreKafkaConnector featureStoreKafkaConnector) {
        featureStoreKafkaConnector.setBootstrapServers(featureStoreKafkaConnectorDTO.getBootstrapServers());
        featureStoreKafkaConnector.setSecurityProtocol(featureStoreKafkaConnectorDTO.getSecurityProtocol());
        featureStoreKafkaConnector.setOptions(this.storageConnectorUtil.fromOptions(featureStoreKafkaConnectorDTO.getOptions()));
        featureStoreKafkaConnector.setSslEndpointIdentificationAlgorithm(SSLEndpointIdentificationAlgorithm.fromString(featureStoreKafkaConnectorDTO.getSslEndpointIdentificationAlgorithm()));
        featureStoreKafkaConnector.setKeyStorePath(featureStoreKafkaConnectorDTO.getSslKeystoreLocation());
        featureStoreKafkaConnector.setTrustStorePath(featureStoreKafkaConnectorDTO.getSslTruststoreLocation());
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    @Transactional(rollbackOn = {FeaturestoreException.class})
    public FeatureStoreKafkaConnector updateConnector(Project project, Users users, Featurestore featurestore, FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO, FeatureStoreKafkaConnector featureStoreKafkaConnector) throws FeaturestoreException, ProjectException, UserException {
        verifyUserInput(featureStoreKafkaConnectorDTO);
        if (featureStoreKafkaConnectorDTO.getSecurityProtocol() == SecurityProtocol.SSL) {
            verifySSLSecurityProtocolProperties(project, users, featureStoreKafkaConnectorDTO);
        }
        setGeneralAttributes(featureStoreKafkaConnectorDTO, featureStoreKafkaConnector);
        featureStoreKafkaConnector.setSslSecret(this.storageConnectorUtil.updateProjectSecret(users, featureStoreKafkaConnector.getSslSecret(), featureStoreKafkaConnector.getSslSecret().getId().getName(), featurestore, new FeatureStoreKafkaConnectorSecrets(featureStoreKafkaConnectorDTO.getSslTruststorePassword(), featureStoreKafkaConnectorDTO.getSslKeystorePassword(), featureStoreKafkaConnectorDTO.getSslKeyPassword())));
        return featureStoreKafkaConnector;
    }

    private void verifyUserInput(FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO) throws FeaturestoreException {
        if (Strings.isNullOrEmpty(featureStoreKafkaConnectorDTO.getBootstrapServers())) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string cannot be null or empty.");
        }
        if (featureStoreKafkaConnectorDTO.getSecurityProtocol() == null) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Kafka security protocol cannot be null or empty.");
        }
        if (featureStoreKafkaConnectorDTO.getBootstrapServers().length() > 1000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Bootstrap server string is too long, can be maximum 1000 characters.");
        }
        String fromOptions = this.storageConnectorUtil.fromOptions(featureStoreKafkaConnectorDTO.getOptions());
        if (!Strings.isNullOrEmpty(fromOptions) && fromOptions.length() > 2000) {
            throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Additional Kafka Option list is too long, please raise an Issue with the Hopsworks team.");
        }
    }

    private void verifySSLSecurityProtocolProperties(Project project, Users users, FeatureStoreKafkaConnectorDTO featureStoreKafkaConnectorDTO) throws FeaturestoreException {
        DistributedFileSystemOps distributedFileSystemOps = null;
        try {
            try {
                distributedFileSystemOps = this.dfs.getDfsOps(project, users);
                this.storageConnectorUtil.validatePath(distributedFileSystemOps, featureStoreKafkaConnectorDTO.getSslTruststoreLocation(), "Truststore location");
                this.storageConnectorUtil.validatePath(distributedFileSystemOps, featureStoreKafkaConnectorDTO.getSslKeystoreLocation(), "Keystore location");
                this.dfs.closeDfsClient(distributedFileSystemOps);
                if (Strings.isNullOrEmpty(featureStoreKafkaConnectorDTO.getSslKeyPassword())) {
                    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.FINE, "Key password cannot be null or empty for Kafka SSL Security Policy.");
                }
            } catch (IOException e) {
                throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_ARG, Level.INFO, "Error validating keystore/truststore path", e.getMessage(), e);
            }
        } catch (Throwable th) {
            this.dfs.closeDfsClient(distributedFileSystemOps);
            throw th;
        }
    }
}
