package org.apache.tez.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/common/AsyncDispatcherConcurrent.class */
public class AsyncDispatcherConcurrent extends CompositeService implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AsyncDispatcher.class);
    private final String name;
    private final ArrayList<LinkedBlockingQueue<Event>> eventQueues;
    private volatile boolean stopped;
    private volatile boolean drainEventsOnStop;
    private volatile boolean drained;
    private Object waitForDrained;
    private volatile boolean blockNewEvents;
    private EventHandler handlerInstance;
    private ExecutorService execService;
    private final int numThreads;
    protected final Map<Class<? extends Enum>, EventHandler> eventHandlers;
    protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers;
    private boolean exitOnDispatchException;

    /* loaded from: input_file:org/apache/tez/common/AsyncDispatcherConcurrent$DispatchRunner.class */
    class DispatchRunner implements Runnable {
        final LinkedBlockingQueue<Event> queue;

        public DispatchRunner(LinkedBlockingQueue<Event> linkedBlockingQueue) {
            this.queue = linkedBlockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!AsyncDispatcherConcurrent.this.stopped && !Thread.currentThread().isInterrupted()) {
                AsyncDispatcherConcurrent.this.drained = this.queue.isEmpty();
                if (AsyncDispatcherConcurrent.this.blockNewEvents) {
                    synchronized (AsyncDispatcherConcurrent.this.waitForDrained) {
                        if (AsyncDispatcherConcurrent.this.drained) {
                            AsyncDispatcherConcurrent.this.waitForDrained.notify();
                        }
                    }
                }
                try {
                    Event take = this.queue.take();
                    if (take != null) {
                        AsyncDispatcherConcurrent.this.dispatch(take);
                    }
                } catch (InterruptedException e) {
                    if (AsyncDispatcherConcurrent.this.stopped) {
                        return;
                    }
                    AsyncDispatcherConcurrent.LOG.warn("AsyncDispatcher thread interrupted", (Throwable) e);
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/common/AsyncDispatcherConcurrent$GenericEventHandler.class */
    class GenericEventHandler implements EventHandler<TezAbstractEvent> {
        GenericEventHandler() {
        }

        /* JADX WARN: Type inference failed for: r0v10, types: [java.lang.Enum] */
        @Override // org.apache.hadoop.yarn.event.EventHandler
        public void handle(TezAbstractEvent tezAbstractEvent) {
            if (AsyncDispatcherConcurrent.this.stopped || AsyncDispatcherConcurrent.this.blockNewEvents) {
                return;
            }
            AsyncDispatcherConcurrent.this.drained = false;
            AsyncDispatcherConcurrent asyncDispatcherConcurrent = AsyncDispatcherConcurrent.this.eventDispatchers.get(tezAbstractEvent.getType().getDeclaringClass());
            if (asyncDispatcherConcurrent != null) {
                asyncDispatcherConcurrent.getEventHandler().handle(tezAbstractEvent);
                return;
            }
            LinkedBlockingQueue linkedBlockingQueue = (LinkedBlockingQueue) AsyncDispatcherConcurrent.this.eventQueues.get(AsyncDispatcherConcurrent.this.numThreads > 1 ? tezAbstractEvent.getSerializingHash() % AsyncDispatcherConcurrent.this.numThreads : 0);
            int size = linkedBlockingQueue.size();
            if (size != 0 && size % 1000 == 0) {
                AsyncDispatcherConcurrent.LOG.info("Size of event-queue is " + size);
            }
            int remainingCapacity = linkedBlockingQueue.remainingCapacity();
            if (remainingCapacity < 1000) {
                AsyncDispatcherConcurrent.LOG.warn("Very low remaining capacity in the event-queue: " + remainingCapacity);
            }
            try {
                linkedBlockingQueue.put(tezAbstractEvent);
            } catch (InterruptedException e) {
                if (!AsyncDispatcherConcurrent.this.stopped) {
                    AsyncDispatcherConcurrent.LOG.warn("AsyncDispatcher thread interrupted", (Throwable) e);
                }
                throw new YarnRuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/common/AsyncDispatcherConcurrent$MultiListenerHandler.class */
    public static class MultiListenerHandler implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers = new ArrayList();

        @Override // org.apache.hadoop.yarn.event.EventHandler
        public void handle(Event event) {
            Iterator<EventHandler<Event>> it = this.listofHandlers.iterator();
            while (it.hasNext()) {
                it.next().handle(event);
            }
        }

        void addHandler(EventHandler<Event> eventHandler) {
            this.listofHandlers.add(eventHandler);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsyncDispatcherConcurrent(String str, int i) {
        super(str);
        this.stopped = false;
        this.drainEventsOnStop = false;
        this.drained = true;
        this.waitForDrained = new Object();
        this.blockNewEvents = false;
        this.handlerInstance = new GenericEventHandler();
        this.eventHandlers = Maps.newHashMap();
        this.eventDispatchers = Maps.newHashMap();
        this.exitOnDispatchException = false;
        Preconditions.checkArgument(i > 0);
        this.name = str;
        this.eventQueues = Lists.newArrayListWithCapacity(i);
        this.numThreads = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.CompositeService, org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.CompositeService, org.apache.hadoop.service.AbstractService
    public void serviceStart() throws Exception {
        this.execService = Executors.newFixedThreadPool(this.numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Dispatcher {" + this.name + "} #%d").build());
        for (int i = 0; i < this.numThreads; i++) {
            this.eventQueues.add(new LinkedBlockingQueue<>());
        }
        for (int i2 = 0; i2 < this.numThreads; i2++) {
            this.execService.execute(new DispatchRunner(this.eventQueues.get(i2)));
        }
        super.serviceStart();
    }

    public void setDrainEventsOnStop() {
        this.drainEventsOnStop = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.CompositeService, org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        if (this.execService != null) {
            if (this.drainEventsOnStop) {
                this.blockNewEvents = true;
                LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
                synchronized (this.waitForDrained) {
                    while (!this.drained && !this.execService.isShutdown()) {
                        LOG.info("Waiting for AsyncDispatcher to drain.");
                        this.waitForDrained.wait(1000L);
                    }
                }
            }
            this.stopped = true;
            for (int i = 0; i < this.numThreads; i++) {
                LOG.info("AsyncDispatcher stopping with events: " + this.eventQueues.get(i).size() + " in queue: " + i);
            }
            this.execService.shutdownNow();
        }
        super.serviceStop();
    }

    protected void dispatch(Event event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString());
        }
        Class declaringClass = event.getType().getDeclaringClass();
        try {
            EventHandler eventHandler = this.eventHandlers.get(declaringClass);
            if (eventHandler == null) {
                throw new Exception("No handler for registered for " + declaringClass);
            }
            eventHandler.handle(event);
        } catch (Throwable th) {
            LOG.error("Error in dispatcher thread", th);
            if (!this.exitOnDispatchException || ShutdownHookManager.get().isShutdownInProgress() || this.stopped) {
                return;
            }
            Thread thread = new Thread(createShutDownThread());
            thread.setName("AsyncDispatcher ShutDown handler");
            thread.start();
        }
    }

    private void checkForExistingHandler(Class<? extends Enum> cls) {
        Preconditions.checkState(this.eventHandlers.get(cls) == null, "Cannot register same event on multiple dispatchers");
    }

    private void checkForExistingDispatcher(Class<? extends Enum> cls) {
        Preconditions.checkState(this.eventDispatchers.get(cls) == null, "Multiple dispatchers cannot be registered for: " + cls.getName());
    }

    private void checkForExistingDispatchers(boolean z, Class<? extends Enum> cls) {
        if (z) {
            checkForExistingHandler(cls);
        }
        checkForExistingDispatcher(cls);
    }

    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public void register(Class<? extends Enum> cls, EventHandler eventHandler) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        EventHandler<Event> eventHandler2 = this.eventHandlers.get(cls);
        checkForExistingDispatchers(false, cls);
        LOG.info("Registering " + cls + " for " + eventHandler.getClass());
        if (eventHandler2 == null) {
            this.eventHandlers.put(cls, eventHandler);
            return;
        }
        if (eventHandler2 instanceof MultiListenerHandler) {
            ((MultiListenerHandler) eventHandler2).addHandler(eventHandler);
            return;
        }
        MultiListenerHandler multiListenerHandler = new MultiListenerHandler();
        multiListenerHandler.addHandler(eventHandler2);
        multiListenerHandler.addHandler(eventHandler);
        this.eventHandlers.put(cls, multiListenerHandler);
    }

    public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> cls, EventHandler eventHandler, String str, int i) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        checkForExistingDispatchers(true, cls);
        LOG.info("Registering " + cls + " for independent dispatch using: " + eventHandler.getClass());
        AsyncDispatcherConcurrent asyncDispatcherConcurrent = new AsyncDispatcherConcurrent(str, i);
        asyncDispatcherConcurrent.register(cls, eventHandler);
        this.eventDispatchers.put(cls, asyncDispatcherConcurrent);
        addIfService(asyncDispatcherConcurrent);
        return asyncDispatcherConcurrent;
    }

    public void registerWithExistingDispatcher(Class<? extends Enum> cls, EventHandler eventHandler, AsyncDispatcherConcurrent asyncDispatcherConcurrent) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        checkForExistingDispatchers(true, cls);
        LOG.info("Registering " + cls + " wit existing concurrent dispatch using: " + eventHandler.getClass());
        asyncDispatcherConcurrent.register(cls, eventHandler);
        this.eventDispatchers.put(cls, asyncDispatcherConcurrent);
    }

    @VisibleForTesting
    public void enableExitOnDispatchException() {
        this.exitOnDispatchException = true;
    }

    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public EventHandler getEventHandler() {
        return this.handlerInstance;
    }

    Runnable createShutDownThread() {
        return new Runnable() { // from class: org.apache.tez.common.AsyncDispatcherConcurrent.1
            @Override // java.lang.Runnable
            public void run() {
                AsyncDispatcherConcurrent.LOG.info("Exiting, bbye..");
                System.exit(-1);
            }
        };
    }
}
