/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezAbstractEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class AsyncDispatcherConcurrent
extends CompositeService
implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);
    private final String name;
    private final ArrayList<LinkedBlockingQueue<Event>> eventQueues;
    private volatile boolean stopped = false;
    private volatile boolean drainEventsOnStop = false;
    private volatile boolean drained = true;
    private Object waitForDrained = new Object();
    private volatile boolean blockNewEvents = false;
    private EventHandler handlerInstance = new GenericEventHandler();
    private ExecutorService execService;
    private final int numThreads;
    protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
    protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers = Maps.newHashMap();
    private boolean exitOnDispatchException = false;

    AsyncDispatcherConcurrent(String name, int numThreads) {
        super(name);
        Preconditions.checkArgument(numThreads > 0);
        this.name = name;
        this.eventQueues = Lists.newArrayListWithCapacity((int)numThreads);
        this.numThreads = numThreads;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        int i;
        this.execService = Executors.newFixedThreadPool(this.numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Dispatcher {" + this.name + "} #%d").build());
        for (i = 0; i < this.numThreads; ++i) {
            this.eventQueues.add(new LinkedBlockingQueue());
        }
        for (i = 0; i < this.numThreads; ++i) {
            this.execService.execute(new DispatchRunner(this.eventQueues.get(i)));
        }
        super.serviceStart();
    }

    public void setDrainEventsOnStop() {
        this.drainEventsOnStop = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void serviceStop() throws Exception {
        if (this.execService != null) {
            if (this.drainEventsOnStop) {
                this.blockNewEvents = true;
                LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
                Object object = this.waitForDrained;
                synchronized (object) {
                    while (!this.drained && !this.execService.isShutdown()) {
                        LOG.info("Waiting for AsyncDispatcher to drain.");
                        this.waitForDrained.wait(1000L);
                    }
                }
            }
            this.stopped = true;
            for (int i = 0; i < this.numThreads; ++i) {
                LOG.info("AsyncDispatcher stopping with events: " + this.eventQueues.get(i).size() + " in queue: " + i);
            }
            this.execService.shutdownNow();
        }
        super.serviceStop();
    }

    protected void dispatch(Event event) {
        block4: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString());
            }
            Class type = event.getType().getDeclaringClass();
            try {
                EventHandler handler = this.eventHandlers.get(type);
                if (handler == null) {
                    throw new Exception("No handler for registered for " + type);
                }
                handler.handle(event);
            }
            catch (Throwable t) {
                LOG.error("Error in dispatcher thread", t);
                if (!this.exitOnDispatchException || ShutdownHookManager.get().isShutdownInProgress() || this.stopped) break block4;
                Thread shutDownThread = new Thread(this.createShutDownThread());
                shutDownThread.setName("AsyncDispatcher ShutDown handler");
                shutDownThread.start();
            }
        }
    }

    private void checkForExistingHandler(Class<? extends Enum> eventType) {
        EventHandler registeredHandler = this.eventHandlers.get(eventType);
        Preconditions.checkState(registeredHandler == null, "Cannot register same event on multiple dispatchers");
    }

    private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
        AsyncDispatcherConcurrent registeredDispatcher = this.eventDispatchers.get(eventType);
        Preconditions.checkState(registeredDispatcher == null, "Multiple dispatchers cannot be registered for: " + eventType.getName());
    }

    private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
        if (checkHandler) {
            this.checkForExistingHandler(eventType);
        }
        this.checkForExistingDispatcher(eventType);
    }

    public void register(Class<? extends Enum> eventType, EventHandler handler) {
        Preconditions.checkState(this.getServiceState() == Service.STATE.NOTINITED);
        EventHandler registeredHandler = this.eventHandlers.get(eventType);
        this.checkForExistingDispatchers(false, eventType);
        LOG.info("Registering " + eventType + " for " + handler.getClass());
        if (registeredHandler == null) {
            this.eventHandlers.put(eventType, handler);
        } else if (!(registeredHandler instanceof MultiListenerHandler)) {
            MultiListenerHandler multiHandler = new MultiListenerHandler();
            multiHandler.addHandler((EventHandler<Event>)registeredHandler);
            multiHandler.addHandler((EventHandler<Event>)handler);
            this.eventHandlers.put(eventType, multiHandler);
        } else {
            MultiListenerHandler multiHandler = (MultiListenerHandler)registeredHandler;
            multiHandler.addHandler((EventHandler<Event>)handler);
        }
    }

    public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType, EventHandler handler, String dispatcherName, int numThreads) {
        Preconditions.checkState(this.getServiceState() == Service.STATE.NOTINITED);
        this.checkForExistingDispatchers(true, eventType);
        LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
        AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
        dispatcher.register(eventType, handler);
        this.eventDispatchers.put(eventType, dispatcher);
        this.addIfService((Object)dispatcher);
        return dispatcher;
    }

    public void registerWithExistingDispatcher(Class<? extends Enum> eventType, EventHandler handler, AsyncDispatcherConcurrent dispatcher) {
        Preconditions.checkState(this.getServiceState() == Service.STATE.NOTINITED);
        this.checkForExistingDispatchers(true, eventType);
        LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: " + handler.getClass());
        dispatcher.register(eventType, handler);
        this.eventDispatchers.put(eventType, dispatcher);
    }

    @VisibleForTesting
    public void enableExitOnDispatchException() {
        this.exitOnDispatchException = true;
    }

    public EventHandler getEventHandler() {
        return this.handlerInstance;
    }

    Runnable createShutDownThread() {
        return new Runnable(){

            @Override
            public void run() {
                LOG.info("Exiting, bbye..");
                System.exit(-1);
            }
        };
    }

    class GenericEventHandler
    implements EventHandler<TezAbstractEvent> {
        GenericEventHandler() {
        }

        public void handle(TezAbstractEvent event) {
            int remCapacity;
            if (AsyncDispatcherConcurrent.this.stopped) {
                return;
            }
            if (AsyncDispatcherConcurrent.this.blockNewEvents) {
                return;
            }
            AsyncDispatcherConcurrent.this.drained = false;
            Class type = event.getType().getDeclaringClass();
            AsyncDispatcherConcurrent registeredDispatcher = AsyncDispatcherConcurrent.this.eventDispatchers.get(type);
            if (registeredDispatcher != null) {
                registeredDispatcher.getEventHandler().handle((Event)event);
                return;
            }
            int index = AsyncDispatcherConcurrent.this.numThreads > 1 ? event.getSerializingHash() % AsyncDispatcherConcurrent.this.numThreads : 0;
            LinkedBlockingQueue queue = (LinkedBlockingQueue)AsyncDispatcherConcurrent.this.eventQueues.get(index);
            int qSize = queue.size();
            if (qSize != 0 && qSize % 1000 == 0) {
                LOG.info("Size of event-queue is " + qSize);
            }
            if ((remCapacity = queue.remainingCapacity()) < 1000) {
                LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity);
            }
            try {
                queue.put(event);
            }
            catch (InterruptedException e) {
                if (!AsyncDispatcherConcurrent.this.stopped) {
                    LOG.warn("AsyncDispatcher thread interrupted", (Throwable)e);
                }
                throw new YarnRuntimeException((Throwable)e);
            }
        }
    }

    class DispatchRunner
    implements Runnable {
        final LinkedBlockingQueue<Event> queue;

        public DispatchRunner(LinkedBlockingQueue<Event> queue) {
            this.queue = queue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!AsyncDispatcherConcurrent.this.stopped && !Thread.currentThread().isInterrupted()) {
                Event event;
                AsyncDispatcherConcurrent.this.drained = this.queue.isEmpty();
                if (AsyncDispatcherConcurrent.this.blockNewEvents) {
                    Object object = AsyncDispatcherConcurrent.this.waitForDrained;
                    synchronized (object) {
                        if (AsyncDispatcherConcurrent.this.drained) {
                            AsyncDispatcherConcurrent.this.waitForDrained.notify();
                        }
                    }
                }
                try {
                    event = this.queue.take();
                }
                catch (InterruptedException ie) {
                    if (!AsyncDispatcherConcurrent.this.stopped) {
                        LOG.warn("AsyncDispatcher thread interrupted", (Throwable)ie);
                    }
                    return;
                }
                if (event == null) continue;
                AsyncDispatcherConcurrent.this.dispatch(event);
            }
        }
    }

    static class MultiListenerHandler
    implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers = new ArrayList<EventHandler<Event>>();

        public void handle(Event event) {
            for (EventHandler<Event> handler : this.listofHandlers) {
                handler.handle(event);
            }
        }

        void addHandler(EventHandler<Event> handler) {
            this.listofHandlers.add(handler);
        }
    }
}

