package io.hops;

import io.hops.metadata.yarn.dal.ContainerStatusDataAccess;
import io.hops.metadata.yarn.dal.NextHeartbeatDataAccess;
import io.hops.metadata.yarn.dal.PendingEventDataAccess;
import io.hops.metadata.yarn.dal.RMNodeDataAccess;
import io.hops.metadata.yarn.dal.ResourceDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.NextHeartbeat;
import io.hops.metadata.yarn.entity.PendingEvent;
import io.hops.metadata.yarn.entity.Resource;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.util.DBUtility;
import io.hops.util.RMStorageFactory;
import io.hops.util.YarnAPIStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImplDist;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests that write RMNode-related rows to the database through the data
 * access layer and then check that the running {@link MockRM} reflects them
 * (its RMContext node maps and its scheduler's cluster totals).
 *
 * <p>Every test is currently marked {@code @Ignore}. The tests rely on fixed
 * sleeps between writing to the database and asserting on the RM state;
 * presumably this gives the event-streaming path time to deliver the change
 * to the RM -- TODO confirm.
 */
public class TestStreamingLibrary {
    private static final Log LOG = LogFactory.getLog(TestStreamingLibrary.class);
    private static Configuration conf;
    private static MockRM rm;
    /** Number of memory units in one "GB" as used by the resource sizes below. */
    private final int GB = 1024;
    /** Next value handed out by {@link #getId()}; starts at 0 for each test instance. */
    private int id = 0;

    /**
     * Immutable bundle of a YARN {@link RMNode} together with the database
     * entities that are persisted for it by {@link RMNodeWriter}.
     */
    public class FullRMNode {
        private final int id;
        private final RMNode yarnRMNode;
        private final io.hops.metadata.yarn.entity.RMNode rmNode;
        private final PendingEvent pendingEvent;
        private final Resource resource;
        private final NextHeartbeat nextHeartbeat;
        private final List<ContainerStatus> containerStatuses;

        /**
         * @param yarnRMNode        the in-memory YARN node
         * @param rmNode            the database entity describing the node
         * @param pendingEvent      the pending event row for the node
         * @param resource          the resource row for the node
         * @param nextHeartbeat     the next-heartbeat row for the node
         * @param id                the id this bundle was generated with
         * @param containerStatuses the container status rows for the node
         */
        public FullRMNode(RMNode yarnRMNode, io.hops.metadata.yarn.entity.RMNode rmNode, PendingEvent pendingEvent, Resource resource, NextHeartbeat nextHeartbeat, int id, List<ContainerStatus> containerStatuses) {
            this.yarnRMNode = yarnRMNode;
            this.rmNode = rmNode;
            this.pendingEvent = pendingEvent;
            this.resource = resource;
            this.nextHeartbeat = nextHeartbeat;
            this.id = id;
            this.containerStatuses = containerStatuses;
        }

        /** @return the container status rows (the same list instance that was passed in) */
        public List<ContainerStatus> getContainerStatuses() {
            return this.containerStatuses;
        }

        /** @return the id this bundle was generated with */
        public int getId() {
            return this.id;
        }

        /** @return the in-memory YARN node */
        public RMNode getYarnRMNode() {
            return this.yarnRMNode;
        }

        /** @return the database entity describing the node */
        public io.hops.metadata.yarn.entity.RMNode getRmNode() {
            return this.rmNode;
        }

        /** @return the pending event row */
        public PendingEvent getPendingEvent() {
            return this.pendingEvent;
        }

        /** @return the resource row */
        public Resource getResource() {
            return this.resource;
        }

        /** @return the next-heartbeat row */
        public NextHeartbeat getNextHeartbeat() {
            return this.nextHeartbeat;
        }
    }

    /**
     * Request handler that persists every entity of a {@link FullRMNode} in a
     * single transaction.
     */
    private class RMNodeWriter extends LightWeightRequestHandler {
        private final FullRMNode toCommit;

        /**
         * @param toCommit the bundle whose entities are written by {@link #performTask()}
         */
        public RMNodeWriter(FullRMNode toCommit) {
            super(YARNOperationType.TEST);
            this.toCommit = toCommit;
        }

        /**
         * Writes the RMNode, pending event, resource, next heartbeat and
         * container statuses of the bundle, then commits.
         *
         * @return always {@code null}
         * @throws IOException if any storage operation fails
         */
        @Override
        public Object performTask() throws IOException {
            connector.beginTransaction();
            connector.writeLock();

            RMNodeDataAccess rmNodeDA = (RMNodeDataAccess) RMStorageFactory.getDataAccess(RMNodeDataAccess.class);
            rmNodeDA.add(this.toCommit.getRmNode());
            // Flushed before the remaining rows are added; presumably the RMNode
            // row has to reach the database first -- TODO confirm.
            connector.flush();

            PendingEventDataAccess pendingEventDA = (PendingEventDataAccess) RMStorageFactory.getDataAccess(PendingEventDataAccess.class);
            pendingEventDA.add(this.toCommit.getPendingEvent());

            ResourceDataAccess resourceDA = (ResourceDataAccess) RMStorageFactory.getDataAccess(ResourceDataAccess.class);
            resourceDA.add(this.toCommit.getResource());

            NextHeartbeatDataAccess nextHeartbeatDA = (NextHeartbeatDataAccess) RMStorageFactory.getDataAccess(NextHeartbeatDataAccess.class);
            nextHeartbeatDA.update(this.toCommit.getNextHeartbeat());

            ContainerStatusDataAccess containerStatusDA = (ContainerStatusDataAccess) RMStorageFactory.getDataAccess(ContainerStatusDataAccess.class);
            containerStatusDA.addAll(this.toCommit.getContainerStatuses());

            connector.commit();
            return null;
        }
    }

    /**
     * Returns the current id and advances the counter by one.
     *
     * @return the id value before the increment
     */
    private int getId() {
        return this.id++;
    }

    /**
     * Builds the shared configuration once and hands it to the storage factories.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        conf = new YarnConfiguration();
        conf.setBoolean("yarn.client.failover-distributed", true);
        RMStorageFactory.setConfiguration(conf);
        YarnAPIStorageFactory.setConfiguration(conf);
    }

    /**
     * Re-initializes the database and starts a fresh {@link MockRM} before each test.
     */
    @Before
    public void initTests() throws Exception {
        DBUtility.InitializeDB();
        rm = new MockRM(conf);
        rm.start();
        // Fixed wait after starting the RM; presumably lets it finish
        // initializing before the test body runs -- TODO confirm.
        TimeUnit.SECONDS.sleep(10L);
    }

    /**
     * Waits briefly and then stops the RM started for the test.
     */
    @After
    public void tearDown() throws Exception {
        TimeUnit.SECONDS.sleep(2L);
        // rm is still null if initTests() failed before creating the first RM;
        // guard so the original failure is not masked by a NullPointerException.
        if (rm != null) {
            rm.stop();
        }
    }

    /**
     * Registers two node managers with the RM and sends two rounds of
     * heartbeats from each. Contains no assertions.
     */
    @Test
    @Ignore
    public void testRMReceiveEvents() throws Exception {
        LOG.debug("Register NM1");
        MockNM nm1 = rm.registerNode("host0:1234", 4 * GB, 4, 4);
        LOG.debug("Register NM2");
        MockNM nm2 = rm.registerNode("host1:1234", 6 * GB, 6, 6);

        LOG.debug("Heartbeat NM1");
        nm1.nodeHeartbeat(true);
        LOG.debug("Heartbeat NM2");
        nm2.nodeHeartbeat(true);

        TimeUnit.SECONDS.sleep(2L);
        nm1.nodeHeartbeat(true);
        nm2.nodeHeartbeat(true);
        TimeUnit.SECONDS.sleep(4L);
    }

    /**
     * Persists one NODE_ADDED node and checks that the RM knows it and that
     * the scheduler's cluster totals equal that node's capability.
     */
    @Test
    @Ignore
    public void testAddNode() throws Exception {
        FullRMNode added = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(added).handle();
        LOG.debug("Persisted RMNode in DB");
        TimeUnit.SECONDS.sleep(1L);

        Assert.assertTrue("Node " + added.getYarnRMNode().getNodeID(), rm.getRMContext().getRMNodes().containsKey(added.getYarnRMNode().getNodeID()));
        Assert.assertNotNull(rm.getResourceScheduler().getNodeReport(added.getYarnRMNode().getNodeID()));

        int clusterMemory = rm.getResourceScheduler().getClusterResource().getMemory();
        int clusterVCores = rm.getResourceScheduler().getClusterResource().getVirtualCores();
        int clusterGPUs = rm.getResourceScheduler().getClusterResource().getGPUs();
        int clusterNodes = rm.getResourceScheduler().getNumClusterNodes();

        Assert.assertEquals(added.getYarnRMNode().getTotalCapability().getMemory(), clusterMemory);
        Assert.assertEquals(added.getYarnRMNode().getTotalCapability().getVirtualCores(), clusterVCores);
        Assert.assertEquals(added.getYarnRMNode().getTotalCapability().getGPUs(), clusterGPUs);
        Assert.assertEquals(1L, clusterNodes);
    }

    /**
     * Persists two NODE_ADDED nodes, then a NODE_REMOVED event for the first
     * one, and checks that the first node became inactive while the cluster
     * totals equal the capability of the node that is left.
     */
    @Test
    @Ignore
    public void testRemoveNode() throws Exception {
        FullRMNode firstNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(firstNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        // Keep a handle on the second node: it is the only one expected to be
        // left in the cluster after the removal below.
        FullRMNode remainingNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(remainingNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        Assert.assertTrue("Node " + firstNode.getYarnRMNode().getNodeID(), rm.getRMContext().getRMNodes().containsKey(firstNode.getYarnRMNode().getNodeID()));

        // Re-generate the first node (same id) as a decommissioned node with a
        // NODE_REMOVED event and persist it.
        FullRMNode removedNode = generateHopRMNode(firstNode.getId(), PendingEvent.Type.NODE_REMOVED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.DECOMMISSIONED);
        new RMNodeWriter(removedNode).handle();
        TimeUnit.SECONDS.sleep(3L);

        Assert.assertNotNull(rm.getRMContext().getInactiveRMNodes().get(removedNode.getYarnRMNode().getNodeID().getHost()));
        Assert.assertFalse(rm.getRMContext().getRMNodes().containsKey(removedNode.getYarnRMNode().getNodeID()));

        int clusterMemory = rm.getResourceScheduler().getClusterResource().getMemory();
        int clusterVCores = rm.getResourceScheduler().getClusterResource().getVirtualCores();
        int clusterGPUs = rm.getResourceScheduler().getClusterResource().getGPUs();
        int clusterNodes = rm.getResourceScheduler().getNumClusterNodes();

        Assert.assertNull(rm.getResourceScheduler().getNodeReport(removedNode.getYarnRMNode().getNodeID()));
        Assert.assertEquals(remainingNode.getYarnRMNode().getTotalCapability().getMemory(), clusterMemory);
        Assert.assertEquals(remainingNode.getYarnRMNode().getTotalCapability().getVirtualCores(), clusterVCores);
        Assert.assertEquals(remainingNode.getYarnRMNode().getTotalCapability().getGPUs(), clusterGPUs);
        Assert.assertEquals(1L, clusterNodes);
    }

    /**
     * Persists two NODE_ADDED nodes and checks that the RM knows the first one.
     * NOTE(review): despite its name this test performs no update -- confirm
     * whether the update scenario still needs to be written.
     */
    @Test
    @Ignore
    public void testUpdateNode() throws Exception {
        FullRMNode firstNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(firstNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        FullRMNode secondNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED, PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(secondNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        Assert.assertTrue("Node " + firstNode.getYarnRMNode().getNodeID(), rm.getRMContext().getRMNodes().containsKey(firstNode.getYarnRMNode().getNodeID()));
    }

    /**
     * Builds a node named {@code "host" + nodeIndex} (port 1234) with a
     * capability of 6 GB / 6 vcores / 6 GPUs, plus the database entities that
     * describe it.
     *
     * @param nodeIndex index used in the host name; also passed to the RMNode,
     *                  Resource and PendingEvent entities and stored as the bundle id
     * @param type      type of the pending event to create
     * @param status    status of the pending event to create
     * @param nodeState state to set on the node; left untouched when it is {@code NEW}
     * @return the node together with all of its entities
     */
    private FullRMNode generateHopRMNode(int nodeIndex, PendingEvent.Type type, PendingEvent.Status status, NodeState nodeState) {
        String hostName = "host" + nodeIndex;
        RMNodeImplDist yarnNode = new RMNodeImplDist(NodeId.newInstance(hostName, 1234), rm.getRMContext(), hostName, 1337, 8080, new NodeBase("name", "/location"), org.apache.hadoop.yarn.api.records.Resource.newInstance(6 * GB, 6, 6), "1.0");
        if (!NodeState.NEW.equals(nodeState)) {
            yarnNode.setState(nodeState.name());
        }
        String nodeIdString = yarnNode.getNodeID().toString();

        io.hops.metadata.yarn.entity.RMNode rmNodeEntity = new io.hops.metadata.yarn.entity.RMNode(nodeIdString, yarnNode.getHostName(), yarnNode.getCommandPort(), yarnNode.getHttpPort(), yarnNode.getHealthReport(), yarnNode.getLastHealthReportTime(), yarnNode.getState().name(), yarnNode.getNodeManagerVersion(), nodeIndex);
        Resource resourceEntity = new Resource(nodeIdString, yarnNode.getTotalCapability().getMemory(), yarnNode.getTotalCapability().getVirtualCores(), yarnNode.getTotalCapability().getGPUs(), nodeIndex);
        NextHeartbeat nextHeartbeat = new NextHeartbeat(nodeIdString, true);

        // The pending event is created with a "contains" value of 2 and bumped
        // to 4 once the container statuses exist; presumably this counts the
        // rows that belong to the event -- TODO confirm.
        int contains = 2;
        PendingEvent pendingEvent = new PendingEvent(nodeIdString, type, status, nodeIndex, contains);
        List<ContainerStatus> containerStatuses = generateContainerStatuses(1, nodeIdString, pendingEvent.getId().getEventId().intValue());
        pendingEvent.setContains(contains + 2);

        return new FullRMNode(yarnNode, rmNodeEntity, pendingEvent, resourceEntity, nextHeartbeat, nodeIndex, containerStatuses);
    }

    /**
     * Creates {@code count} container statuses for a node, each in state
     * "RUNNING" with "HEALTHY" as the following string argument.
     *
     * @param count          number of statuses to create; none when not positive
     * @param nodeId         node id string, also used as the container id prefix
     * @param pendingEventId value passed for both trailing integer arguments of
     *                       the {@link ContainerStatus} constructor
     * @return a new mutable list with the created statuses
     */
    private List<ContainerStatus> generateContainerStatuses(int count, String nodeId, int pendingEventId) {
        List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
        for (int index = 0; index < count; index++) {
            statuses.add(new ContainerStatus(nodeId + "_" + index, "RUNNING", "HEALTHY", 1, nodeId, pendingEventId, pendingEventId));
        }
        return statuses;
    }
}
