package io.hops.util;

import com.google.protobuf.InvalidProtocolBufferException;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.PendingEvent;
import io.hops.metadata.yarn.entity.PendingEventID;
import io.hops.metadata.yarn.entity.RMNode;
import io.hops.metadata.yarn.entity.RMNodeComps;
import io.hops.metadata.yarn.entity.Resource;
import io.hops.metadata.yarn.entity.UpdatedContainerInfo;
import io.hops.streaming.ContainerStatusEvent;
import io.hops.streaming.DBEvent;
import io.hops.streaming.PendingEventEvent;
import io.hops.streaming.RMNodeEvent;
import io.hops.streaming.ResourceEvent;
import io.hops.streaming.UpdatedContainerInfoEvent;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;

/* loaded from: input_file:io/hops/util/RmStreamingProcessor.class */
public class RmStreamingProcessor extends StreamingReceiver {
    private final ExecutorService exec;
    Map<PendingEventID, RMNodeComps> partialRMNodeComps;

    /* loaded from: input_file:io/hops/util/RmStreamingProcessor$RetrievingThread.class */
    private class RetrievingThread implements Runnable {
        private RetrievingThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            DBEvent take;
            RMNodeComps rMNodeComps;
            while (RmStreamingProcessor.this.running) {
                try {
                    take = DBEvent.receivedEvents.take();
                } catch (InterruptedException e) {
                    RmStreamingProcessor.this.LOG.error(e, e);
                }
                if (take instanceof PendingEventEvent) {
                    PendingEvent pendingEvent = ((PendingEventEvent) take).getPendingEvent();
                    rMNodeComps = RmStreamingProcessor.this.getRMNodeComps(pendingEvent.getId());
                    rMNodeComps.setPendingEvent(pendingEvent);
                } else if (take instanceof RMNodeEvent) {
                    RMNode rmNode = ((RMNodeEvent) take).getRmNode();
                    rMNodeComps = RmStreamingProcessor.this.getRMNodeComps(new PendingEventID(Integer.valueOf(rmNode.getPendingEventId()), rmNode.getNodeId()));
                    rMNodeComps.setRMNode(rmNode);
                } else if (take instanceof ResourceEvent) {
                    Resource resource = ((ResourceEvent) take).getResource();
                    rMNodeComps = RmStreamingProcessor.this.getRMNodeComps(new PendingEventID(Integer.valueOf(resource.getPendingEventId()), resource.getId()));
                    rMNodeComps.setResource(resource);
                } else if (take instanceof UpdatedContainerInfoEvent) {
                    UpdatedContainerInfo updatedContainerInfo = ((UpdatedContainerInfoEvent) take).getUpdatedContainerInfo();
                    rMNodeComps = RmStreamingProcessor.this.getRMNodeComps(new PendingEventID(Integer.valueOf(updatedContainerInfo.getPendingEventId()), updatedContainerInfo.getRmnodeid()));
                    rMNodeComps.addUpdatedContainerInfo(updatedContainerInfo);
                } else if (take instanceof ContainerStatusEvent) {
                    ContainerStatus containerStatus = ((ContainerStatusEvent) take).getContainerStatus();
                    rMNodeComps = RmStreamingProcessor.this.getRMNodeComps(new PendingEventID(Integer.valueOf(containerStatus.getPendingEventId()), containerStatus.getRMNodeId()));
                    rMNodeComps.addContainersStatus(containerStatus);
                } else {
                    RmStreamingProcessor.this.LOG.error("should not receive events of type " + take.getClass().getCanonicalName());
                }
                if (rMNodeComps.isComplet()) {
                    RmStreamingProcessor.this.partialRMNodeComps.remove(rMNodeComps.getPendingEvent().getId());
                    if (rMNodeComps != null && RmStreamingProcessor.this.rmContext.isDistributed()) {
                        try {
                            org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode processHopRMNodeCompsForScheduler = DBUtility.processHopRMNodeCompsForScheduler(rMNodeComps, RmStreamingProcessor.this.rmContext);
                            RmStreamingProcessor.this.LOG.debug("HOP :: RetrievingThread RMNode: " + processHopRMNodeCompsForScheduler);
                            if (processHopRMNodeCompsForScheduler != null) {
                                RmStreamingProcessor.this.updateRMContext(processHopRMNodeCompsForScheduler);
                                RmStreamingProcessor.this.triggerEvent(processHopRMNodeCompsForScheduler, rMNodeComps.getPendingEvent());
                            }
                            DBUtility.removePendingEvent(rMNodeComps.getPendingEvent().getId().getNodeId(), rMNodeComps.getPendingEvent().getType(), rMNodeComps.getPendingEvent().getStatus(), rMNodeComps.getPendingEvent().getId().getEventId().intValue(), rMNodeComps.getPendingEvent().getContains());
                        } catch (IOException e2) {
                            RmStreamingProcessor.this.LOG.error("HOP :: Error removing from DB: " + e2, e2);
                        } catch (InvalidProtocolBufferException e3) {
                            RmStreamingProcessor.this.LOG.error("HOP :: Error retrieving RMNode: " + e3, e3);
                        }
                    }
                }
            }
            RmStreamingProcessor.this.exec.shutdown();
            RmStreamingProcessor.this.LOG.info("HOP :: RM Event retriever interrupted");
        }
    }

    public RmStreamingProcessor(RMContext rMContext) {
        super(rMContext, "RM Event retriever");
        this.partialRMNodeComps = new HashMap();
        setRetrievingRunnable(new RetrievingThread());
        this.exec = Executors.newCachedThreadPool();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateRMContext(org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode rMNode) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("HOP :: PendingEventRetrieval rmNode " + rMNode + ", state: " + rMNode.getState());
        }
        if (rMNode.getState() == NodeState.DECOMMISSIONED || rMNode.getState() == NodeState.REBOOTED || rMNode.getState() == NodeState.LOST) {
            this.rmContext.getInactiveRMNodes().put(rMNode.getNodeID(), rMNode);
            this.rmContext.getRMNodes().remove(rMNode.getNodeID(), rMNode);
        } else {
            this.rmContext.getInactiveRMNodes().remove(rMNode.getNodeID().getHost(), rMNode);
            this.rmContext.getRMNodes().put(rMNode.getNodeID(), rMNode);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void triggerEvent(final org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode rMNode, PendingEvent pendingEvent) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("NodeUpdate event_pending event trigger event: " + pendingEvent.getId().getEventId() + " : " + pendingEvent.getId().getNodeId());
        }
        this.exec.submit(new Runnable() { // from class: io.hops.util.RmStreamingProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                NetUtils.normalizeHostName(rMNode.getHostName());
            }
        });
        if (pendingEvent.getType().equals(PendingEvent.Type.NODE_ADDED)) {
            this.LOG.debug("HOP :: PendingEventRetrieval event NodeAdded: " + pendingEvent);
            this.rmContext.getDispatcher().getEventHandler().handle(new NodeAddedSchedulerEvent(rMNode));
            return;
        }
        if (pendingEvent.getType().equals(PendingEvent.Type.NODE_REMOVED)) {
            this.LOG.debug("HOP :: PendingEventRetrieval event NodeRemoved: " + pendingEvent);
            this.rmContext.getDispatcher().getEventHandler().handle(new NodeRemovedSchedulerEvent(rMNode));
        } else if (pendingEvent.getType().equals(PendingEvent.Type.NODE_UPDATED)) {
            if (pendingEvent.getStatus().equals(PendingEvent.Status.SCHEDULER_FINISHED_PROCESSING)) {
                this.LOG.debug("HOP :: NodeUpdate event - event_scheduler - finished_processing RMNode: " + rMNode.getNodeID() + " pending event: " + pendingEvent.getId().getEventId());
                this.rmContext.getDispatcher().getEventHandler().handle(new NodeUpdateSchedulerEvent(rMNode));
            } else if (pendingEvent.getStatus().equals(PendingEvent.Status.SCHEDULER_NOT_FINISHED_PROCESSING)) {
                this.LOG.debug("NodeUpdate event - event_scheduler - NOT_finished_processing RMNode: " + rMNode.getNodeID() + " pending event: " + pendingEvent.getId().getEventId());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RMNodeComps getRMNodeComps(PendingEventID pendingEventID) {
        RMNodeComps rMNodeComps = this.partialRMNodeComps.get(pendingEventID);
        if (rMNodeComps == null) {
            rMNodeComps = new RMNodeComps();
            this.partialRMNodeComps.put(pendingEventID, rMNodeComps);
        }
        return rMNodeComps;
    }
}
