/*
 * Decompiled with CFR 0.152.
 */
package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.ServiceTags;
import io.hops.hopsworks.servicediscovery.tags.ZooKeeperTags;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

@Singleton
@TransactionAttribute(value=TransactionAttributeType.NOT_SUPPORTED)
public class KafkaBrokers {
    private static final Logger LOGGER = Logger.getLogger(KafkaBrokers.class.getName());
    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    private final Set<String> kafkaBrokers = new HashSet<String>();

    @PostConstruct
    @Lock(value=LockType.WRITE)
    public void setBrokerEndpoints() {
        try {
            this.kafkaBrokers.clear();
            this.kafkaBrokers.addAll(this.getBrokerEndpoints());
        }
        catch (IOException | InterruptedException | KeeperException ex) {
            LOGGER.log(Level.SEVERE, null, ex);
        }
    }

    @Lock(value=LockType.READ)
    public List<String> getBrokerEndpoints(BrokerProtocol protocol) {
        return this.kafkaBrokers.stream().filter(bi -> bi.startsWith(protocol.toString())).map(bi -> bi.replaceAll((Object)((Object)protocol) + "://", "")).collect(Collectors.toList());
    }

    @Lock(value=LockType.READ)
    public String getBrokerEndpointsString(BrokerProtocol protocol) {
        List<String> kafkaProtocolBrokers = this.getBrokerEndpoints(protocol);
        if (!kafkaProtocolBrokers.isEmpty()) {
            return StringUtils.join(kafkaProtocolBrokers, (String)",");
        }
        return null;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Set<String> getBrokerEndpoints() throws IOException, KeeperException, InterruptedException {
        try {
            String zkConnectionString = this.getZookeeperConnectionString();
            try (ZooKeeper zk = new ZooKeeper(zkConnectionString, 30000, watchedEvent -> {});){
                Set<String> set = zk.getChildren("/brokers/ids", false).stream().map(bi -> this.getBrokerInfo(zk, (String)bi)).filter(xva$0 -> StringUtils.isNoneEmpty((CharSequence[])new CharSequence[]{xva$0})).map(bi -> bi.split("[\"]")).flatMap(Arrays::stream).filter(this::isValidBrokerInfo).collect(Collectors.toSet());
                return set;
            }
        }
        catch (ServiceDiscoveryException ex) {
            throw new IOException(ex);
        }
        catch (RuntimeException ex) {
            if (ex.getCause() instanceof KeeperException) {
                throw (KeeperException)ex.getCause();
            }
            if (!(ex.getCause() instanceof InterruptedException)) throw ex;
            throw (InterruptedException)ex.getCause();
        }
    }

    private String getZookeeperConnectionString() throws ServiceDiscoveryException {
        return this.serviceDiscoveryController.getService(Type.DNS, ServiceQuery.of((String)this.serviceDiscoveryController.constructServiceFQDN(HopsworksService.ZOOKEEPER.getNameWithTag((ServiceTags)ZooKeeperTags.client)), Collections.emptySet())).map(zkServer -> zkServer.getAddress() + ":" + zkServer.getPort()).collect(Collectors.joining(","));
    }

    private String getBrokerInfo(ZooKeeper zk, String brokerId) {
        try {
            return new String(zk.getData("/brokers/ids/" + brokerId, false, null));
        }
        catch (InterruptedException | KeeperException ex) {
            LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", ex);
            throw new RuntimeException(ex);
        }
    }

    private boolean isValidBrokerInfo(String brokerInfo) {
        CharSequence[] brokerProtocolNames = (String[])Arrays.stream(BrokerProtocol.values()).map(Enum::name).toArray(String[]::new);
        return StringUtils.startsWithAny((CharSequence)brokerInfo, (CharSequence[])brokerProtocolNames) && brokerInfo.contains("//");
    }

    public static enum BrokerProtocol {
        INTERNAL,
        EXTERNAL;

    }
}

