/*
 * Decompiled with CFR 0.152.
 */
package io.hops.util;

import com.google.protobuf.InvalidProtocolBufferException;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.PendingEvent;
import io.hops.metadata.yarn.entity.PendingEventID;
import io.hops.metadata.yarn.entity.RMNodeComps;
import io.hops.metadata.yarn.entity.Resource;
import io.hops.metadata.yarn.entity.UpdatedContainerInfo;
import io.hops.streaming.ContainerStatusEvent;
import io.hops.streaming.DBEvent;
import io.hops.streaming.PendingEventEvent;
import io.hops.streaming.RMNodeEvent;
import io.hops.streaming.ResourceEvent;
import io.hops.streaming.UpdatedContainerInfoEvent;
import io.hops.util.DBUtility;
import io.hops.util.StreamingReceiver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;

/**
 * Streaming receiver that consumes {@link DBEvent}s from
 * {@code DBEvent.receivedEvents}, groups them by {@link PendingEventID} into
 * {@link RMNodeComps} and, once a group reports itself complete and the
 * {@code RMContext} is distributed, rebuilds the corresponding {@link RMNode},
 * updates the active/inactive node maps of the context, forwards the matching
 * scheduler event and finally removes the pending event through
 * {@link DBUtility}.
 *
 * <p>Within this class {@link #partialRMNodeComps} is a plain {@link HashMap}
 * that is only touched from the retrieving runnable.
 */
public class RmStreamingProcessor extends StreamingReceiver {
    /** Pool used for the fire-and-forget host-name normalization in {@link #triggerEvent}. */
    private final ExecutorService exec;

    /** Partially assembled components, keyed by the pending event they belong to. */
    Map<PendingEventID, RMNodeComps> partialRMNodeComps = new HashMap<PendingEventID, RMNodeComps>();

    /**
     * Creates the processor and registers its retrieving runnable with the
     * parent receiver.
     *
     * @param rmContext resource manager context handed to the parent receiver
     */
    public RmStreamingProcessor(RMContext rmContext) {
        super(rmContext, "RM Event retriever");
        this.setRetrievingRunnable(new RetrievingThread());
        this.exec = Executors.newCachedThreadPool();
    }

    /**
     * Moves the node between the active and inactive node maps of the context
     * according to its state: DECOMMISSIONED, REBOOTED and LOST nodes are
     * stored as inactive, every other state is stored as active.
     *
     * @param rmNode node whose registration in the context is refreshed
     */
    private void updateRMContext(RMNode rmNode) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("HOP :: PendingEventRetrieval rmNode " + rmNode + ", state: " + rmNode.getState());
        }
        NodeState state = rmNode.getState();
        boolean inactive = state == NodeState.DECOMMISSIONED
                || state == NodeState.REBOOTED
                || state == NodeState.LOST;
        if (inactive) {
            rmContext.getInactiveRMNodes().put(rmNode.getNodeID(), rmNode);
            rmContext.getRMNodes().remove(rmNode.getNodeID(), rmNode);
        } else {
            // Inactive nodes are inserted under their NodeId (see the branch
            // above), so the removal has to use the same key. Removing by host
            // name never matched and left reactivated nodes in the inactive map.
            rmContext.getInactiveRMNodes().remove(rmNode.getNodeID(), rmNode);
            rmContext.getRMNodes().put(rmNode.getNodeID(), rmNode);
        }
    }

    /**
     * Forwards the scheduler event that corresponds to the pending event.
     *
     * <p>NODE_ADDED and NODE_REMOVED are always forwarded. NODE_UPDATED is
     * forwarded only when its status is SCHEDULER_FINISHED_PROCESSING; with
     * SCHEDULER_NOT_FINISHED_PROCESSING it is only logged. Any other
     * type/status combination is ignored.
     *
     * @param rmNode       node the event refers to
     * @param pendingEvent pending event describing what happened to the node
     */
    private void triggerEvent(final RMNode rmNode, PendingEvent pendingEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("NodeUpdate event_pending event trigger event: " + pendingEvent.getId().getEventId() + " : " + pendingEvent.getId().getNodeId());
        }
        // The result is discarded; presumably this only warms up host-name
        // resolution without blocking the retrieval thread -- TODO confirm.
        exec.submit(new Runnable() {
            @Override
            public void run() {
                NetUtils.normalizeHostName(rmNode.getHostName());
            }
        });
        if (pendingEvent.getType().equals(PendingEvent.Type.NODE_ADDED)) {
            LOG.debug("HOP :: PendingEventRetrieval event NodeAdded: " + pendingEvent);
            rmContext.getDispatcher().getEventHandler().handle(new NodeAddedSchedulerEvent(rmNode));
        } else if (pendingEvent.getType().equals(PendingEvent.Type.NODE_REMOVED)) {
            LOG.debug("HOP :: PendingEventRetrieval event NodeRemoved: " + pendingEvent);
            rmContext.getDispatcher().getEventHandler().handle(new NodeRemovedSchedulerEvent(rmNode));
        } else if (pendingEvent.getType().equals(PendingEvent.Type.NODE_UPDATED)) {
            if (pendingEvent.getStatus().equals(PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING)) {
                LOG.debug("HOP :: NodeUpdate event - event_scheduler - finished_processing RMNode: " + rmNode.getNodeID() + " pending event: " + pendingEvent.getId().getEventId());
                rmContext.getDispatcher().getEventHandler().handle(new NodeUpdateSchedulerEvent(rmNode));
            } else if (pendingEvent.getStatus().equals(PendingEvent.Status.SCHEDULER_NOT_FINISHED_PROCESSING)) {
                LOG.debug("NodeUpdate event - event_scheduler - NOT_finished_processing RMNode: " + rmNode.getNodeID() + " pending event: " + pendingEvent.getId().getEventId());
            }
        }
    }

    /**
     * Returns the component holder registered for the given pending event,
     * creating and registering an empty one on first use.
     *
     * @param id identifier of the pending event
     * @return the (possibly new) holder for that event, never {@code null}
     */
    private RMNodeComps getRMNodeComps(PendingEventID id) {
        RMNodeComps comps = partialRMNodeComps.get(id);
        if (comps == null) {
            comps = new RMNodeComps();
            partialRMNodeComps.put(id, comps);
        }
        return comps;
    }

    /**
     * Runnable that drains {@code DBEvent.receivedEvents} while the receiver is
     * running and hands every completed group of components to the scheduler.
     */
    private class RetrievingThread implements Runnable {

        /**
         * Takes events until {@code running} becomes false. An interruption is
         * logged and the loop condition is re-evaluated; the interrupt status
         * is restored once the runnable is about to return.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                while (running) {
                    try {
                        RMNodeComps comps = collect(DBEvent.receivedEvents.take());
                        if (comps == null || !comps.isComplet()) {
                            continue;
                        }
                        // A complete group is always dropped from the partial
                        // map, even when it is not dispatched afterwards.
                        partialRMNodeComps.remove(comps.getPendingEvent().getId());
                        if (rmContext.isDistributed()) {
                            dispatch(comps);
                        }
                    } catch (InterruptedException ex) {
                        LOG.error(ex, ex);
                        interrupted = true;
                    }
                }
            } finally {
                // Release the helper pool even if the loop dies on an
                // unchecked exception.
                exec.shutdown();
            }
            LOG.info("HOP :: RM Event retriever interrupted");
            if (interrupted) {
                // Do not swallow the interruption: make it visible to whoever
                // owns the thread this runnable was executing on.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Stores the payload of the event in the holder of the pending event
         * it belongs to.
         *
         * @param event event taken from the queue
         * @return the holder that received the payload, or {@code null} when
         *         the event type is not handled (an error is logged)
         */
        private RMNodeComps collect(DBEvent event) {
            RMNodeComps comps;
            if (event instanceof PendingEventEvent) {
                PendingEvent pendingEvent = ((PendingEventEvent) event).getPendingEvent();
                comps = getRMNodeComps(pendingEvent.getId());
                comps.setPendingEvent(pendingEvent);
            } else if (event instanceof RMNodeEvent) {
                RMNodeEvent rmNodeEvent = (RMNodeEvent) event;
                comps = getRMNodeComps(new PendingEventID(
                        Integer.valueOf(rmNodeEvent.getRmNode().getPendingEventId()),
                        rmNodeEvent.getRmNode().getNodeId()));
                comps.setRMNode(rmNodeEvent.getRmNode());
            } else if (event instanceof ResourceEvent) {
                Resource resource = ((ResourceEvent) event).getResource();
                comps = getRMNodeComps(new PendingEventID(
                        Integer.valueOf(resource.getPendingEventId()), resource.getId()));
                comps.setResource(resource);
            } else if (event instanceof UpdatedContainerInfoEvent) {
                UpdatedContainerInfo uci = ((UpdatedContainerInfoEvent) event).getUpdatedContainerInfo();
                comps = getRMNodeComps(new PendingEventID(
                        Integer.valueOf(uci.getPendingEventId()), uci.getRmnodeid()));
                comps.addUpdatedContainerInfo(uci);
            } else if (event instanceof ContainerStatusEvent) {
                ContainerStatus containerStatus = ((ContainerStatusEvent) event).getContainerStatus();
                comps = getRMNodeComps(new PendingEventID(
                        Integer.valueOf(containerStatus.getPendingEventId()), containerStatus.getRMNodeId()));
                comps.addContainersStatus(containerStatus);
            } else {
                LOG.error("should not receive events of type " + event.getClass().getCanonicalName());
                return null;
            }
            return comps;
        }

        /**
         * Rebuilds the node from a complete group, publishes it to the context
         * and the scheduler when one was produced, and removes the pending
         * event afterwards. Failures are logged and not propagated.
         *
         * @param comps complete group of components for one pending event
         */
        private void dispatch(RMNodeComps comps) {
            try {
                RMNode rmNode = DBUtility.processHopRMNodeCompsForScheduler(comps, rmContext);
                LOG.debug("HOP :: RetrievingThread RMNode: " + rmNode);
                PendingEvent pendingEvent = comps.getPendingEvent();
                if (rmNode != null) {
                    updateRMContext(rmNode);
                    triggerEvent(rmNode, pendingEvent);
                }
                // The pending event is removed even when no node was produced.
                DBUtility.removePendingEvent(
                        pendingEvent.getId().getNodeId(),
                        pendingEvent.getType(),
                        pendingEvent.getStatus(),
                        pendingEvent.getId().getEventId(),
                        pendingEvent.getContains());
            } catch (InvalidProtocolBufferException ex) {
                LOG.error("HOP :: Error retrieving RMNode: " + ex, ex);
            } catch (IOException ex) {
                LOG.error("HOP :: Error removing from DB: " + ex, ex);
            }
        }
    }
}

