package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

@Singleton
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
/* loaded from: input_file:io/hops/hopsworks/common/kafka/KafkaBrokers.class */
public class KafkaBrokers {
    private static final Logger LOGGER = Logger.getLogger(KafkaBrokers.class.getName());
    private static final String KAFKA_BROKER_PROTOCOL = "INTERNAL";

    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    private final Set<String> kafkaBrokers = new HashSet();

    @PostConstruct
    private void init() {
        try {
            setKafkaBrokers(getBrokerEndpoints());
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }

    public void setKafkaBrokers(Set<String> set) {
        this.kafkaBrokers.clear();
        this.kafkaBrokers.addAll(set);
    }

    @Lock(LockType.READ)
    public Set<String> getKafkaBrokers() {
        return this.kafkaBrokers;
    }

    @Lock(LockType.READ)
    public String getKafkaBrokersString() {
        if (this.kafkaBrokers.isEmpty()) {
            return null;
        }
        return StringUtils.join(this.kafkaBrokers, ",");
    }

    @Lock(LockType.READ)
    public Optional<String> getAnyKafkaBroker() {
        return this.kafkaBrokers.stream().filter(charSequence -> {
            return StringUtils.isNoneEmpty(new CharSequence[]{charSequence});
        }).findAny();
    }

    @Lock(LockType.READ)
    public Set<String> getBrokerEndpoints() throws IOException, KeeperException, InterruptedException {
        try {
            ZooKeeper zooKeeper = new ZooKeeper(getZookeeperConnectionString(), Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, new Watcher() { // from class: io.hops.hopsworks.common.kafka.KafkaBrokers.1
                public void process(WatchedEvent watchedEvent) {
                }
            });
            try {
                Set<String> set = (Set) zooKeeper.getChildren("/brokers/ids", false).stream().map(str -> {
                    return getBrokerInfo(zooKeeper, str);
                }).filter(charSequence -> {
                    return StringUtils.isNoneEmpty(new CharSequence[]{charSequence});
                }).map(str2 -> {
                    return str2.split(KafkaConst.DLIMITER);
                }).flatMap((v0) -> {
                    return Arrays.stream(v0);
                }).filter(this::isValidBrokerInfo).collect(Collectors.toSet());
                zooKeeper.close();
                return set;
            } catch (Throwable th) {
                zooKeeper.close();
                throw th;
            }
        } catch (RuntimeException e) {
            if (e.getCause() instanceof KeeperException) {
                throw e.getCause();
            }
            if (e.getCause() instanceof InterruptedException) {
                throw ((InterruptedException) e.getCause());
            }
            throw e;
        } catch (ServiceDiscoveryException e2) {
            throw new IOException((Throwable) e2);
        }
    }

    public String getZookeeperConnectionString() throws ServiceDiscoveryException {
        return (String) this.serviceDiscoveryController.getService(Type.DNS, ServiceQuery.of(this.serviceDiscoveryController.constructServiceFQDN(ServiceDiscoveryController.HopsworksService.ZOOKEEPER_CLIENT), Collections.emptySet())).map(service -> {
            return service.getAddress() + KafkaConst.COLON_SEPARATOR + service.getPort();
        }).collect(Collectors.joining(","));
    }

    private String getBrokerInfo(ZooKeeper zooKeeper, String str) {
        try {
            return new String(zooKeeper.getData("/brokers/ids/" + str, false, (Stat) null));
        } catch (KeeperException | InterruptedException e) {
            LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", e);
            throw new RuntimeException(e);
        }
    }

    private boolean isValidBrokerInfo(String str) {
        return str.startsWith(KAFKA_BROKER_PROTOCOL) && str.contains(KafkaConst.SLASH_SEPARATOR);
    }
}
