/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.metrics.EventTypeMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AsyncDispatcher
extends AbstractService
implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);
    private static final Marker FATAL = MarkerFactory.getMarker((String)"FATAL");
    private final BlockingQueue<Event> eventQueue;
    private volatile int lastEventQueueSizeLogged = 0;
    private volatile int lastEventDetailsQueueSizeLogged = 0;
    private volatile boolean stopped = false;
    private int detailsInterval;
    private boolean printTrigger = false;
    private volatile boolean drainEventsOnStop = false;
    private volatile boolean drained = true;
    private final Object waitForDrained = new Object();
    private volatile boolean blockNewEvents = false;
    private final EventHandler<Event> handlerInstance = new GenericEventHandler();
    private Thread eventHandlingThread;
    protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
    private boolean exitOnDispatchException = true;
    private Map<Class<? extends Enum>, EventTypeMetrics> eventTypeMetricsMap;
    private Clock clock = new MonotonicClock();
    private ThreadPoolExecutor printEventDetailsExecutor;
    private String dispatcherThreadName = "AsyncDispatcher event handler";

    public AsyncDispatcher() {
        this(new LinkedBlockingQueue<Event>());
    }

    public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
        super("Dispatcher");
        this.eventQueue = eventQueue;
        this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
        this.eventTypeMetricsMap = new HashMap<Class<? extends Enum>, EventTypeMetrics>();
    }

    public AsyncDispatcher(String dispatcherName) {
        this();
        this.dispatcherThreadName = dispatcherName;
    }

    Runnable createThread() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (!AsyncDispatcher.this.stopped && !Thread.currentThread().isInterrupted()) {
                    Event event;
                    AsyncDispatcher.this.drained = AsyncDispatcher.this.eventQueue.isEmpty();
                    if (AsyncDispatcher.this.blockNewEvents) {
                        Object object = AsyncDispatcher.this.waitForDrained;
                        synchronized (object) {
                            if (AsyncDispatcher.this.drained) {
                                AsyncDispatcher.this.waitForDrained.notify();
                            }
                        }
                    }
                    try {
                        event = AsyncDispatcher.this.eventQueue.take();
                    }
                    catch (InterruptedException ie) {
                        if (!AsyncDispatcher.this.stopped) {
                            LOG.warn("AsyncDispatcher thread interrupted", (Throwable)ie);
                        }
                        return;
                    }
                    if (event == null) continue;
                    if (AsyncDispatcher.this.eventTypeMetricsMap.get(((Enum)event.getType()).getDeclaringClass()) != null) {
                        long startTime = AsyncDispatcher.this.clock.getTime();
                        AsyncDispatcher.this.dispatch(event);
                        AsyncDispatcher.this.eventTypeMetricsMap.get(((Enum)event.getType()).getDeclaringClass()).increment(event.getType(), AsyncDispatcher.this.clock.getTime() - startTime);
                    } else {
                        AsyncDispatcher.this.dispatch(event);
                    }
                    if (!AsyncDispatcher.this.printTrigger) continue;
                    LOG.info("Latest dispatch event type: " + event.getType());
                    AsyncDispatcher.this.printTrigger = false;
                }
            }
        };
    }

    @VisibleForTesting
    public void disableExitOnDispatchException() {
        this.exitOnDispatchException = false;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.detailsInterval = this.getConfig().getInt("yarn.dispatcher.print-events-info.threshold", 5000);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PrintEventDetailsThread #%d").build();
        int numCorePoolSizeThreads = this.getConfig().getInt("yarn.dispatcher.print-thread-pool.core-pool-size", 1);
        int numMaximumPoolSizeThreads = this.getConfig().getInt("yarn.dispatcher.print-thread-pool.maximum-pool-size", 5);
        long keepAliveTime = conf.getTimeDuration("yarn.dispatcher.print-thread-pool.keep-alive-time", 10000L, TimeUnit.SECONDS);
        this.printEventDetailsExecutor = new ThreadPoolExecutor(numCorePoolSizeThreads, numMaximumPoolSizeThreads, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
    }

    protected void serviceStart() throws Exception {
        super.serviceStart();
        this.eventHandlingThread = new Thread(this.createThread());
        this.eventHandlingThread.setName(this.dispatcherThreadName);
        this.eventHandlingThread.start();
    }

    public void setDrainEventsOnStop() {
        this.drainEventsOnStop = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void serviceStop() throws Exception {
        if (this.drainEventsOnStop) {
            this.blockNewEvents = true;
            LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
            long endTime = System.currentTimeMillis() + this.getConfig().getLong("yarn.dispatcher.drain-events.timeout", 300000L);
            Object object = this.waitForDrained;
            synchronized (object) {
                while (!this.isDrained() && this.eventHandlingThread != null && this.eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) {
                    this.waitForDrained.wait(100L);
                    LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + this.eventHandlingThread.getState());
                }
            }
        }
        this.stopped = true;
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
            try {
                this.eventHandlingThread.join();
            }
            catch (InterruptedException ie) {
                LOG.warn("Interrupted Exception while stopping", (Throwable)ie);
            }
        }
        this.printEventDetailsExecutor.shutdownNow();
        super.serviceStop();
    }

    protected void dispatch(Event event) {
        block3: {
            LOG.debug("Dispatching the event {}.{}", (Object)event.getClass().getName(), (Object)event);
            Class type = ((Enum)event.getType()).getDeclaringClass();
            try {
                EventHandler handler = this.eventDispatchers.get(type);
                if (handler == null) {
                    throw new Exception("No handler for registered for " + type);
                }
                handler.handle(event);
            }
            catch (Throwable t) {
                LOG.error(FATAL, "Error in dispatcher thread", t);
                if (!this.exitOnDispatchException || ShutdownHookManager.get().isShutdownInProgress() || this.stopped) break block3;
                this.stopped = true;
                Thread shutDownThread = new Thread(this.createShutDownThread());
                shutDownThread.setName("AsyncDispatcher ShutDown handler");
                shutDownThread.start();
            }
        }
    }

    @Override
    public void register(Class<? extends Enum> eventType, EventHandler handler) {
        EventHandler registeredHandler = this.eventDispatchers.get(eventType);
        LOG.info("Registering " + eventType + " for " + handler.getClass());
        if (registeredHandler == null) {
            this.eventDispatchers.put(eventType, handler);
        } else if (!(registeredHandler instanceof MultiListenerHandler)) {
            MultiListenerHandler multiHandler = new MultiListenerHandler();
            multiHandler.addHandler(registeredHandler);
            multiHandler.addHandler(handler);
            this.eventDispatchers.put(eventType, multiHandler);
        } else {
            MultiListenerHandler multiHandler = (MultiListenerHandler)registeredHandler;
            multiHandler.addHandler(handler);
        }
    }

    @Override
    public EventHandler<Event> getEventHandler() {
        return this.handlerInstance;
    }

    Runnable createShutDownThread() {
        return new Runnable(){

            @Override
            public void run() {
                LOG.info("Exiting, bbye..");
                System.exit(-1);
            }
        };
    }

    @VisibleForTesting
    protected boolean isEventThreadWaiting() {
        return this.eventHandlingThread.getState() == Thread.State.WAITING;
    }

    protected boolean isDrained() {
        return this.drained;
    }

    protected boolean isStopped() {
        return this.stopped;
    }

    public void addMetrics(EventTypeMetrics metrics, Class<? extends Enum> eventClass) {
        this.eventTypeMetricsMap.put(eventClass, metrics);
    }

    public int getEventQueueSize() {
        return this.eventQueue.size();
    }

    class GenericEventHandler
    implements EventHandler<Event> {
        GenericEventHandler() {
        }

        private void printEventQueueDetails() {
            Iterator iterator = AsyncDispatcher.this.eventQueue.iterator();
            HashMap counterMap = new HashMap();
            while (iterator.hasNext()) {
                Object eventType = ((Event)iterator.next()).getType();
                if (!counterMap.containsKey(eventType)) {
                    counterMap.put(eventType, 0L);
                }
                counterMap.put(eventType, (Long)counterMap.get(eventType) + 1L);
            }
            for (Map.Entry entry : counterMap.entrySet()) {
                long num = (Long)entry.getValue();
                LOG.info("Event type: " + entry.getKey() + ", Event record counter: " + num);
            }
        }

        @Override
        public void handle(Event event) {
            int remCapacity;
            if (AsyncDispatcher.this.blockNewEvents) {
                return;
            }
            AsyncDispatcher.this.drained = false;
            int qSize = AsyncDispatcher.this.eventQueue.size();
            if (qSize != 0 && qSize % 1000 == 0 && AsyncDispatcher.this.lastEventQueueSizeLogged != qSize) {
                AsyncDispatcher.this.lastEventQueueSizeLogged = qSize;
                LOG.info("Size of event-queue is " + qSize);
            }
            if (qSize != 0 && qSize % AsyncDispatcher.this.detailsInterval == 0 && AsyncDispatcher.this.lastEventDetailsQueueSizeLogged != qSize) {
                AsyncDispatcher.this.lastEventDetailsQueueSizeLogged = qSize;
                AsyncDispatcher.this.printEventDetailsExecutor.submit(this::printEventQueueDetails);
                AsyncDispatcher.this.printTrigger = true;
            }
            if ((remCapacity = AsyncDispatcher.this.eventQueue.remainingCapacity()) < 1000) {
                LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity);
            }
            try {
                AsyncDispatcher.this.eventQueue.put(event);
            }
            catch (InterruptedException e) {
                if (!AsyncDispatcher.this.stopped) {
                    LOG.warn("AsyncDispatcher thread interrupted", (Throwable)e);
                }
                AsyncDispatcher.this.drained = AsyncDispatcher.this.eventQueue.isEmpty();
                throw new YarnRuntimeException((Throwable)e);
            }
        }
    }

    static class MultiListenerHandler
    implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers = new ArrayList<EventHandler<Event>>();

        @Override
        public void handle(Event event) {
            for (EventHandler<Event> handler : this.listofHandlers) {
                handler.handle(event);
            }
        }

        void addHandler(EventHandler<Event> handler) {
            this.listofHandlers.add(handler);
        }
    }
}

