/*
 * Decompiled with CFR 0.152.
 */
package io.hops;

import io.hops.exception.StorageException;
import io.hops.exception.StorageInitializtionException;
import io.hops.metadata.yarn.dal.ContainerIdToCleanDataAccess;
import io.hops.metadata.yarn.dal.ContainerStatusDataAccess;
import io.hops.metadata.yarn.dal.NextHeartbeatDataAccess;
import io.hops.metadata.yarn.dal.PendingEventDataAccess;
import io.hops.metadata.yarn.dal.RMNodeApplicationsDataAccess;
import io.hops.metadata.yarn.dal.RMNodeDataAccess;
import io.hops.metadata.yarn.dal.ResourceDataAccess;
import io.hops.metadata.yarn.dal.UpdatedContainerInfoDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.ContainerId;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.NextHeartbeat;
import io.hops.metadata.yarn.entity.PendingEvent;
import io.hops.metadata.yarn.entity.RMNode;
import io.hops.metadata.yarn.entity.RMNodeApplication;
import io.hops.metadata.yarn.entity.Resource;
import io.hops.metadata.yarn.entity.UpdatedContainerInfo;
import io.hops.streaming.ContainerIdToCleanEvent;
import io.hops.streaming.ContainerStatusEvent;
import io.hops.streaming.DBEvent;
import io.hops.streaming.NextHeartBeatEvent;
import io.hops.streaming.PendingEventEvent;
import io.hops.streaming.RMNodeApplicationsEvent;
import io.hops.streaming.RMNodeEvent;
import io.hops.streaming.ResourceEvent;
import io.hops.streaming.UpdatedContainerInfoEvent;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.DBUtility;
import io.hops.util.RMStorageFactory;
import io.hops.util.YarnAPIStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestStreaming {
    private static Configuration conf;

    @BeforeClass
    public static void setUp() throws Exception {
        conf = new YarnConfiguration();
        conf.setBoolean("yarn.client.failover-distributed", true);
        RMStorageFactory.setConfiguration((Configuration)conf);
        YarnAPIStorageFactory.setConfiguration((Configuration)conf);
        DBUtility.InitializeDB();
    }

    @Test
    public void test() throws StorageInitializtionException, IOException, InterruptedException {
        RMStorageFactory.kickEventStreamingAPI((boolean)true, (Configuration)conf);
        LightWeightRequestHandler handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                RMNodeDataAccess rmnDA = (RMNodeDataAccess)RMStorageFactory.getDataAccess(RMNodeDataAccess.class);
                rmnDA.add((Object)new RMNode("nodeid", "node name", 42, 43, "tout vat bien", 1L, "tiptop", "version", 2));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        DBEvent event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof RMNodeEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                PendingEventDataAccess DA = (PendingEventDataAccess)RMStorageFactory.getDataAccess(PendingEventDataAccess.class);
                DA.add((Object)new PendingEvent("nodeId", PendingEvent.Type.NODE_ADDED, PendingEvent.Status.NEW, 0, 1));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof PendingEventEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                ResourceDataAccess DA = (ResourceDataAccess)RMStorageFactory.getDataAccess(ResourceDataAccess.class);
                DA.add((Object)new Resource("resource", 1, 2, 2, 3));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof ResourceEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                UpdatedContainerInfoDataAccess DA = (UpdatedContainerInfoDataAccess)RMStorageFactory.getDataAccess(UpdatedContainerInfoDataAccess.class);
                ArrayList<UpdatedContainerInfo> toAdd = new ArrayList<UpdatedContainerInfo>();
                toAdd.add(new UpdatedContainerInfo("rmnodeid", "containerid", 1, 2));
                DA.addAll(toAdd);
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof UpdatedContainerInfoEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                ContainerStatusDataAccess DA = (ContainerStatusDataAccess)RMStorageFactory.getDataAccess(ContainerStatusDataAccess.class);
                ArrayList<ContainerStatus> toAdd = new ArrayList<ContainerStatus>();
                toAdd.add(new ContainerStatus("containerid", "state", "diagnostics", 0, "rmnodeid", 1, 2));
                DA.addAll(toAdd);
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof ContainerStatusEvent));
        RMStorageFactory.stopEventStreamingAPI();
        RMStorageFactory.kickEventStreamingAPI((boolean)false, (Configuration)conf);
        Thread.sleep(1000L);
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                ContainerIdToCleanDataAccess DA = (ContainerIdToCleanDataAccess)RMStorageFactory.getDataAccess(ContainerIdToCleanDataAccess.class);
                DA.add((Object)new ContainerId("rmnodeId", "containerId"));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof ContainerIdToCleanEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                NextHeartbeatDataAccess DA = (NextHeartbeatDataAccess)RMStorageFactory.getDataAccess(NextHeartbeatDataAccess.class);
                DA.update(new NextHeartbeat("nodeId", true));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof NextHeartBeatEvent));
        handler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.writeLock();
                RMNodeApplicationsDataAccess DA = (RMNodeApplicationsDataAccess)RMStorageFactory.getDataAccess(RMNodeApplicationsDataAccess.class);
                DA.add((Object)new RMNodeApplication("rmnodeId", "applicationId", RMNodeApplication.RMNodeApplicationStatus.FINISHED));
                connector.commit();
                return null;
            }
        };
        handler.handle();
        Thread.sleep(1000L);
        Assert.assertEquals((long)DBEvent.receivedEvents.size(), (long)1L);
        event = (DBEvent)DBEvent.receivedEvents.take();
        Assert.assertTrue((boolean)(event instanceof RMNodeApplicationsEvent));
    }
}

