package org.apache.hadoop.hive.llap.registry.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase;
import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hive.com.google.common.collect.Sets;
import org.apache.hive.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.hive.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.hive.org.apache.curator.utils.CloseableUtils;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.URIUtil;

/* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.class */
public class LlapZookeeperRegistryImpl extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
    private static final Logger LOG;
    private static final String IPC_SERVICES = "services";
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";
    private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
    private static final String NAMESPACE_PREFIX = "llap-";
    private static final String SLOT_PREFIX = "slot-";
    private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
    private SlotZnode slotZnode;
    private DynamicServiceInstanceSet instances;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$DynamicServiceInstance.class */
    public class DynamicServiceInstance extends ServiceInstanceBase implements LlapServiceInstance {
        private final int mngPort;
        private final int shufflePort;
        private final int outputFormatPort;
        private final String serviceAddress;
        private final Resource resource;

        public DynamicServiceInstance(ServiceRecord serviceRecord) throws IOException {
            super(serviceRecord, LlapZookeeperRegistryImpl.IPC_LLAP);
            Endpoint internalEndpoint = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_SHUFFLE);
            Endpoint internalEndpoint2 = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_MNG);
            Endpoint internalEndpoint3 = serviceRecord.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_OUTPUTFORMAT);
            Endpoint externalEndpoint = serviceRecord.getExternalEndpoint(LlapZookeeperRegistryImpl.IPC_SERVICES);
            this.mngPort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map) internalEndpoint2.addresses.get(0), "port"));
            this.shufflePort = Integer.parseInt(RegistryTypeUtils.getAddressField((Map) internalEndpoint.addresses.get(0), "port"));
            this.outputFormatPort = Integer.valueOf(RegistryTypeUtils.getAddressField((Map) internalEndpoint3.addresses.get(0), "port")).intValue();
            this.serviceAddress = RegistryTypeUtils.getAddressField((Map) externalEndpoint.addresses.get(0), "uri");
            String str = serviceRecord.get(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, "");
            String str2 = serviceRecord.get(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, "");
            try {
                this.resource = Resource.newInstance(Integer.parseInt(str), Integer.parseInt(str2));
            } catch (NumberFormatException e) {
                throw new IOException("Invalid resource configuration for a LLAP node: memory " + str + ", vcores " + str2);
            }
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstance
        public int getShufflePort() {
            return this.shufflePort;
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstance
        public String getServicesAddress() {
            return this.serviceAddress;
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstance
        public Resource getResource() {
            return this.resource;
        }

        @Override // org.apache.hadoop.hive.registry.impl.ServiceInstanceBase
        public String toString() {
            return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() + " with resources=" + getResource() + ", shufflePort=" + getShufflePort() + ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]";
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstance
        public int getManagementPort() {
            return this.mngPort;
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstance
        public int getOutputFormatPort() {
            return this.outputFormatPort;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl$DynamicServiceInstanceSet.class */
    public static class DynamicServiceInstanceSet implements LlapServiceInstanceSet {
        private final PathChildrenCache instancesCache;
        private final LlapZookeeperRegistryImpl parent;
        private final RegistryUtils.ServiceRecordMarshal encoder;

        public DynamicServiceInstanceSet(PathChildrenCache pathChildrenCache, LlapZookeeperRegistryImpl llapZookeeperRegistryImpl, RegistryUtils.ServiceRecordMarshal serviceRecordMarshal) {
            this.instancesCache = pathChildrenCache;
            this.parent = llapZookeeperRegistryImpl;
            this.encoder = serviceRecordMarshal;
            llapZookeeperRegistryImpl.populateCache(this.instancesCache, false);
        }

        @Override // org.apache.hadoop.hive.registry.ServiceInstanceSet
        public Collection<LlapServiceInstance> getAll() {
            return this.parent.getAllInternal();
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet
        public Collection<LlapServiceInstance> getAllInstancesOrdered(boolean z) {
            return this.parent.getAllInstancesOrdered(z, this.instancesCache);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hadoop.hive.registry.ServiceInstanceSet
        public LlapServiceInstance getInstance(String str) {
            for (LlapServiceInstance llapServiceInstance : getAll()) {
                if (llapServiceInstance.getWorkerIdentity().equals(str)) {
                    return llapServiceInstance;
                }
            }
            return null;
        }

        @Override // org.apache.hadoop.hive.registry.ServiceInstanceSet
        public Set<LlapServiceInstance> getByHost(String str) {
            return this.parent.getByHostInternal(str);
        }

        @Override // org.apache.hadoop.hive.registry.ServiceInstanceSet
        public int size() {
            return this.parent.sizeInternal();
        }

        @Override // org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet
        public ApplicationId getApplicationId() {
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                byte[] workerData = LlapZookeeperRegistryImpl.getWorkerData(childData, "worker-");
                if (workerData != null) {
                    try {
                        String str = ((ServiceRecord) this.encoder.fromBytes(childData.getPath(), workerData)).get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
                        if (str != null && !str.isEmpty()) {
                            return ContainerId.fromString(str).getApplicationAttemptId().getApplicationId();
                        }
                    } catch (IOException e) {
                        LlapZookeeperRegistryImpl.LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", childData.getPath());
                    }
                }
            }
            return null;
        }
    }

    public LlapZookeeperRegistryImpl(String str, Configuration configuration) {
        super(str, configuration, HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, "user-", "worker-", "workers", LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL), HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE), HiveConf.ConfVars.LLAP_VALIDATE_ACLS);
        LOG.info("Llap Zookeeper Registry is enabled with registryid: " + str);
    }

    public Endpoint getRpcEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT)));
    }

    public Endpoint getShuffleEndpoint() {
        return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, "tcp", hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT));
    }

    public Endpoint getServicesEndpoint() {
        try {
            return RegistryTypeUtils.webEndpoint(IPC_SERVICES, new URI[]{new URL(HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_SSL) ? URIUtil.HTTPS : "http", hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT), "").toURI()});
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        } catch (URISyntaxException e2) {
            throw new RuntimeException("llap service URI for " + hostname + " is invalid", e2);
        }
    }

    public Endpoint getMngEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
    }

    public Endpoint getOutputFormatEndpoint() {
        return RegistryTypeUtils.ipcEndpoint(IPC_OUTPUTFORMAT, new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)));
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public String register() throws IOException {
        ServiceRecord serviceRecord = new ServiceRecord();
        Endpoint rpcEndpoint = getRpcEndpoint();
        serviceRecord.addInternalEndpoint(rpcEndpoint);
        serviceRecord.addInternalEndpoint(getMngEndpoint());
        serviceRecord.addInternalEndpoint(getShuffleEndpoint());
        serviceRecord.addExternalEndpoint(getServicesEndpoint());
        serviceRecord.addInternalEndpoint(getOutputFormatEndpoint());
        Iterator it2 = this.conf.iterator();
        while (it2.hasNext()) {
            Map.Entry entry = (Map.Entry) it2.next();
            if (((String) entry.getKey()).startsWith(HiveConf.PREFIX_LLAP) || ((String) entry.getKey()).startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
                serviceRecord.set((String) entry.getKey(), entry.getValue());
            }
        }
        String registerServiceRecord = registerServiceRecord(serviceRecord);
        try {
            this.slotZnode = new SlotZnode(this.zooKeeperClient, this.workersPath, SLOT_PREFIX, "worker-", registerServiceRecord);
            if (!this.slotZnode.start(120L, TimeUnit.SECONDS)) {
                throw new Exception("Max znode creation wait time: 120s exhausted");
            }
            LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}, webui: {}, mgmt: {}, znodePath: {}", rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), getRegistrationZnodePath());
            return registerServiceRecord;
        } catch (Exception e) {
            LOG.error("Unable to create a znode for this server instance", (Throwable) e);
            CloseableUtils.closeQuietly(this.slotZnode);
            super.stop();
            if (e instanceof IOException) {
                throw ((IOException) e);
            }
            throw new IOException(e);
        }
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public void unregister() throws IOException {
    }

    private static String extractWorkerIdFromSlot(ChildData childData) {
        return new String(childData.getData(), SlotZnode.CHARSET);
    }

    Collection<LlapServiceInstance> getAllInstancesOrdered(boolean z, PathChildrenCache pathChildrenCache) {
        HashMap hashMap = new HashMap();
        HashSet<LlapServiceInstance> newHashSet = Sets.newHashSet();
        for (ChildData childData : pathChildrenCache.getCurrentData()) {
            if (childData != null && childData.getData() != null) {
                String extractNodeName = extractNodeName(childData);
                if (extractNodeName.startsWith("worker-")) {
                    LlapServiceInstance instanceByPath = getInstanceByPath(childData.getPath());
                    if (instanceByPath != null) {
                        newHashSet.add(instanceByPath);
                    }
                } else if (extractNodeName.startsWith(SLOT_PREFIX)) {
                    hashMap.put(extractWorkerIdFromSlot(childData), Long.valueOf(Long.parseLong(extractNodeName.substring(SLOT_PREFIX.length()))));
                } else {
                    LOG.info("Ignoring unknown node {}", childData.getPath());
                }
            }
        }
        TreeMap treeMap = new TreeMap();
        long j = Long.MIN_VALUE;
        for (LlapServiceInstance llapServiceInstance : newHashSet) {
            Long l = (Long) hashMap.get(llapServiceInstance.getWorkerIdentity());
            if (l == null) {
                LOG.info("Unknown slot for {}", llapServiceInstance.getWorkerIdentity());
            } else {
                j = Math.max(j, l.longValue());
                treeMap.put(l, llapServiceInstance);
            }
        }
        if (z) {
            TreeMap treeMap2 = new TreeMap();
            long j2 = 0;
            Long l2 = null;
            for (Long l3 : treeMap.keySet()) {
                if (!$assertionsDisabled && l3.longValue() < j2) {
                    throw new AssertionError();
                }
                while (l3.longValue() > j2) {
                    if (l2 == null) {
                        l2 = Long.valueOf(System.nanoTime());
                    }
                    treeMap2.put(Long.valueOf(j2), new InactiveServiceInstance("inactive-" + j2 + "-" + l2));
                    j2++;
                }
                j2++;
            }
            treeMap.putAll(treeMap2);
        }
        return treeMap.values();
    }

    private static String extractNodeName(ChildData childData) {
        String path = childData.getPath();
        int lastIndexOf = path.lastIndexOf("/");
        if (lastIndexOf >= 0) {
            path = path.substring(lastIndexOf + 1);
        }
        return path;
    }

    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    /* renamed from: getInstances, reason: merged with bridge method [inline-methods] */
    public ServiceInstanceSet<LlapServiceInstance> getInstances2(String str, long j) throws IOException {
        PathChildrenCache ensureInstancesCache = ensureInstancesCache(j);
        if (this.instances == null) {
            this.instances = new DynamicServiceInstanceSet(ensureInstancesCache, this, this.encoder);
        }
        return this.instances;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet] */
    @Override // org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public ApplicationId getApplicationId() throws IOException {
        return getInstances2("LLAP", 0L).getApplicationId();
    }

    @Override // org.apache.hadoop.hive.registry.impl.ZkRegistryBase, org.apache.hadoop.hive.llap.registry.ServiceRegistry
    public void stop() {
        CloseableUtils.closeQuietly(this.slotZnode);
        super.stop();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.hadoop.hive.registry.impl.ZkRegistryBase
    public LlapServiceInstance createServiceInstance(ServiceRecord serviceRecord) throws IOException {
        return new DynamicServiceInstance(serviceRecord);
    }

    @Override // org.apache.hadoop.hive.registry.impl.ZkRegistryBase
    protected String getZkPathUser(Configuration configuration) {
        return HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
    }

    static {
        $assertionsDisabled = !LlapZookeeperRegistryImpl.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) LlapZookeeperRegistryImpl.class);
    }
}
