package io.hops.hopsworks.common.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.resolvers.Type;
import com.logicalclocks.servicediscoverclient.service.ServiceQuery;
import io.hops.hopsworks.common.dao.kafka.KafkaConst;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.ZooKeeperTags;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

@Singleton
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
/* loaded from: input_file:io/hops/hopsworks/common/kafka/KafkaBrokers.class */
public class KafkaBrokers {
    private static final Logger LOGGER = Logger.getLogger(KafkaBrokers.class.getName());
    public static final String KAFKA_BROKER_PROTOCOL_INTERNAL = "INTERNAL";
    public static final String KAFKA_BROKER_PROTOCOL_EXTERNAL = "EXTERNAL";

    @EJB
    private ServiceDiscoveryController serviceDiscoveryController;
    private final Set<String> internalKafkaBrokers = new HashSet();

    @PostConstruct
    private void init() {
        try {
            setInternalKafkaBrokers(getBrokerEndpoints(KAFKA_BROKER_PROTOCOL_INTERNAL));
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }

    public void setInternalKafkaBrokers(Set<String> set) {
        this.internalKafkaBrokers.clear();
        this.internalKafkaBrokers.addAll(set);
    }

    @Lock(LockType.READ)
    public String getKafkaBrokersString() {
        if (this.internalKafkaBrokers.isEmpty()) {
            return null;
        }
        return StringUtils.join(this.internalKafkaBrokers, ",");
    }

    @Lock(LockType.READ)
    public Set<String> getBrokerEndpoints(String str) throws IOException, KeeperException, InterruptedException {
        try {
            ZooKeeper zooKeeper = new ZooKeeper(getZookeeperConnectionString(), Settings.ZOOKEEPER_SESSION_TIMEOUT_MS, watchedEvent -> {
            });
            try {
                Set<String> set = (Set) zooKeeper.getChildren("/brokers/ids", false).stream().map(str2 -> {
                    return getBrokerInfo(zooKeeper, str2);
                }).filter(charSequence -> {
                    return StringUtils.isNoneEmpty(new CharSequence[]{charSequence});
                }).map(str3 -> {
                    return str3.split(KafkaConst.DLIMITER);
                }).flatMap((v0) -> {
                    return Arrays.stream(v0);
                }).filter(str4 -> {
                    return isValidBrokerInfo(str4, str);
                }).collect(Collectors.toSet());
                zooKeeper.close();
                return set;
            } catch (Throwable th) {
                zooKeeper.close();
                throw th;
            }
        } catch (RuntimeException e) {
            if (e.getCause() instanceof KeeperException) {
                throw e.getCause();
            }
            if (e.getCause() instanceof InterruptedException) {
                throw ((InterruptedException) e.getCause());
            }
            throw e;
        } catch (ServiceDiscoveryException e2) {
            throw new IOException((Throwable) e2);
        }
    }

    public String getZookeeperConnectionString() throws ServiceDiscoveryException {
        return (String) this.serviceDiscoveryController.getService(Type.DNS, ServiceQuery.of(this.serviceDiscoveryController.constructServiceFQDN(HopsworksService.ZOOKEEPER.getNameWithTag(ZooKeeperTags.client)), Collections.emptySet())).map(service -> {
            return service.getAddress() + KafkaConst.COLON_SEPARATOR + service.getPort();
        }).collect(Collectors.joining(","));
    }

    private String getBrokerInfo(ZooKeeper zooKeeper, String str) {
        try {
            return new String(zooKeeper.getData("/brokers/ids/" + str, false, (Stat) null));
        } catch (KeeperException | InterruptedException e) {
            LOGGER.log(Level.SEVERE, "Could not get Kafka broker information", e);
            throw new RuntimeException(e);
        }
    }

    private boolean isValidBrokerInfo(String str, String str2) {
        return str.startsWith(str2) && str.contains(KafkaConst.SLASH_SEPARATOR);
    }
}
