/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.TableWriteId;
import org.apache.impala.catalog.events.DbEventExecutor;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.events.RenameTableBarrierEvent;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches metastore events to a fixed pool of {@link DbEventExecutor}s and keeps
 * track of which dispatched events are still in progress and which are processed.
 *
 * <p>Two sorted logs are maintained under this object's monitor:
 * <ul>
 *   <li>{@code inProgressLog_}: event id to event, for events that were dispatched and
 *       not yet removed through {@link #removeFromInProgressLog(long)}, plus "delimiter"
 *       events (events that were not handed to any executor) queued behind them.</li>
 *   <li>{@code processedLog_}: event id to event time of processed events. After
 *       pruning, its first entry is the greatest synced event; any further entries are
 *       events that completed while an event with a smaller id is still in progress.</li>
 * </ul>
 */
public class EventExecutorService {
    private static final Logger LOG = LoggerFactory.getLogger(EventExecutorService.class);
    /** Interval value handed to every {@link DbEventExecutor} created by this service. */
    private static final int EXECUTOR_SCHEDULE_INTERVAL_MS = 10;
    /** Timeout passed to the event processor while waiting during a graceful shutdown. */
    private static final int GRACEFUL_SHUTDOWN_TIMEOUT_MS = 3600000;

    private EventExecutorStatus status_ = EventExecutorStatus.INACTIVE;
    private final MetastoreEventsProcessor eventProcessor_;
    private final List<DbEventExecutor> dbEventExecutors_;
    private final TreeMap<Long, MetastoreEvents.MetastoreEvent> inProgressLog_ = new TreeMap<>();
    // Not final: pruneProcessedLog() replaces it with a trimmed copy.
    private TreeMap<Long, Long> processedLog_ = new TreeMap<>();
    // Shared with every DbEventExecutor (passed to their constructor); this class only
    // reads it. Lookups in getOrFindDbEventExecutor() use the lower-cased db name.
    private final Map<String, DbEventExecutor> dbNameToEventExecutor_ = new ConcurrentHashMap<String, DbEventExecutor>();

    /**
     * Creates the service together with its DbEventExecutors. The executors are not
     * started here; see {@link #start()}.
     *
     * @param eventProcessor owning events processor, must not be null
     * @param numDbEventExecutor number of DbEventExecutors to create, must be positive
     * @param numTableEventExecutor forwarded unchanged to each DbEventExecutor
     * @throws IllegalArgumentException if {@code eventProcessor} is null or
     *     {@code numDbEventExecutor} is not positive
     */
    EventExecutorService(MetastoreEventsProcessor eventProcessor, int numDbEventExecutor, int numTableEventExecutor) {
        Preconditions.checkArgument(eventProcessor != null);
        Preconditions.checkArgument(numDbEventExecutor > 0);
        eventProcessor_ = eventProcessor;
        dbEventExecutors_ = new ArrayList<DbEventExecutor>(numDbEventExecutor);
        for (int i = 0; i < numDbEventExecutor; ++i) {
            dbEventExecutors_.add(new DbEventExecutor(eventProcessor, String.valueOf(i), EXECUTOR_SCHEDULE_INTERVAL_MS, numTableEventExecutor, dbNameToEventExecutor_));
        }
    }

    /** Returns the internal (live, mutable) list of DbEventExecutors. */
    @VisibleForTesting
    List<DbEventExecutor> getDbEventExecutors() {
        return dbEventExecutors_;
    }

    /**
     * Test hook to force a status. {@code ACTIVE} resets the logs via makeActive()
     * without starting executors; {@code STOPPED} performs a graceful shutdown.
     * Any other value (i.e. {@code INACTIVE}) is ignored.
     *
     * @throws IllegalStateException if the service is already stopped
     */
    @VisibleForTesting
    void setStatus(EventExecutorStatus status) {
        Preconditions.checkState(status_ != EventExecutorStatus.STOPPED);
        if (status == EventExecutorStatus.ACTIVE) {
            makeActive();
        } else if (status == EventExecutorStatus.STOPPED) {
            shutdown(true);
        }
    }

    /** Returns the live in-progress log (not a copy). */
    @VisibleForTesting
    TreeMap<Long, MetastoreEvents.MetastoreEvent> getInProgressLog() {
        return inProgressLog_;
    }

    /**
     * Returns the current processed log. The instance is replaced on every prune, so a
     * previously returned reference can become stale.
     */
    @VisibleForTesting
    TreeMap<Long, Long> getProcessedLog() {
        return processedLog_;
    }

    /**
     * Resets both logs, seeds the processed log with the event processor's last synced
     * event id and time, and marks the service active.
     */
    private synchronized void makeActive() {
        clearLogs();
        addToProcessedLog(eventProcessor_.getLastSyncedEventId(), eventProcessor_.getLastSyncedEventTime());
        status_ = EventExecutorStatus.ACTIVE;
    }

    /**
     * Starts the DbEventExecutors if the service has never been started, then
     * (re)activates the service. Calling it while already active only resets the logs.
     *
     * @throws IllegalStateException if the service is stopped
     */
    synchronized void start() {
        Preconditions.checkState(status_ != EventExecutorStatus.STOPPED);
        if (status_ == EventExecutorStatus.INACTIVE) {
            dbEventExecutors_.forEach(DbEventExecutor::start);
        }
        makeActive();
    }

    /**
     * Clears every DbEventExecutor and both logs. The status is left untouched, so on an
     * active service the processed log stays empty until it is re-seeded;
     * {@link #getPendingEventCount(long)} would throw NoSuchElementException meanwhile.
     */
    synchronized void clear() {
        dbEventExecutors_.parallelStream().forEach(DbEventExecutor::clear);
        clearLogs();
    }

    /**
     * Stops the service permanently: marks it stopped, optionally waits for outstanding
     * events, stops all DbEventExecutors and clears the logs.
     *
     * @param graceful when true, asks the event processor to wait (up to
     *     {@code GRACEFUL_SHUTDOWN_TIMEOUT_MS}) for events to finish before stopping
     * @throws IllegalStateException if the service is already stopped
     */
    synchronized void shutdown(boolean graceful) {
        Preconditions.checkState(status_ != EventExecutorStatus.STOPPED);
        status_ = EventExecutorStatus.STOPPED;
        if (graceful) {
            // NOTE(review): this wait runs while holding this object's monitor, so any
            // thread calling removeFromInProgressLog() blocks until it returns. Confirm
            // the wait does not depend on those calls making progress.
            eventProcessor_.ensureEventsProcessedInHierarchicalMode(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
        }
        dbEventExecutors_.parallelStream().forEach(DbEventExecutor::stop);
        clearLogs();
    }

    /** Empties both the in-progress and the processed log. */
    private synchronized void clearLogs() {
        inProgressLog_.clear();
        processedLog_.clear();
    }

    /** Invokes {@code cleanup()} on every DbEventExecutor. */
    synchronized void cleanup() {
        dbEventExecutors_.parallelStream().forEach(DbEventExecutor::cleanup);
    }

    /** Returns the number of entries currently held in the in-progress log. */
    synchronized long getOutstandingEventCount() {
        return inProgressLog_.size();
    }

    /**
     * Computes how many events up to and including {@code eventId} are still pending,
     * based on the two logs.
     *
     * @param eventId event id up to which pending events are counted
     * @return 0 when {@code eventId} is not beyond the greatest synced event id;
     *     otherwise a count derived from the in-progress entries and the id distance to
     *     the nearest processed/in-progress id (see inline comments)
     * @throws IllegalStateException if the service is not active
     */
    synchronized long getPendingEventCount(long eventId) {
        Preconditions.checkState(status_ == EventExecutorStatus.ACTIVE, "EventExecutorService must be active");
        long greatestSyncedEventId = processedLog_.firstKey();
        if (eventId <= greatestSyncedEventId) {
            return 0L;
        }
        if (inProgressLog_.isEmpty()) {
            // Nothing is in flight: only the id distance remains.
            return eventId - greatestSyncedEventId;
        }
        long lastUnprocessedEventId = inProgressLog_.lastKey();
        if (eventId <= lastUnprocessedEventId) {
            // Every in-progress entry with an id up to and including eventId.
            return inProgressLog_.headMap(eventId, true).size();
        }
        // eventId lies beyond everything in flight. floorKey() cannot return null here
        // because eventId is greater than the first key of the processed log.
        long closerEventId = processedLog_.floorKey(eventId);
        if (closerEventId == greatestSyncedEventId) {
            return eventId - lastUnprocessedEventId + (long) inProgressLog_.size();
        }
        return eventId - closerEventId + (long) inProgressLog_.headMap(closerEventId).size();
    }

    /**
     * Routes an event to the appropriate DbEventExecutor(s) and records it in the
     * in-progress log. Commit/abort transaction events and table renames are expanded
     * into derived events; ignored events are only logged. An event that ends up not
     * being handed to any executor is recorded as a delimiter.
     *
     * @param event event to dispatch, must not be null
     * @throws MetastoreNotificationException propagated from commit-txn processing
     * @throws IllegalStateException if the service is not active
     */
    synchronized void dispatch(MetastoreEvents.MetastoreEvent event) throws MetastoreNotificationException {
        Preconditions.checkState(status_ == EventExecutorStatus.ACTIVE, "EventExecutorService must be active");
        Preconditions.checkNotNull(event);
        boolean isEventDispatched = false;
        if (event instanceof MetastoreShim.CommitTxnEvent) {
            isEventDispatched = processCommitTxnEvent((MetastoreShim.CommitTxnEvent) event);
        } else if (event instanceof MetastoreEvents.AbortTxnEvent) {
            isEventDispatched = processAbortTxnEvent((MetastoreEvents.AbortTxnEvent) event);
        } else if (event instanceof MetastoreEvents.AlterTableEvent && ((MetastoreEvents.AlterTableEvent) event).isRename()) {
            processAlterTableRenameEvent((MetastoreEvents.AlterTableEvent) event);
            isEventDispatched = true;
        } else if (event instanceof MetastoreEvents.IgnoredEvent) {
            event.debugLog("Ignoring event type {}", event.getEvent().getEventType());
        } else {
            DbEventExecutor dbEventExecutor = getOrFindDbEventExecutor(event.getDbName());
            dbEventExecutor.enqueue(event);
            isEventDispatched = true;
        }
        // Logged after enqueueing; the monitor held here keeps a concurrent
        // removeFromInProgressLog() from running before the entry exists.
        addToInProgressLog(event, !isEventDispatched);
    }

    /**
     * Enqueues one pseudo commit-txn event per entry returned by the shim.
     *
     * @return true if at least one pseudo event was enqueued
     */
    private boolean processCommitTxnEvent(MetastoreShim.CommitTxnEvent commitTxnEvent) throws MetastoreNotificationException {
        List<MetastoreShim.PseudoCommitTxnEvent> pseudoCommitTxnEvents = MetastoreShim.getPseudoCommitTxnEvents(commitTxnEvent);
        for (MetastoreShim.PseudoCommitTxnEvent pseudoCommitTxnEvent : pseudoCommitTxnEvents) {
            DbEventExecutor dbEventExecutor = getOrFindDbEventExecutor(pseudoCommitTxnEvent.getDbName());
            dbEventExecutor.enqueue(pseudoCommitTxnEvent);
        }
        return !pseudoCommitTxnEvents.isEmpty();
    }

    /**
     * Groups the aborted write ids by table and enqueues one pseudo abort-txn event per
     * table, all sharing a single derived-event context sized to the number of tables.
     *
     * @return true if at least one pseudo event was enqueued
     */
    private boolean processAbortTxnEvent(MetastoreEvents.AbortTxnEvent abortTxnEvent) {
        Set<TableWriteId> tableWriteIds = abortTxnEvent.getTableWriteIds();
        Map<TableName, List<Long>> writeIdsByTable = new HashMap<>();
        for (TableWriteId tableWriteId : tableWriteIds) {
            TableName tableName = new TableName(tableWriteId.getDbName(), tableWriteId.getTblName());
            writeIdsByTable.computeIfAbsent(tableName, k -> new ArrayList<>()).add(tableWriteId.getWriteId());
        }
        MetastoreEvents.DerivedMetastoreEventContext context = new MetastoreEvents.DerivedMetastoreEventContext(abortTxnEvent, writeIdsByTable.size());
        for (Map.Entry<TableName, List<Long>> entry : writeIdsByTable.entrySet()) {
            TableName tableName = entry.getKey();
            DbEventExecutor dbEventExecutor = getOrFindDbEventExecutor(tableName.getDb());
            dbEventExecutor.enqueue(new MetastoreEvents.PseudoAbortTxnEvent(context, tableName.getDb(), tableName.getTbl(), entry.getValue()));
        }
        return !writeIdsByTable.isEmpty();
    }

    /**
     * Splits a rename into two barrier events: a pseudo DROP_TABLE for the table before
     * the rename followed by a pseudo CREATE_TABLE for the table after it. Both carry
     * the original event id and share one rename state and one derived-event context.
     *
     * @param alterEvent the alter-table event describing the rename
     * @return list holding the drop barrier event first and the create barrier second
     */
    @VisibleForTesting
    List<RenameTableBarrierEvent> getRenameTableBarrierEvents(MetastoreEvents.AlterTableEvent alterEvent) {
        NotificationEvent pseudoDropTableEvent = newPseudoNotificationEvent(alterEvent,
                MetastoreEvents.MetastoreEventType.DROP_TABLE.toString(),
                alterEvent.getBeforeTable().getDbName(),
                alterEvent.getBeforeTable().getTableName());
        MetastoreEvents.DropTableEvent dropTableEvent = new MetastoreEvents.DropTableEvent(alterEvent.getCatalogOpExecutor(), alterEvent.getMetrics(), pseudoDropTableEvent, alterEvent.getBeforeTable());

        NotificationEvent pseudoCreateTableEvent = newPseudoNotificationEvent(alterEvent,
                MetastoreEvents.MetastoreEventType.CREATE_TABLE.toString(),
                alterEvent.getAfterTable().getDbName(),
                alterEvent.getAfterTable().getTableName());
        MetastoreEvents.CreateTableEvent createTableEvent = new MetastoreEvents.CreateTableEvent(alterEvent.getCatalogOpExecutor(), alterEvent.getMetrics(), pseudoCreateTableEvent, alterEvent.getAfterTable());

        RenameTableBarrierEvent.RenameEventState state = new RenameTableBarrierEvent.RenameEventState();
        MetastoreEvents.DerivedMetastoreEventContext context = new MetastoreEvents.DerivedMetastoreEventContext(alterEvent, 2);
        List<RenameTableBarrierEvent> barrierEvents = new ArrayList<RenameTableBarrierEvent>();
        barrierEvents.add(new RenameTableBarrierEvent(context, dropTableEvent, state));
        barrierEvents.add(new RenameTableBarrierEvent(context, createTableEvent, state));
        return barrierEvents;
    }

    /**
     * Builds a synthetic notification event of the given type for the given table,
     * copying the catalog name and event id from {@code alterEvent}.
     */
    private static NotificationEvent newPseudoNotificationEvent(MetastoreEvents.AlterTableEvent alterEvent, String eventType, String dbName, String tableName) {
        NotificationEvent pseudoEvent = new NotificationEvent();
        pseudoEvent.setEventType(eventType);
        pseudoEvent.setTableName(tableName);
        pseudoEvent.setDbName(dbName);
        pseudoEvent.setCatName(alterEvent.getCatalogName());
        pseudoEvent.setEventId(alterEvent.getEventId());
        return pseudoEvent;
    }

    /** Enqueues each rename barrier event on the executor chosen for its database. */
    private void processAlterTableRenameEvent(MetastoreEvents.AlterTableEvent alterEvent) {
        for (RenameTableBarrierEvent event : getRenameTableBarrierEvents(alterEvent)) {
            DbEventExecutor dbEventExecutor = getOrFindDbEventExecutor(event.getDbName());
            dbEventExecutor.enqueue(event);
        }
    }

    /**
     * Looks up the executor currently mapped to {@code dbName}. The name is used as-is
     * (no case normalization here).
     *
     * @return the mapped executor, or null if none is mapped
     * @throws NullPointerException if {@code dbName} is null
     */
    @Nullable
    DbEventExecutor getDbEventExecutor(String dbName) {
        Preconditions.checkNotNull(dbName);
        return dbNameToEventExecutor_.get(dbName);
    }

    /**
     * Returns the executor mapped to the lower-cased {@code dbName}, or, if there is
     * none, picks the executor with the fewest databases, breaking ties by the fewest
     * outstanding events (the earliest executor wins a full tie). This method does not
     * itself record the choice in the name-to-executor map.
     */
    private DbEventExecutor getOrFindDbEventExecutor(String dbName) {
        Preconditions.checkNotNull(dbName);
        DbEventExecutor eventExecutor = getDbEventExecutor(dbName.toLowerCase());
        if (eventExecutor != null) {
            return eventExecutor;
        }
        long minOutStandingEvents = Long.MAX_VALUE;
        long minDbCount = Long.MAX_VALUE;
        for (DbEventExecutor candidate : dbEventExecutors_) {
            long dbCount = candidate.getDbCount();
            if (dbCount < minDbCount) {
                minDbCount = dbCount;
                minOutStandingEvents = candidate.getOutstandingEventCount();
                eventExecutor = candidate;
            } else if (dbCount == minDbCount) {
                // Only query the outstanding count when it can actually break a tie.
                long outstandingEventCount = candidate.getOutstandingEventCount();
                if (outstandingEventCount < minOutStandingEvents) {
                    minOutStandingEvents = outstandingEventCount;
                    eventExecutor = candidate;
                }
            }
        }
        return eventExecutor;
    }

    /**
     * Records an event as processed and prunes the processed log once it holds more
     * than one entry.
     *
     * @param eventId id of the processed event
     * @param eventTime event time stored alongside the id
     */
    synchronized void addToProcessedLog(long eventId, long eventTime) {
        processedLog_.put(eventId, eventTime);
        if (processedLog_.size() > 1) {
            pruneProcessedLog();
        }
    }

    /**
     * Marks the event as delimiter or not and records it. A delimiter arriving while
     * nothing is in progress is recorded directly as processed; everything else gets a
     * dispatch time and goes into the in-progress log.
     */
    private synchronized void addToInProgressLog(MetastoreEvents.MetastoreEvent event, boolean isDelimiter) {
        event.setDelimiter(isDelimiter);
        long currentTime = System.currentTimeMillis();
        event.debugLog("Dispatch time: {}", PrintUtils.printTimeMs(currentTime - event.getCreationTime()));
        if (inProgressLog_.isEmpty() && isDelimiter) {
            addToProcessedLog(event.getEventId(), event.getEvent().getEventTime());
            return;
        }
        event.setDispatchTime(currentTime);
        inProgressLog_.put(event.getEventId(), event);
    }

    /**
     * Moves the given event from the in-progress log to the processed log, together
     * with the run of delimiter events that directly follows it, then prunes the
     * processed log. Does nothing if the id is not in the in-progress log.
     *
     * @param eventId id of the event that finished processing
     * @throws IllegalStateException if the removed event is itself a delimiter
     */
    synchronized void removeFromInProgressLog(long eventId) {
        long currentTime = System.currentTimeMillis();
        MetastoreEvents.MetastoreEvent event = inProgressLog_.remove(eventId);
        if (event == null) {
            return;
        }
        event.debugLog("Complete process time: {}", PrintUtils.printTimeMs(currentTime - event.getDispatchTime()));
        processedLog_.put(eventId, Long.valueOf(event.getEvent().getEventTime()));
        Preconditions.checkState(!event.isDelimiter());
        // Delimiters are only kept while something precedes them; once the preceding
        // event is done, the consecutive delimiters after it count as processed too.
        Iterator<Map.Entry<Long, MetastoreEvents.MetastoreEvent>> it = inProgressLog_.tailMap(eventId).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, MetastoreEvents.MetastoreEvent> entry = it.next();
            MetastoreEvents.MetastoreEvent following = entry.getValue();
            if (!following.isDelimiter()) {
                break;
            }
            processedLog_.put(entry.getKey(), Long.valueOf(following.getEvent().getEventTime()));
            it.remove();
        }
        LOG.debug("Current count of dispatched events that are being tracked: {} ", inProgressLog_.size());
        pruneProcessedLog();
    }

    /**
     * Drops processed-log entries older than the greatest synced event id. That id is
     * the last processed id when nothing is in progress, otherwise the largest processed
     * id below the first in-progress id. Callers must ensure the processed log is not
     * empty.
     */
    private synchronized void pruneProcessedLog() {
        long newGreatestSyncedEventId = processedLog_.lastKey();
        if (!inProgressLog_.isEmpty()) {
            Long firstUnprocessedEventId = inProgressLog_.firstKey();
            // Guarantees lowerKey() below finds an entry.
            Preconditions.checkState(firstUnprocessedEventId > processedLog_.firstKey());
            newGreatestSyncedEventId = processedLog_.lowerKey(firstUnprocessedEventId);
        }
        // tailMap() is inclusive, so the greatest synced entry itself is retained.
        processedLog_ = new TreeMap<Long, Long>(processedLog_.tailMap(newGreatestSyncedEventId));
        LOG.debug("Current count of processed events that are tracked: {}, greatest synced event id: {}", processedLog_.size(), newGreatestSyncedEventId);
    }

    /** Returns the first id in the processed log, or -1 if the log is empty. */
    synchronized long getGreatestSyncedEventId() {
        Map.Entry<Long, Long> entry = processedLog_.firstEntry();
        return entry != null ? entry.getKey() : -1L;
    }

    /** Returns the event time of the first processed-log entry, or 0 if the log is empty. */
    synchronized long getGreatestSyncedEventTime() {
        Map.Entry<Long, Long> entry = processedLog_.firstEntry();
        return entry != null ? entry.getValue() : 0L;
    }

    /**
     * Lifecycle states: {@code INACTIVE} is the initial state, {@code ACTIVE} is set by
     * start()/makeActive(), and {@code STOPPED} is set by shutdown() and is terminal
     * (start() and shutdown() both reject a stopped service).
     */
    public enum EventExecutorStatus {
        INACTIVE,
        ACTIVE,
        STOPPED;

    }
}

