/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.impala.catalog.events.DbBarrierEvent;
import org.apache.impala.catalog.events.EventProcessException;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.RenameTableBarrierEvent;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.util.ClassUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes queued metastore events for a set of tables on one dedicated scheduled thread.
 *
 * <p>Every table assigned to this executor gets its own {@link TableProcessor}, which owns a
 * FIFO queue of pending events. Once started, the executor thread periodically walks all table
 * processors and lets each one drain as many events from the head of its queue as it can.
 *
 * <p>The executor also maintains a count of outstanding events, i.e. events that have been
 * enqueued on any of its table processors and have neither been processed nor discarded yet.
 */
public class TableEventExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(TableEventExecutor.class);

    /** Table processors owned by this executor, keyed by fully qualified table name. */
    private final Map<String, TableProcessor> tableProcessors_ = new ConcurrentHashMap<String, TableProcessor>();
    /** Executor name; also used as the name format of the executor thread. */
    private final String name_;
    private final MetastoreEventsProcessor eventProcessor_;
    /** Initial delay and period, in milliseconds, of the periodic {@link #process()} run. */
    private final long interval_;
    private final ScheduledExecutorService service_;
    /** Number of events enqueued on this executor's table processors and not yet removed. */
    private final AtomicLong outstandingEventCount_ = new AtomicLong();

    /**
     * Creates the executor together with its single daemon thread. Nothing is scheduled
     * until {@link #start()} is called.
     *
     * @param eventProcessor the owning events processor; must not be null
     * @param executorNamePrefix prefix of the executor name
     * @param name suffix of the executor name; the full name is
     *     {@code executorNamePrefix + ".TableEventExecutor-" + name}
     * @param interval initial delay and period of the processing run, in milliseconds
     */
    TableEventExecutor(MetastoreEventsProcessor eventProcessor, String executorNamePrefix, String name, long interval) {
        Preconditions.checkArgument(eventProcessor != null);
        eventProcessor_ = eventProcessor;
        name_ = executorNamePrefix + ".TableEventExecutor-" + name;
        interval_ = interval;
        service_ = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name_).build());
    }

    /** Returns the full name of this executor. */
    String getName() {
        return name_;
    }

    /** Returns the number of table processors currently registered with this executor. */
    long getTableCount() {
        return tableProcessors_.size();
    }

    /** Returns the number of events that are queued on this executor and not yet removed. */
    long getOutstandingEventCount() {
        return outstandingEventCount_.get();
    }

    private void incrOutstandingEventCount() {
        outstandingEventCount_.incrementAndGet();
    }

    /**
     * Subtracts {@code delta} from the outstanding event count.
     *
     * @throws IllegalStateException if the count is negative after the decrement
     */
    private void decrOutstandingEventCount(long delta) {
        long remaining = outstandingEventCount_.addAndGet(-delta);
        Preconditions.checkState(remaining >= 0L, "outstandingEventCount is negative after decrement.");
    }

    /**
     * Schedules {@link #process()} at a fixed rate on the executor thread, using the configured
     * interval both as the initial delay and as the period.
     */
    void start() {
        Preconditions.checkNotNull(service_);
        service_.scheduleAtFixedRate(this::process, interval_, interval_, TimeUnit.MILLISECONDS);
        LOG.debug("Started executor: {}", name_);
    }

    /**
     * Verifies that the executor holds no table processors and no outstanding events. This
     * method does not remove anything itself; it only asserts the empty state.
     *
     * @throws IllegalStateException if a table processor or an outstanding event remains
     */
    void clear() {
        Preconditions.checkState(tableProcessors_.isEmpty());
        Preconditions.checkState(outstandingEventCount_.get() == 0L, "outstandingEventCount is non-zero after clear.");
        LOG.debug("Cleared executor: {}", name_);
    }

    /** Shuts down the executor thread via the events processor's shutdown helper. */
    void stop() {
        MetastoreEventsProcessor.shutdownAndAwaitTermination(service_);
        LOG.debug("Stopped executor: {}", name_);
    }

    /**
     * Returns the table processor registered for the given table, creating and registering a
     * new one if none exists yet.
     *
     * <p>The lookup and the registration happen atomically, so concurrent callers asking for
     * the same table always get the same processor instance.
     *
     * @param fqTableName fully qualified table name used as the registration key
     */
    TableProcessor getOrCreateTableProcessor(String fqTableName) {
        return tableProcessors_.computeIfAbsent(fqTableName, tableName -> new TableProcessor(this, tableName));
    }

    /**
     * Unregisters the processor of the given table, if any, and discards its pending events.
     *
     * @param fqTableName fully qualified table name the processor was registered under
     */
    void deleteTableProcessor(String fqTableName) {
        TableProcessor tableProcessor = tableProcessors_.remove(fqTableName);
        if (tableProcessor == null) {
            return;
        }
        synchronized (tableProcessor.processorLock_) {
            tableProcessor.isTerminating_ = true;
        }
        // Once the terminating flag is set, neither enqueue() nor process() changes the queue
        // any more, so the remaining events can be uncounted and dropped here.
        decrOutstandingEventCount(tableProcessor.events_.size());
        tableProcessor.events_.clear();
    }

    /**
     * Runs one processing pass over all registered table processors. The pass stops early as
     * soon as the events processor reports that events cannot be processed in its current
     * status. An {@link EventProcessException} raised by a table processor ends the pass and
     * is handed over to the events processor.
     */
    void process() {
        try {
            for (TableProcessor tableProcessor : tableProcessors_.values()) {
                if (!eventProcessor_.canProcessEventInCurrentStatus()) {
                    break;
                }
                tableProcessor.process();
            }
        } catch (EventProcessException e) {
            LOG.error("Exception occurred for executor: {}", name_);
            eventProcessor_.handleEventProcessException(e.getException(), e.getEvent());
        }
    }

    /**
     * Holds the pending events of a single table and processes them in queue order.
     *
     * <p>An event stays at the head of the queue while it is being processed and is removed
     * only after it has been processed (or skipped) and post-processed.
     */
    public static class TableProcessor {
        private final String fqTableName_;
        /** Wall-clock time, in milliseconds, at which the last event was accepted by enqueue(). */
        private long lastEventQueuedTime_;
        /**
         * Id of the last event that was post-processed, or of the last DB barrier event this
         * processor signalled via {@code proceed()}; -1 until either happens.
         */
        private long lastProcessedEventId_ = -1L;
        /**
         * Id of the most recently enqueued DROP_TABLE event, or -1 if there was none. Non-barrier
         * events with a smaller id are skipped instead of processed.
         */
        private final AtomicLong skipEventId_ = new AtomicLong(-1L);
        private final TableEventExecutor tableEventExecutor_;
        /** Guards the terminating flag and the queue add/remove steps that depend on it. */
        private final Object processorLock_ = new Object();
        /** Set when the processor is deleted from its executor. */
        private volatile boolean isTerminating_ = false;
        private final Queue<MetastoreEvents.MetastoreEvent> events_ = new ConcurrentLinkedQueue<MetastoreEvents.MetastoreEvent>();

        private TableProcessor(TableEventExecutor executor, String fqTableName) {
            tableEventExecutor_ = executor;
            fqTableName_ = fqTableName;
        }

        /** Returns the executor this processor belongs to. */
        TableEventExecutor getTableEventExecutor() {
            return tableEventExecutor_;
        }

        /** Returns the fully qualified name of the table this processor serves. */
        String getTableName() {
            return fqTableName_;
        }

        /** Returns true if no event is currently queued. */
        boolean isEmpty() {
            return events_.isEmpty();
        }

        private MetastoreEventsProcessor getEventProcessor() {
            return tableEventExecutor_.eventProcessor_;
        }

        /**
         * Appends an event to this processor's queue and counts it as outstanding on the
         * executor. The event is dropped with a warning if the events processor cannot process
         * events in its current status.
         *
         * @param event the event to queue
         * @throws IllegalStateException if this processor has already been marked terminating
         */
        void enqueue(MetastoreEvents.MetastoreEvent event) {
            MetastoreEventsProcessor eventProcessor = getEventProcessor();
            if (!eventProcessor.canProcessEventInCurrentStatus()) {
                event.warnLog("Event is not queued to executor: {} since status is {}", new Object[]{tableEventExecutor_.name_, eventProcessor.getStatus()});
                return;
            }
            lastEventQueuedTime_ = System.currentTimeMillis();
            if (event instanceof DbBarrierEvent) {
                ((DbBarrierEvent) event).incrExpectedProceedCount();
            } else if (event.getEventType() == MetastoreEvents.MetastoreEventType.DROP_TABLE) {
                // Events queued before this drop no longer need to be processed.
                skipEventId_.set(event.getEventId());
            }
            synchronized (processorLock_) {
                Preconditions.checkState(!isTerminating());
                events_.offer(event);
                tableEventExecutor_.incrOutstandingEventCount();
            }
            event.debugLog("Enqueued for table: {} on executor: {}", fqTableName_, tableEventExecutor_.name_);
        }

        /**
         * Returns true if the queue is empty and more than the configured minimum idle time has
         * passed since the last event was queued.
         */
        boolean canBeRemoved() {
            return events_.isEmpty()
                    && System.currentTimeMillis() - lastEventQueuedTime_ > (long) BackendConfig.INSTANCE.getMinEventProcessorIdleMs();
        }

        /**
         * Reads the terminating flag under the processor lock. When the flag is set and debug
         * logging is enabled, the caller's stack trace is logged.
         */
        private boolean isTerminating() {
            synchronized (processorLock_) {
                if (isTerminating_ && LOG.isDebugEnabled()) {
                    LOG.debug("Processor is terminating for table: {}. Caller stacktrace: {}", (Object) fqTableName_, (Object) ClassUtil.getStackTraceForThread());
                }
                return isTerminating_;
            }
        }

        /**
         * Decides whether the event can be skipped. Barrier events and events whose id is not
         * below the skip id are never skipped. For a skipped event the "events-skipped" counter
         * is incremented.
         *
         * @return true if the event was skipped and must not be processed
         */
        private boolean skipEventIfPossible(MetastoreEvents.MetastoreEvent event) {
            boolean isBarrier = event instanceof DbBarrierEvent || event instanceof RenameTableBarrierEvent;
            if (event.getEventId() >= skipEventId_.get() || isBarrier) {
                return false;
            }
            Counter eventSkipCounter = event.getMetrics().getCounter("events-skipped");
            eventSkipCounter.inc();
            event.debugLog("Incremented skipped metric to {}", eventSkipCounter.getCount());
            return true;
        }

        /**
         * Performs the bookkeeping that follows a processed or skipped event. DB barrier events
         * need none. For every other event the last processed id is recorded, the event is
         * removed from the in-progress log (for a derived event only once all of its derived
         * events are processed), and a drop event is removed from the delete event log.
         */
        private void postProcessEvent(MetastoreEvents.MetastoreEvent event) {
            if (event instanceof DbBarrierEvent) {
                return;
            }
            lastProcessedEventId_ = event.getEventId();
            MetastoreEventsProcessor eventProcessor = getEventProcessor();
            boolean removeFromInProgressLog = true;
            if (event instanceof MetastoreEvents.DerivedMetastoreTableEvent) {
                MetastoreEvents.DerivedMetastoreTableEvent derivedEvent = (MetastoreEvents.DerivedMetastoreTableEvent) event;
                derivedEvent.markProcessed();
                removeFromInProgressLog = derivedEvent.isAllDerivedEventsProcessed();
            }
            if (removeFromInProgressLog) {
                eventProcessor.getEventExecutorService().removeFromInProgressLog(event.getEventId());
            }
            if (event.isDropEvent()) {
                eventProcessor.getDeleteEventLog().removeEvent(event.getEventId());
            }
        }

        /**
         * Attempts to process the event at the head of the queue.
         *
         * @return true if the event is done (processed, skipped, or a released DB barrier) and
         *     can be removed from the queue; false if it has to stay at the head for now
         * @throws Exception the processing failure, if the event could not be processed and its
         *     failure handler did not take care of it
         */
        private boolean processEvent(MetastoreEvents.MetastoreEvent event) throws Exception {
            if (isTerminating()) {
                return false;
            }
            MetastoreEventsProcessor eventProcessor = getEventProcessor();
            if (!eventProcessor.canProcessEventInCurrentStatus()) {
                LOG.warn("Event processing is skipped for executor: {} since status is {}", (Object) tableEventExecutor_.name_, (Object) eventProcessor.getStatus());
                return false;
            }
            boolean isRenameTableBarrier = event instanceof RenameTableBarrierEvent;
            if (isRenameTableBarrier && !((RenameTableBarrierEvent) event).canProcess()) {
                event.traceLog("Rename table barrier waiting for table: {}", fqTableName_);
                return false;
            }
            boolean isDbEvent = event instanceof DbBarrierEvent;
            if (isDbEvent && event.getEventId() != lastProcessedEventId_) {
                // Signal the barrier only once per processor; later passes see the recorded id.
                ((DbBarrierEvent) event).proceed();
                lastProcessedEventId_ = event.getEventId();
            }
            if (skipEventIfPossible(event)) {
                return true;
            }
            if (isDbEvent) {
                if (!((DbBarrierEvent) event).isAllDerivedEventsProcessed()) {
                    event.traceLog("DB barrier waiting for table: {}", fqTableName_);
                    return false;
                }
                return true;
            }
            try (ThreadNameAnnotator tna = new ThreadNameAnnotator(String.format("Processing %s for table: %s", event.getEventDesc(), fqTableName_))) {
                long processingStartTime = System.currentTimeMillis();
                event.processIfEnabled();
                event.infoLog("Scheduling delay: {}, Process time: {}", PrintUtils.printTimeMs(processingStartTime - event.getDispatchTime()), PrintUtils.printTimeMs(System.currentTimeMillis() - processingStartTime));
            } catch (Exception processingEx) {
                try {
                    if (!event.onFailure(processingEx)) {
                        throw processingEx;
                    }
                } catch (Exception onFailureEx) {
                    // Reached both when onFailure() itself throws and when it declines to
                    // handle the failure; the original processing exception is propagated.
                    event.errorLog("Failed to handle event processing failure for table: {}", fqTableName_, onFailureEx);
                    throw processingEx;
                }
            }
            return true;
        }

        /**
         * Drains the queue from its head until it is empty, an event cannot be completed yet,
         * or the processor is terminating. Each completed event is post-processed, removed from
         * the queue and uncounted on the executor.
         *
         * @throws EventProcessException wrapping any exception raised while handling an event
         */
        private void process() throws EventProcessException {
            MetastoreEvents.MetastoreEvent event;
            while ((event = events_.peek()) != null) {
                try {
                    if (!processEvent(event)) {
                        return;
                    }
                    postProcessEvent(event);
                    synchronized (processorLock_) {
                        // A terminating processor has its queue cleared and uncounted by
                        // deleteTableProcessor(), so the event must not be removed here as well.
                        if (isTerminating()) {
                            return;
                        }
                        Preconditions.checkState(events_.poll() == event);
                        tableEventExecutor_.decrOutstandingEventCount(1L);
                    }
                } catch (Exception e) {
                    throw new EventProcessException(event.getEvent(), e);
                }
            }
        }
    }
}

