/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.dao.kafka;

import com.google.common.base.Strings;
import io.hops.hopsworks.common.featurestore.OptionDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.kafka.FeatureStoreKafkaConnectorDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.security.BaseHadoopClientsService;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.KafkaTags;
import io.hops.hopsworks.servicediscovery.tags.ServiceTags;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

@Stateless
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
@ConcurrencyManagement(value=ConcurrencyManagementType.CONTAINER)
public class HopsKafkaAdminClient {
    private static final Logger LOGGER = Logger.getLogger(HopsKafkaAdminClient.class.getName());
    @EJB
    protected ServiceDiscoveryController serviceDiscoveryController;
    @EJB
    protected BaseHadoopClientsService baseHadoopService;
    @EJB
    protected DistributedFsService dfs;

    public Properties getHopsworksKafkaProperties() {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", this.serviceDiscoveryController.constructServiceFQDNWithPort(HopsworksService.KAFKA.getNameWithTag((ServiceTags)KafkaTags.broker)));
            props.setProperty("security.protocol", "SSL");
            props.setProperty("ssl.truststore.location", this.baseHadoopService.getSuperTrustStorePath());
            props.setProperty("ssl.truststore.password", this.baseHadoopService.getSuperTrustStorePassword());
            props.setProperty("ssl.keystore.location", this.baseHadoopService.getSuperKeystorePath());
            props.setProperty("ssl.keystore.password", this.baseHadoopService.getSuperKeystorePassword());
            props.setProperty("ssl.key.password", this.baseHadoopService.getSuperKeystorePassword());
            props.setProperty("ssl.endpoint.identification.algorithm", "");
            return props;
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to construct Kafka properties", e);
            return new Properties();
        }
    }

    protected Properties getProjectKafkaProperties(FeatureStoreKafkaConnectorDTO connector) {
        if (Boolean.FALSE.equals(connector.isExternalKafka())) {
            return this.getHopsworksKafkaProperties();
        }
        try {
            Properties props = new Properties();
            for (OptionDTO option : connector.getOptions()) {
                props.setProperty(option.getName(), option.getValue());
            }
            props.setProperty("bootstrap.servers", connector.getBootstrapServers());
            props.setProperty("security.protocol", connector.getSecurityProtocol().name());
            props.setProperty("ssl.endpoint.identification.algorithm", connector.getSslEndpointIdentificationAlgorithm());
            this.setPasswordProperty(props, "ssl.truststore.password", connector);
            this.setPasswordProperty(props, "ssl.keystore.password", connector);
            this.setPasswordProperty(props, "ssl.key.password", connector);
            this.setCertificates(props, connector);
            return props;
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to construct Kafka properties", e);
            return new Properties();
        }
    }

    private void setPasswordProperty(Properties props, String key, FeatureStoreKafkaConnectorDTO connector) {
        String value;
        switch (key) {
            case "ssl.truststore.password": {
                value = connector.getSslTruststorePassword();
                break;
            }
            case "ssl.keystore.password": {
                value = connector.getSslKeystorePassword();
                break;
            }
            case "ssl.key.password": {
                value = connector.getSslKeyPassword();
                break;
            }
            default: {
                return;
            }
        }
        if (!Strings.isNullOrEmpty((String)value)) {
            props.setProperty(key, value);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setCertificates(Properties props, FeatureStoreKafkaConnectorDTO connector) throws IOException {
        DistributedFileSystemOps dfso = null;
        try {
            dfso = this.dfs.getDfsOps();
            this.setCertificateProperty(props, "ssl.truststore.location", connector, dfso);
            this.setCertificateProperty(props, "ssl.keystore.location", connector, dfso);
        }
        finally {
            if (dfso != null) {
                this.dfs.closeDfsClient(dfso);
            }
        }
    }

    private void setCertificateProperty(Properties props, String key, FeatureStoreKafkaConnectorDTO connector, DistributedFileSystemOps dfso) throws IOException {
        String localLocation;
        String location;
        String pathPattern = "/tmp/kafka_sc_%s%s";
        switch (key) {
            case "ssl.truststore.location": {
                location = connector.getSslTruststoreLocation();
                localLocation = String.format(pathPattern, connector.getId(), "__tstore.jks");
                break;
            }
            case "ssl.keystore.location": {
                location = connector.getSslKeystoreLocation();
                localLocation = String.format(pathPattern, connector.getId(), "__kstore.jks");
                break;
            }
            default: {
                return;
            }
        }
        if (!Strings.isNullOrEmpty((String)location)) {
            dfso.copyToLocal(location, localLocation);
            props.setProperty(key, localLocation);
        }
    }

    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
        try (AdminClient adminClient = AdminClient.create((Properties)this.getHopsworksKafkaProperties());){
            CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
            return createTopicsResult;
        }
    }

    public DeleteTopicsResult deleteTopics(Collection<String> topics) {
        try (AdminClient adminClient = AdminClient.create((Properties)this.getHopsworksKafkaProperties());){
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);
            return deleteTopicsResult;
        }
    }

    public ListTopicsResult listTopics() {
        try (AdminClient adminClient = AdminClient.create((Properties)this.getHopsworksKafkaProperties());){
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            return listTopicsResult;
        }
    }

    public DescribeTopicsResult describeTopics(FeatureStoreKafkaConnectorDTO connector, Collection<String> topics) {
        try (AdminClient adminClient = AdminClient.create((Properties)this.getProjectKafkaProperties(connector));){
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
            return describeTopicsResult;
        }
    }

    public Set<String> getBrokerEndpoints() {
        HashSet<String> kafkaBrokers = new HashSet<String>();
        try (AdminClient adminClient = AdminClient.create((Properties)this.getHopsworksKafkaProperties());){
            Collection clusterDetails = (Collection)adminClient.describeCluster().nodes().get(5L, TimeUnit.SECONDS);
            for (Node node : clusterDetails) {
                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
                DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(configResource));
                Map configMap = (Map)describeConfigsResult.all().get();
                Config config = (Config)configMap.get(configResource);
                String advertisedListeners = config.get("advertised.listeners").value();
                kafkaBrokers.addAll(Arrays.asList(advertisedListeners.split(",")));
            }
        }
        catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", e);
        }
        return kafkaBrokers;
    }
}

