/*
 * Decompiled with CFR 0.152.
 */
package io.hops;

import io.hops.metadata.yarn.dal.ContainerStatusDataAccess;
import io.hops.metadata.yarn.dal.NextHeartbeatDataAccess;
import io.hops.metadata.yarn.dal.PendingEventDataAccess;
import io.hops.metadata.yarn.dal.RMNodeDataAccess;
import io.hops.metadata.yarn.dal.ResourceDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.NextHeartbeat;
import io.hops.metadata.yarn.entity.PendingEvent;
import io.hops.metadata.yarn.entity.RMNode;
import io.hops.metadata.yarn.entity.Resource;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.DBUtility;
import io.hops.util.RMStorageFactory;
import io.hops.util.YarnAPIStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImplDist;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests that write node-related rows (RMNode, PendingEvent, Resource,
 * NextHeartbeat, ContainerStatus) to storage and then check that the running
 * {@link MockRM} reflects them in its RM context and scheduler.
 *
 * <p>Every test in this class is currently annotated with {@link Ignore}. The
 * tests rely on fixed sleeps to give the ResourceManager time to pick up the
 * persisted rows, so they are timing-sensitive.
 */
public class TestStreamingLibrary {
    private static final Log LOG = LogFactory.getLog(TestStreamingLibrary.class);

    /** Megabytes per gigabyte; used to express node memory sizes. */
    private static final int GB = 1024;

    private static Configuration conf;
    private static MockRM rm;

    /** Next node id handed out by {@link #getId()}; starts at 0 for each test instance. */
    private int id = 0;

    /**
     * Returns the current node id and advances the counter.
     *
     * @return the id to use for the next generated node
     */
    private int getId() {
        return this.id++;
    }

    /**
     * Builds the shared configuration, enables
     * {@code yarn.client.failover-distributed}, and passes the configuration to
     * both storage factories.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        conf = new YarnConfiguration();
        conf.setBoolean("yarn.client.failover-distributed", true);
        RMStorageFactory.setConfiguration(conf);
        YarnAPIStorageFactory.setConfiguration(conf);
    }

    /**
     * Re-initializes the database and starts a fresh {@link MockRM} before each
     * test, then waits ten seconds before the test body runs.
     */
    @Before
    public void initTests() throws Exception {
        DBUtility.InitializeDB();
        rm = new MockRM(conf);
        rm.start();
        TimeUnit.SECONDS.sleep(10L);
    }

    /**
     * Waits two seconds and then stops the ResourceManager.
     */
    @After
    public void tearDown() throws Exception {
        TimeUnit.SECONDS.sleep(2L);
        // JUnit runs @After even when @Before failed; guard so that a failed
        // MockRM construction is not masked by a NullPointerException here.
        if (rm != null) {
            rm.stop();
        }
    }

    /**
     * Registers two mock NodeManagers and sends two rounds of heartbeats.
     * The test makes no assertions; it only exercises the calls.
     */
    @Test
    @Ignore
    public void testRMReceiveEvents() throws Exception {
        LOG.debug("Register NM1");
        MockNM nm1 = rm.registerNode("host0:1234", 4 * GB, 4, 4);
        LOG.debug("Register NM2");
        MockNM nm2 = rm.registerNode("host1:1234", 6 * GB, 6, 6);
        LOG.debug("Heartbeat NM1");
        nm1.nodeHeartbeat(true);
        LOG.debug("Heartbeat NM2");
        nm2.nodeHeartbeat(true);
        TimeUnit.SECONDS.sleep(2L);
        nm1.nodeHeartbeat(true);
        nm2.nodeHeartbeat(true);
        TimeUnit.SECONDS.sleep(4L);
    }

    /**
     * Persists a single NODE_ADDED node and expects the RM context and the
     * scheduler to report exactly that node and its capacity.
     */
    @Ignore
    @Test
    public void testAddNode() throws Exception {
        FullRMNode toCommit = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(toCommit).handle();
        LOG.debug("Persisted RMNode in DB");
        TimeUnit.SECONDS.sleep(1L);

        NodeId nodeId = toCommit.getYarnRMNode().getNodeID();
        Assert.assertTrue("Node " + nodeId, rm.getRMContext().getRMNodes().containsKey(nodeId));
        Assert.assertNotNull(rm.getResourceScheduler().getNodeReport(nodeId));
        assertClusterMatchesSingleNode(toCommit);
    }

    /**
     * Persists two NODE_ADDED nodes, then persists a NODE_REMOVED event for the
     * first one in state DECOMMISSIONED. Expects the first node to move to the
     * inactive nodes and the cluster capacity to equal the remaining node's.
     */
    @Ignore
    @Test
    public void testRemoveNode() throws Exception {
        FullRMNode addedNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(addedNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        FullRMNode dummyNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(dummyNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        NodeId addedNodeId = addedNode.getYarnRMNode().getNodeID();
        Assert.assertTrue("Node " + addedNodeId,
            rm.getRMContext().getRMNodes().containsKey(addedNodeId));

        // Reuse the first node's id so the removal targets the same host.
        FullRMNode decNode = generateHopRMNode(addedNode.getId(), PendingEvent.Type.NODE_REMOVED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.DECOMMISSIONED);
        new RMNodeWriter(decNode).handle();
        TimeUnit.SECONDS.sleep(3L);

        NodeId decNodeId = decNode.getYarnRMNode().getNodeID();
        Assert.assertNotNull(rm.getRMContext().getInactiveRMNodes().get(decNodeId.getHost()));
        Assert.assertFalse(rm.getRMContext().getRMNodes().containsKey(decNodeId));
        Assert.assertNull(rm.getResourceScheduler().getNodeReport(decNodeId));
        assertClusterMatchesSingleNode(dummyNode);
    }

    /**
     * Persists two NODE_ADDED nodes and checks that the first one is present in
     * the RM context.
     *
     * <p>NOTE(review): despite its name, this test performs no update step —
     * confirm whether it is unfinished.
     */
    @Test
    @Ignore
    public void testUpdateNode() throws Exception {
        FullRMNode addedNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(addedNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        FullRMNode dummyNode = generateHopRMNode(getId(), PendingEvent.Type.NODE_ADDED,
            PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING, NodeState.NEW);
        new RMNodeWriter(dummyNode).handle();
        TimeUnit.SECONDS.sleep(1L);

        NodeId addedNodeId = addedNode.getYarnRMNode().getNodeID();
        Assert.assertTrue("Node " + addedNodeId,
            rm.getRMContext().getRMNodes().containsKey(addedNodeId));
    }

    /**
     * Asserts that the scheduler reports exactly one node and that the cluster
     * memory, vcores and GPUs equal the total capability of {@code expected}.
     *
     * @param expected the only node that should contribute capacity
     */
    private void assertClusterMatchesSingleNode(FullRMNode expected) {
        org.apache.hadoop.yarn.api.records.Resource cluster =
            rm.getResourceScheduler().getClusterResource();
        org.apache.hadoop.yarn.api.records.Resource capability =
            expected.getYarnRMNode().getTotalCapability();
        int numOfNodes = rm.getResourceScheduler().getNumClusterNodes();

        Assert.assertEquals(capability.getMemory(), cluster.getMemory());
        Assert.assertEquals(capability.getVirtualCores(), cluster.getVirtualCores());
        Assert.assertEquals(capability.getGPUs(), cluster.getGPUs());
        Assert.assertEquals(1L, numOfNodes);
    }

    /**
     * Builds the full set of objects describing one node: the YARN
     * {@link RMNodeImplDist}, plus the storage entities derived from it.
     *
     * <p>The node is named {@code host<id>} on port 1234 with a capability of
     * 6 GB memory, 6 vcores and 6 GPUs. One container status is generated.
     *
     * @param id     numeric id used for the host name and passed to the entities
     * @param type   pending event type to record
     * @param status pending event status to record
     * @param state  node state; applied to the node unless it is {@code NEW}
     * @return the bundle of generated objects, ready for {@link RMNodeWriter}
     */
    private FullRMNode generateHopRMNode(int id, PendingEvent.Type type,
        PendingEvent.Status status, NodeState state) {
        String host = "host" + id;
        RMNodeImplDist rmNode = new RMNodeImplDist(NodeId.newInstance(host, 1234),
            rm.getRMContext(), host, 1337, 8080, new NodeBase("name", "/location"),
            org.apache.hadoop.yarn.api.records.Resource.newInstance(6 * GB, 6, 6), "1.0");
        if (state != NodeState.NEW) {
            rmNode.setState(state.name());
        }
        String nodeIdStr = rmNode.getNodeID().toString();

        RMNode hopRMNode = new RMNode(nodeIdStr, rmNode.getHostName(), rmNode.getCommandPort(),
            rmNode.getHttpPort(), rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
            rmNode.getState().name(), rmNode.getNodeManagerVersion(), id);
        Resource hopResource = new Resource(nodeIdStr, rmNode.getTotalCapability().getMemory(),
            rmNode.getTotalCapability().getVirtualCores(), rmNode.getTotalCapability().getGPUs(), id);
        NextHeartbeat nextHB = new NextHeartbeat(nodeIdStr, true);

        // The event is created with contains=2 and then set to 4 once the
        // container statuses exist. Presumably "contains" counts the rows that
        // accompany the event -- TODO confirm against PendingEvent.
        PendingEvent pendingEvent = new PendingEvent(nodeIdStr, type, status, id, 2);
        List<ContainerStatus> containerStatuses =
            generateContainerStatuses(1, nodeIdStr, pendingEvent.getId().getEventId());
        pendingEvent.setContains(4);

        return new FullRMNode(rmNode, hopRMNode, pendingEvent, hopResource, nextHB, id,
            containerStatuses);
    }

    /**
     * Creates {@code numOfContainers} container statuses for a node. Each one
     * is named {@code <rmNodeId>_<index>} with state "RUNNING" and diagnostics
     * "HEALTHY".
     *
     * @param numOfContainers how many statuses to create
     * @param rmNodeId        node id string the statuses belong to
     * @param pendingEventId  event id, passed for both trailing constructor arguments
     * @return a new mutable list of the generated statuses
     */
    private List<ContainerStatus> generateContainerStatuses(int numOfContainers, String rmNodeId,
        int pendingEventId) {
        List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
        for (int i = 0; i < numOfContainers; ++i) {
            containerStatuses.add(new ContainerStatus(rmNodeId + "_" + i, "RUNNING", "HEALTHY", 1,
                rmNodeId, pendingEventId, pendingEventId));
        }
        return containerStatuses;
    }

    /**
     * Request handler that persists every entity of a {@link FullRMNode} in a
     * single transaction under a write lock.
     */
    private static class RMNodeWriter
    extends LightWeightRequestHandler {
        private final FullRMNode toCommit;

        /**
         * @param toCommit the node bundle whose entities will be written
         */
        public RMNodeWriter(FullRMNode toCommit) {
            super(YARNOperationType.TEST);
            this.toCommit = toCommit;
        }

        /**
         * Writes the RMNode, PendingEvent, Resource, NextHeartbeat and
         * ContainerStatus entities and commits the transaction.
         *
         * @return always {@code null}
         * @throws IOException if a storage operation fails
         */
        public Object performTask() throws IOException {
            connector.beginTransaction();
            connector.writeLock();

            RMNodeDataAccess rmNodeDAO =
                (RMNodeDataAccess) RMStorageFactory.getDataAccess(RMNodeDataAccess.class);
            rmNodeDAO.add(this.toCommit.getRmNode());
            // Flushed before the remaining rows are added; presumably so the
            // RMNode row exists before rows that refer to it -- TODO confirm.
            connector.flush();

            PendingEventDataAccess pendingEventDAO =
                (PendingEventDataAccess) RMStorageFactory.getDataAccess(PendingEventDataAccess.class);
            pendingEventDAO.add(this.toCommit.getPendingEvent());

            ResourceDataAccess resourceDAO =
                (ResourceDataAccess) RMStorageFactory.getDataAccess(ResourceDataAccess.class);
            resourceDAO.add(this.toCommit.getResource());

            NextHeartbeatDataAccess nextHBDAO =
                (NextHeartbeatDataAccess) RMStorageFactory.getDataAccess(NextHeartbeatDataAccess.class);
            nextHBDAO.update(this.toCommit.getNextHeartbeat());

            ContainerStatusDataAccess contStatDAO =
                (ContainerStatusDataAccess) RMStorageFactory.getDataAccess(ContainerStatusDataAccess.class);
            contStatDAO.addAll(this.toCommit.getContainerStatuses());

            connector.commit();
            return null;
        }
    }

    /**
     * Immutable holder bundling a YARN RMNode with the storage entities
     * generated for it.
     */
    private static class FullRMNode {
        private final int id;
        private final org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode yarnRMNode;
        private final RMNode rmNode;
        private final PendingEvent pendingEvent;
        private final Resource resource;
        private final NextHeartbeat nextHeartbeat;
        private final List<ContainerStatus> containerStatuses;

        public FullRMNode(org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode yarnRMNode,
            RMNode rmNode, PendingEvent pendingEvent, Resource resource,
            NextHeartbeat nextHeartbeat, int id, List<ContainerStatus> containerStatuses) {
            this.yarnRMNode = yarnRMNode;
            this.rmNode = rmNode;
            this.pendingEvent = pendingEvent;
            this.resource = resource;
            this.nextHeartbeat = nextHeartbeat;
            this.id = id;
            this.containerStatuses = containerStatuses;
        }

        /** @return the container statuses generated for this node */
        public List<ContainerStatus> getContainerStatuses() {
            return this.containerStatuses;
        }

        /** @return the numeric id the node was generated with */
        public int getId() {
            return this.id;
        }

        /** @return the YARN-side node object */
        public org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode getYarnRMNode() {
            return this.yarnRMNode;
        }

        /** @return the storage entity for the node */
        public RMNode getRmNode() {
            return this.rmNode;
        }

        /** @return the pending event describing the node change */
        public PendingEvent getPendingEvent() {
            return this.pendingEvent;
        }

        /** @return the storage entity for the node's resources */
        public Resource getResource() {
            return this.resource;
        }

        /** @return the next-heartbeat entity for the node */
        public NextHeartbeat getNextHeartbeat() {
            return this.nextHeartbeat;
        }
    }
}

