/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.ServiceTags;
import io.hops.hopsworks.servicediscovery.tags.ZooKeeperTags;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

@Singleton
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class KafkaBrokers {
    private static final Logger LOGGER = Logger.getLogger(KafkaBrokers.class.getName());
    public static final String KAFKA_BROKER_PROTOCOL_INTERNAL = "INTERNAL";
    public static final String KAFKA_BROKER_PROTOCOL_EXTERNAL = "EXTERNAL";
    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    private final Set<String> internalKafkaBrokers = new HashSet<String>();

    @PostConstruct
    private void init() {
        try {
            this.setInternalKafkaBrokers(this.getBrokerEndpoints(KAFKA_BROKER_PROTOCOL_INTERNAL));
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    public void setInternalKafkaBrokers(Set<String> internalKafkaBrokers) {
        this.internalKafkaBrokers.clear();
        this.internalKafkaBrokers.addAll(internalKafkaBrokers);
    }

    @Lock(value=LockType.READ)
    public String getKafkaBrokersString() {
        if (!this.internalKafkaBrokers.isEmpty()) {
            return StringUtils.join(this.internalKafkaBrokers, (String)",");
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Lock(value=LockType.READ)
    public Set<String> getBrokerEndpoints(String protocol) throws IOException, KeeperException, InterruptedException {
        Set<String> set;
        String zkConnectionString = this.getZookeeperConnectionString();
        ZooKeeper zk = new ZooKeeper(zkConnectionString, 30000, watchedEvent -> {});
        try {
            set = zk.getChildren("/brokers/ids", false).stream().map(bi -> this.getBrokerInfo(zk, (String)bi)).filter(xva$0 -> StringUtils.isNoneEmpty((CharSequence[])new CharSequence[]{xva$0})).map(bi -> bi.split("[\"]")).flatMap(Arrays::stream).filter(bi -> this.isValidBrokerInfo((String)bi, protocol)).collect(Collectors.toSet());
        }
        catch (Throwable throwable) {
            try {
                zk.close();
                throw throwable;
            }
            catch (ServiceDiscoveryException ex) {
                throw new IOException(ex);
            }
            catch (RuntimeException ex) {
                if (ex.getCause() instanceof KeeperException) {
                    throw (KeeperException)ex.getCause();
                }
                if (ex.getCause() instanceof InterruptedException) {
                    throw (InterruptedException)ex.getCause();
                }
                throw ex;
            }
        }
        zk.close();
        return set;
    }

    public String getZookeeperConnectionString() throws ServiceDiscoveryException {
        return this.serviceDiscoveryController.getService(Type.DNS, ServiceQuery.of((String)this.serviceDiscoveryController.constructServiceFQDN(HopsworksService.ZOOKEEPER.getNameWithTag((ServiceTags)ZooKeeperTags.client)), Collections.emptySet())).map(zkServer -> zkServer.getAddress() + ":" + zkServer.getPort()).collect(Collectors.joining(","));
    }

    private String getBrokerInfo(ZooKeeper zk, String brokerId) {
        try {
            return new String(zk.getData("/brokers/ids/" + brokerId, false, null));
        }
        catch (InterruptedException | KeeperException ex) {
            LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", ex);
            throw new RuntimeException(ex);
        }
    }

    private boolean isValidBrokerInfo(String brokerInfo, String protocol) {
        return brokerInfo.startsWith(protocol) && brokerInfo.contains("//");
    }
}

