/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.impala.catalog.events.DbBarrierEvent;
import org.apache.impala.catalog.events.EventProcessException;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.RenameTableBarrierEvent;
import org.apache.impala.catalog.events.TableEventExecutor;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.util.ClassUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbEventExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(DbEventExecutor.class);
    private final Map<String, DbEventExecutor> dbNameToEventExecutor_;
    private final String name_;
    private final MetastoreEventsProcessor eventProcessor_;
    private final long interval_;
    private final ScheduledExecutorService service_;
    private final Map<String, DbProcessor> dbProcessors_ = new ConcurrentHashMap<String, DbProcessor>();
    private final List<TableEventExecutor> tableEventExecutors_;
    private final Map<String, TableEventExecutor> tableToEventExecutor_ = new ConcurrentHashMap<String, TableEventExecutor>();
    private AtomicLong outstandingEventCount_ = new AtomicLong();

    DbEventExecutor(MetastoreEventsProcessor eventProcessor, String name, long interval, int numTableEventExecutor, Map<String, DbEventExecutor> dbNameToEventExecutor) {
        Preconditions.checkArgument((eventProcessor != null ? 1 : 0) != 0);
        Preconditions.checkArgument((dbNameToEventExecutor != null ? 1 : 0) != 0);
        Preconditions.checkArgument((numTableEventExecutor > 0 ? 1 : 0) != 0);
        this.eventProcessor_ = eventProcessor;
        this.name_ = "DbEventExecutor-" + name;
        this.interval_ = interval;
        this.service_ = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat(this.name_).build());
        this.tableEventExecutors_ = new ArrayList<TableEventExecutor>(numTableEventExecutor);
        for (int i = 0; i < numTableEventExecutor; ++i) {
            this.tableEventExecutors_.add(new TableEventExecutor(eventProcessor, this.name_, String.valueOf(i), this.interval_));
        }
        this.dbNameToEventExecutor_ = dbNameToEventExecutor;
    }

    @VisibleForTesting
    List<TableEventExecutor> getTableEventExecutors() {
        return this.tableEventExecutors_;
    }

    long getDbCount() {
        return this.dbProcessors_.size();
    }

    private void incrOutstandingEventCount() {
        this.outstandingEventCount_.incrementAndGet();
    }

    private void decrOutstandingEventCount(long delta) {
        Preconditions.checkState((this.outstandingEventCount_.addAndGet(-delta) >= 0L ? 1 : 0) != 0, (Object)"outstandingEventCount is negative after decrement.");
    }

    long getOutstandingEventCount() {
        long outstandingEventCount = this.outstandingEventCount_.get();
        return outstandingEventCount + this.tableEventExecutors_.stream().mapToLong(TableEventExecutor::getOutstandingEventCount).sum();
    }

    void start() {
        Preconditions.checkNotNull((Object)this.service_);
        this.service_.scheduleAtFixedRate(this::process, this.interval_, this.interval_, TimeUnit.MILLISECONDS);
        this.tableEventExecutors_.forEach(TableEventExecutor::start);
        LOG.debug("Started executor: {}", (Object)this.name_);
    }

    private void cleanIfNecessary(boolean force) {
        Iterator<Map.Entry<String, DbProcessor>> it = this.dbProcessors_.entrySet().iterator();
        while (it.hasNext()) {
            DbProcessor dbProcessor = it.next().getValue();
            if (force) {
                dbProcessor.clear();
            }
            if (!dbProcessor.canBeRemoved()) continue;
            this.unAssignEventExecutor(dbProcessor.dbName_);
            it.remove();
        }
    }

    void cleanup() {
        this.cleanIfNecessary(false);
    }

    private void clearInternal() {
        this.cleanIfNecessary(true);
        this.tableEventExecutors_.forEach(TableEventExecutor::clear);
        Preconditions.checkState((this.outstandingEventCount_.get() == 0L ? 1 : 0) != 0, (Object)"outstandingEventCount is non-zero after clear.");
    }

    void clear() {
        this.clearInternal();
        LOG.debug("Cleared executor: {}", (Object)this.name_);
    }

    void stop() {
        MetastoreEventsProcessor.shutdownAndAwaitTermination(this.service_);
        this.tableEventExecutors_.parallelStream().forEach(TableEventExecutor::stop);
        this.clearInternal();
        LOG.debug("Stopped executor: {}", (Object)this.name_);
    }

    void enqueue(MetastoreEvents.MetastoreEvent event) {
        if (!this.eventProcessor_.canProcessEventInCurrentStatus()) {
            event.warnLog("Event is not queued to executor: {} since status is {}", new Object[]{this.name_, this.eventProcessor_.getStatus()});
            return;
        }
        String dbName = event.getDbName().toLowerCase();
        DbProcessor dbProcessor = this.dbProcessors_.get(dbName);
        if (dbProcessor == null) {
            dbProcessor = new DbProcessor(this, dbName);
            this.dbProcessors_.put(dbName, dbProcessor);
            this.assignEventExecutor(dbName, this);
        }
        dbProcessor.enqueue(event);
    }

    @VisibleForTesting
    TableEventExecutor getTableEventExecutor(String fqTableName) {
        Preconditions.checkNotNull((Object)fqTableName);
        return this.tableToEventExecutor_.get(fqTableName);
    }

    private TableEventExecutor getOrAssignTableEventExecutor(String fqTableName) {
        Preconditions.checkNotNull((Object)fqTableName);
        TableEventExecutor executor = this.tableToEventExecutor_.get(fqTableName);
        if (executor != null) {
            return executor;
        }
        long minOutStandingEvents = Long.MAX_VALUE;
        long minTableCount = Long.MAX_VALUE;
        for (TableEventExecutor tee : this.tableEventExecutors_) {
            long outstandingEventCount;
            long tableCount = tee.getTableCount();
            if (tableCount < minTableCount) {
                minTableCount = tableCount;
                minOutStandingEvents = tee.getOutstandingEventCount();
                executor = tee;
                continue;
            }
            if (tableCount != minTableCount || (outstandingEventCount = tee.getOutstandingEventCount()) >= minOutStandingEvents) continue;
            minOutStandingEvents = outstandingEventCount;
            executor = tee;
        }
        Preconditions.checkNotNull((Object)executor);
        this.tableToEventExecutor_.put(fqTableName, executor);
        LOG.info("Assigned executor: {} for table: {}", (Object)executor.getName(), (Object)fqTableName);
        return executor;
    }

    private void unAssignTableEventExecutor(String fqTableName) {
        Preconditions.checkNotNull((Object)fqTableName);
        TableEventExecutor executor = this.tableToEventExecutor_.remove(fqTableName);
        String executorName = executor != null ? executor.getName() : "";
        LOG.info("Unassigned executor: {} for table: {}", (Object)executorName, (Object)fqTableName);
    }

    void process() {
        try {
            for (Map.Entry<String, DbProcessor> entry : this.dbProcessors_.entrySet()) {
                DbProcessor dbProcessor = entry.getValue();
                if (this.eventProcessor_.canProcessEventInCurrentStatus()) {
                    dbProcessor.process();
                    continue;
                }
                break;
            }
        }
        catch (EventProcessException e) {
            LOG.error("Exception occurred for executor: {}", (Object)this.name_);
            this.eventProcessor_.handleEventProcessException(e.getException(), e.getEvent());
        }
    }

    private void assignEventExecutor(String dbName, DbEventExecutor executor) {
        Preconditions.checkNotNull((Object)dbName);
        Preconditions.checkNotNull((Object)executor);
        Preconditions.checkState((this.dbNameToEventExecutor_.get(dbName) == null ? 1 : 0) != 0);
        this.dbNameToEventExecutor_.put(dbName, executor);
        LOG.info("Assigned executor: {} for db: {}", (Object)executor.name_, (Object)dbName);
    }

    private void unAssignEventExecutor(String dbName) {
        Preconditions.checkNotNull((Object)dbName);
        DbEventExecutor executor = this.dbNameToEventExecutor_.remove(dbName);
        String executorName = executor != null ? executor.name_ : "";
        LOG.info("Unassigned executor: {} for db: {}", (Object)executorName, (Object)dbName);
    }

    public static class DbProcessor {
        private final String dbName_;
        private long lastEventQueuedTime_;
        private final AtomicLong skipEventId_ = new AtomicLong(-1L);
        private final DbEventExecutor dbEventExecutor_;
        private final Queue<MetastoreEvents.MetastoreEvent> inEvents_ = new ConcurrentLinkedQueue<MetastoreEvents.MetastoreEvent>();
        private final Queue<DbBarrierEvent> barrierEvents_ = new ConcurrentLinkedQueue<DbBarrierEvent>();
        private final Set<TableEventExecutor.TableProcessor> tableProcessors_ = ConcurrentHashMap.newKeySet();
        private final Object processorLock_ = new Object();
        private boolean isTerminating_ = false;

        private DbProcessor(DbEventExecutor executor, String dbName) {
            this.dbEventExecutor_ = executor;
            this.dbName_ = dbName;
        }

        private MetastoreEventsProcessor getEventProcessor() {
            return this.dbEventExecutor_.eventProcessor_;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void enqueue(MetastoreEvents.MetastoreEvent event) {
            this.lastEventQueuedTime_ = System.currentTimeMillis();
            if (event.getEventType() == MetastoreEvents.MetastoreEventType.DROP_DATABASE) {
                this.skipEventId_.set(event.getEventId());
            }
            Object object = this.processorLock_;
            synchronized (object) {
                Preconditions.checkState((!this.isTerminating() ? 1 : 0) != 0);
                this.inEvents_.offer(event);
                this.dbEventExecutor_.incrOutstandingEventCount();
            }
            event.debugLog("Enqueued for db: {} on executor: {}", this.dbName_, this.dbEventExecutor_.name_);
        }

        private boolean canBeRemoved() {
            return this.inEvents_.isEmpty() && this.barrierEvents_.isEmpty() && this.tableProcessors_.isEmpty() && System.currentTimeMillis() - this.lastEventQueuedTime_ > (long)BackendConfig.INSTANCE.getMinEventProcessorIdleMs();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void dispatchTableEvent(MetastoreEvents.MetastoreEvent event) {
            String fqTableName = (event.getDbName() + '.' + event.getTableName()).toLowerCase();
            Object object = this.processorLock_;
            synchronized (object) {
                if (this.isTerminating()) {
                    return;
                }
                TableEventExecutor tableEventExecutor = this.dbEventExecutor_.getOrAssignTableEventExecutor(fqTableName);
                TableEventExecutor.TableProcessor tableProcessor = tableEventExecutor.getOrCreateTableProcessor(fqTableName);
                this.tableProcessors_.add(tableProcessor);
                if (tableProcessor.isEmpty()) {
                    this.barrierEvents_.forEach(tableProcessor::enqueue);
                }
                tableProcessor.enqueue(event);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void dispatchDbEvent(MetastoreEvents.MetastoreDatabaseEvent event) {
            Object object = this.processorLock_;
            synchronized (object) {
                if (this.isTerminating()) {
                    return;
                }
                DbBarrierEvent barrierEvent = new DbBarrierEvent(event);
                this.tableProcessors_.forEach(tableProcessor -> {
                    if (!tableProcessor.isEmpty()) {
                        tableProcessor.enqueue(barrierEvent);
                    }
                });
                this.barrierEvents_.offer(barrierEvent);
                this.dbEventExecutor_.incrOutstandingEventCount();
            }
        }

        private void postProcessEvent(DbBarrierEvent event) {
            MetastoreEventsProcessor eventProcessor = this.getEventProcessor();
            event.markProcessed();
            eventProcessor.getEventExecutorService().removeFromInProgressLog(event.getEventId());
            if (event.getEventType() == MetastoreEvents.MetastoreEventType.DROP_DATABASE) {
                eventProcessor.getDeleteEventLog().removeEvent(event.getEventId());
            }
        }

        private boolean processDbEvent(DbBarrierEvent barrierEvent) throws Exception {
            if (this.isTerminating()) {
                return false;
            }
            if (barrierEvent.getExpectedProceedCount() != 0) {
                return false;
            }
            try (ThreadNameAnnotator tna = new ThreadNameAnnotator(String.format("Processing %s for db: %s", barrierEvent.getEventDesc(), this.dbName_));){
                long processingStartTime = System.currentTimeMillis();
                barrierEvent.processIfEnabled();
                barrierEvent.infoLog("Scheduling delay: {}, Process time: {}", PrintUtils.printTimeMs(processingStartTime - barrierEvent.getDispatchTime()), PrintUtils.printTimeMs(System.currentTimeMillis() - processingStartTime));
            }
            catch (Exception processingEx) {
                try {
                    if (!barrierEvent.onFailure(processingEx)) {
                        throw processingEx;
                    }
                }
                catch (Exception onFailureEx) {
                    barrierEvent.errorLog("Failed to handle event processing failure for db: {}", this.dbName_, onFailureEx);
                    throw processingEx;
                }
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processDbEvents() throws EventProcessException {
            DbBarrierEvent barrierEvent;
            while ((barrierEvent = this.barrierEvents_.peek()) != null) {
                try {
                    boolean isProcessed = this.processDbEvent(barrierEvent);
                    if (!isProcessed) {
                        return;
                    }
                    this.postProcessEvent(barrierEvent);
                    Object object = this.processorLock_;
                    synchronized (object) {
                        if (this.isTerminating()) {
                            return;
                        }
                        Preconditions.checkState((this.barrierEvents_.poll() == barrierEvent ? 1 : 0) != 0);
                        this.dbEventExecutor_.decrOutstandingEventCount(1L);
                    }
                }
                catch (Exception e) {
                    throw new EventProcessException(barrierEvent.getEvent(), e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void clear() {
            Object object = this.processorLock_;
            synchronized (object) {
                this.isTerminating_ = true;
            }
            this.dbEventExecutor_.decrOutstandingEventCount(this.inEvents_.size());
            this.dbEventExecutor_.decrOutstandingEventCount(this.barrierEvents_.size());
            this.inEvents_.clear();
            this.barrierEvents_.clear();
            this.lastEventQueuedTime_ = 0L;
            this.cleanIfNecessary(true);
            Preconditions.checkState((boolean)this.tableProcessors_.isEmpty());
        }

        private void cleanIfNecessary(boolean force) {
            Iterator<TableEventExecutor.TableProcessor> it = this.tableProcessors_.iterator();
            while (it.hasNext()) {
                TableEventExecutor.TableProcessor tableProcessor = it.next();
                if (!force && !tableProcessor.canBeRemoved()) continue;
                TableEventExecutor tableEventExecutor = tableProcessor.getTableEventExecutor();
                tableEventExecutor.deleteTableProcessor(tableProcessor.getTableName());
                this.dbEventExecutor_.unAssignTableEventExecutor(tableProcessor.getTableName());
                it.remove();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean isTerminating() {
            Object object = this.processorLock_;
            synchronized (object) {
                if (this.isTerminating_ && LOG.isDebugEnabled()) {
                    LOG.debug("Processor is terminating for db: {}. Caller stacktrace: {}", (Object)this.dbName_, (Object)ClassUtil.getStackTraceForThread());
                }
                return this.isTerminating_;
            }
        }

        private boolean skipEventIfPossible(MetastoreEvents.MetastoreEvent event) {
            if (event.getEventId() >= this.skipEventId_.get() || event instanceof MetastoreEvents.DropTableEvent || event instanceof RenameTableBarrierEvent) {
                return false;
            }
            Counter eventSkipCounter = event.getMetrics().getCounter("events-skipped");
            eventSkipCounter.inc();
            event.debugLog("Incremented skipped metric to {}", eventSkipCounter.getCount());
            MetastoreEventsProcessor eventProcessor = this.getEventProcessor();
            eventProcessor.getEventExecutorService().removeFromInProgressLog(event.getEventId());
            if (event.isDropEvent()) {
                eventProcessor.getDeleteEventLog().removeEvent(event.getEventId());
            }
            return true;
        }

        private boolean dispatchEvent(MetastoreEvents.MetastoreEvent event) {
            if (this.isTerminating()) {
                return false;
            }
            MetastoreEventsProcessor eventProcessor = this.getEventProcessor();
            if (!eventProcessor.canProcessEventInCurrentStatus()) {
                LOG.warn("Event processing is skipped for executor: {} since status is {}", (Object)this.dbEventExecutor_.name_, (Object)eventProcessor.getStatus());
                return false;
            }
            if (!this.skipEventIfPossible(event)) {
                if (event.isDatabaseEvent()) {
                    this.dispatchDbEvent((MetastoreEvents.MetastoreDatabaseEvent)event);
                } else {
                    this.dispatchTableEvent(event);
                }
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process() throws EventProcessException {
            MetastoreEvents.MetastoreEvent event;
            while ((event = this.inEvents_.peek()) != null) {
                try {
                    boolean isDispatched = this.dispatchEvent(event);
                    if (!isDispatched) {
                        return;
                    }
                    Object object = this.processorLock_;
                    synchronized (object) {
                        if (this.isTerminating()) {
                            return;
                        }
                        Preconditions.checkState((this.inEvents_.poll() == event ? 1 : 0) != 0);
                        this.dbEventExecutor_.decrOutstandingEventCount(1L);
                    }
                }
                catch (Exception e) {
                    throw new EventProcessException(event.getEvent(), e);
                }
            }
            this.processDbEvents();
            this.cleanIfNecessary(false);
        }
    }
}

