/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetastoreClientInstantiationException;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.events.ConfigValidator;
import org.apache.impala.catalog.events.DeleteEventLog;
import org.apache.impala.catalog.events.EventFactory;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventProcessorConfig;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.events.MetastoreNotificationFetchException;
import org.apache.impala.catalog.events.MetastoreNotificationNeedsInvalidateException;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TEventBatchProgressInfo;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetastoreEventsProcessor
implements ExternalEventsProcessor {
    // Name of a Hive metastore configuration key (the key string is shown below; how it is
    // validated is not visible in this part of the file).
    public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = "hive.metastore.notifications.add.thrift.objects";
    private static final Logger LOG = LoggerFactory.getLogger(MetastoreEventsProcessor.class);
    // Deserializer for metastore event messages, obtained from the MetastoreShim.
    private static final MessageDeserializer MESSAGE_DESERIALIZER = MetastoreShim.getMessageDeserializer();
    private static MetastoreEventsProcessor instance;
    // NOTE(review): presumably the maximum number of events requested per metastore RPC; the
    // batch-fetching helpers in this file pass the same value (1000) as their batch size.
    private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
    // NOTE(review): presumably the number of seconds to wait for a scheduler to terminate on
    // shutdown; shutdownAndAwaitTermination waits the same amount (10 seconds).
    private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10;

    // Names of metrics. Those registered by initMetrics() are registered as timers, meters,
    // counters or gauges as shown there.
    public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration";
    public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration";
    public static final String EVENTS_RECEIVED_METRIC = "events-received";
    public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
    public static final String STATUS_METRIC = "status";
    public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
    public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time";
    public static final String LATEST_EVENT_ID = "latest-event-id";
    public static final String LATEST_EVENT_TIME = "latest-event-time";
    public static final String EVENT_PROCESSING_DELAY = "event-processing-delay";
    public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
    public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed";
    public static final String NUMBER_OF_TABLES_ADDED = "tables-added";
    public static final String NUMBER_OF_TABLES_REMOVED = "tables-removed";
    public static final String NUMBER_OF_DATABASES_ADDED = "databases-added";
    public static final String NUMBER_OF_DATABASES_REMOVED = "databases-removed";
    public static final String NUMBER_OF_PARTITIONS_ADDED = "partitions-added";
    public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed";
    public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
    public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
    public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming-delay";
    private static final long SECOND_IN_NANOS = 1000000000L;

    // Current status of this processor; initially STOPPED. Written by the synchronized
    // lifecycle methods (start/pause/shutdown) via updateStatus().
    private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED;
    // Error message associated with the processor; cleared (set to null) by start(long).
    private String eventProcessorErrorMsg_ = null;
    // Factory for MetastoreEvent objects, created in the constructor from the CatalogOpExecutor.
    private final MetastoreEvents.MetastoreEventFactory metastoreEventFactory_;

    // Progress-tracking state. NOTE(review): presumably describes the event/batch currently
    // being processed and is reset by resetProgressInfo(); the writers are not visible here.
    private NotificationEvent currentEvent_;
    private List<NotificationEvent> currentEventBatch_;
    private MetastoreEvents.MetastoreEvent currentFilteredEvent_;
    private List<MetastoreEvents.MetastoreEvent> currentFilteredEvents_;
    private long currentBatchStartTimeMs_ = 0L;
    private long currentEventStartTimeMs_ = 0L;
    private int currentEventIndex_ = 0;

    // Id of the last synced event (initially -1) and the event time reported by HMS for it.
    // Both are set in the constructor and in start(long) and exposed as gauges.
    private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1L);
    private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0L);
    // Latest event id/time; exposed as gauges. NOTE(review): presumably refreshed by
    // updateLatestEventId(), which is scheduled in startScheduler() -- confirm.
    private final AtomicLong latestEventId_ = new AtomicLong(0L);
    private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0L);
    private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0L);
    // Period (and initial delay), in seconds, of both scheduled tasks; must be > 0.
    private final long pollingFrequencyInSec_;
    protected final CatalogServiceCatalog catalog_;
    // Single-threaded daemon scheduler that runs processEvents().
    private final ScheduledExecutorService processEventsScheduler_ = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MetastoreEventsProcessor-ProcessEvents").build());
    // Single-threaded daemon scheduler that runs updateLatestEventId().
    private final ScheduledExecutorService updateEventIdScheduler_ = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MetastoreEventsProcessor-UpdateEventId").build());
    private final Metrics metrics_ = new Metrics();
    // Log of removed objects; entries are added via addRemovedObject() when drop events are
    // processed by the syncToLatestEventId() methods.
    private final DeleteEventLog deleteEventLog_ = new DeleteEventLog();

    /**
     * Fetches the events of type {@code eventType} after {@code eventId} for database
     * {@code dbName}, using the catalog name returned by
     * {@code MetastoreShim.getDefaultCatalogName()}.
     *
     * @throws MetastoreNotificationException if the delegated fetch fails
     */
    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(
            CatalogServiceCatalog catalog, long eventId, String dbName, String eventType)
            throws MetastoreNotificationException {
        String defaultCatName = MetastoreShim.getDefaultCatalogName();
        return getNextMetastoreEventsInBatchesForDb(
                catalog, eventId, defaultCatName, dbName, eventType);
    }

    /**
     * Fetches the events of type {@code eventType} after {@code eventId} for the given
     * catalog and database. Events are filtered locally on exact event type and on
     * case-insensitive catalog and database name; the same names are also handed to the
     * fetch call through a {@code MetaDataFilter}.
     *
     * @throws NullPointerException if {@code eventType}, {@code catName} or {@code dbName} is null
     * @throws MetastoreNotificationException if the fetch fails
     */
    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(
            CatalogServiceCatalog catalog, long eventId, String catName, String dbName,
            String eventType) throws MetastoreNotificationException {
        Preconditions.checkNotNull(eventType, "eventType is null in fetching db events");
        Preconditions.checkNotNull(catName, "catName is null in fetching db events");
        Preconditions.checkNotNull(dbName, "dbName is null in fetching db events");
        IMetaStoreClient.NotificationFilter dbEventFilter = event -> {
            if (!eventType.equals(event.getEventType())) {
                return false;
            }
            if (!catName.equalsIgnoreCase(event.getCatName())) {
                return false;
            }
            return dbName.equalsIgnoreCase(event.getDbName());
        };
        MetaDataFilter dbMetaDataFilter = new MetaDataFilter(dbEventFilter, catName, dbName);
        return getNextMetastoreEventsWithFilterInBatches(
                catalog, eventId, dbMetaDataFilter, EVENTS_BATCH_SIZE_PER_RPC, eventType);
    }

    /**
     * Fetches the events of type {@code eventType} after {@code eventId} for table
     * {@code dbName.tblName}, using the catalog name returned by
     * {@code MetastoreShim.getDefaultCatalogName()}.
     *
     * @throws MetastoreNotificationException if the delegated fetch fails
     */
    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable(
            CatalogServiceCatalog catalog, long eventId, String dbName, String tblName,
            String eventType) throws MetastoreNotificationException {
        String defaultCatName = MetastoreShim.getDefaultCatalogName();
        return getNextMetastoreEventsInBatchesForTable(
                catalog, eventId, defaultCatName, dbName, tblName, eventType);
    }

    /**
     * Fetches the events of type {@code eventType} after {@code eventId} for the given
     * catalog, database and table. Events are filtered locally on exact event type and on
     * case-insensitive catalog, database and table name; the same names are also handed to
     * the fetch call through a {@code MetaDataFilter}.
     *
     * @throws NullPointerException if any of the string arguments is null
     * @throws MetastoreNotificationException if the fetch fails
     */
    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable(
            CatalogServiceCatalog catalog, long eventId, String catName, String dbName,
            String tblName, String eventType) throws MetastoreNotificationException {
        Preconditions.checkNotNull(eventType, "eventType is null in fetching table events");
        Preconditions.checkNotNull(catName, "catName is null in fetching table events");
        Preconditions.checkNotNull(dbName, "dbName is null in fetching table events");
        Preconditions.checkNotNull(tblName, "tblName is null in fetching table events");
        IMetaStoreClient.NotificationFilter tblEventFilter = event -> {
            if (!eventType.equals(event.getEventType())) {
                return false;
            }
            if (!catName.equalsIgnoreCase(event.getCatName())) {
                return false;
            }
            if (!dbName.equalsIgnoreCase(event.getDbName())) {
                return false;
            }
            return tblName.equalsIgnoreCase(event.getTableName());
        };
        MetaDataFilter tblMetaDataFilter =
                new MetaDataFilter(tblEventFilter, catName, dbName, tblName);
        return getNextMetastoreEventsWithFilterInBatches(
                catalog, eventId, tblMetaDataFilter, EVENTS_BATCH_SIZE_PER_RPC, eventType);
    }

    /**
     * Fetches the events after {@code eventId} that pass {@code filter}, using the default
     * per-RPC batch size ({@code EVENTS_BATCH_SIZE_PER_RPC}).
     *
     * @param eventTypes optional event types, forwarded unchanged to the batch-size overload
     * @throws MetastoreNotificationFetchException if the fetch fails
     */
    public static List<NotificationEvent> getNextMetastoreEventsInBatches(
            CatalogServiceCatalog catalog, long eventId,
            IMetaStoreClient.NotificationFilter filter, String ... eventTypes)
            throws MetastoreNotificationFetchException {
        return getNextMetastoreEventsInBatches(
                catalog, eventId, filter, EVENTS_BATCH_SIZE_PER_RPC, eventTypes);
    }

    /**
     * Fetches the events after {@code eventId} that pass {@code filter}, requesting at most
     * {@code eventsBatchSize} events per RPC. The filter is wrapped in a {@code MetaDataFilter}
     * that carries no catalog, database or table name.
     *
     * @throws MetastoreNotificationFetchException if the fetch fails
     */
    @VisibleForTesting
    public static List<NotificationEvent> getNextMetastoreEventsInBatches(
            CatalogServiceCatalog catalog, long eventId,
            IMetaStoreClient.NotificationFilter filter, int eventsBatchSize,
            String ... eventTypes) throws MetastoreNotificationFetchException {
        MetaDataFilter filterOnly = new MetaDataFilter(filter);
        return getNextMetastoreEventsWithFilterInBatches(
                catalog, eventId, filterOnly, eventsBatchSize, eventTypes);
    }

    /**
     * Fetches the notification events after {@code eventId}, up to the metastore's current
     * notification event id as read at the start of the call, one RPC per batch.
     *
     * <p>Each RPC requests at most {@code eventsBatchSize} events. Events returned by the
     * metastore are additionally filtered locally with the {@code NotificationFilter} held by
     * {@code metaDataFilter} (when it is non-null). Fetching stops when the current event id
     * is reached or the metastore returns an empty batch.
     *
     * @param catalog catalog used to obtain a metastore client and the event-type skip lists
     * @param eventId id of the last event already seen; only later events are returned
     * @param metaDataFilter filter applied to the request and, through its notification
     *        filter, to the returned events
     * @param eventsBatchSize maximum number of events per RPC; must be positive
     * @param eventTypes when non-empty, the skip list is built from the catalog's common
     *        event types minus these types; otherwise the catalog's default skip list is used
     * @return the accepted events in the order received; empty if there is nothing new
     * @throws IllegalArgumentException if {@code eventsBatchSize} is not positive
     * @throws MetastoreNotificationFetchException if a metastore client cannot be created or
     *         an RPC fails
     */
    @VisibleForTesting
    public static List<NotificationEvent> getNextMetastoreEventsWithFilterInBatches(CatalogServiceCatalog catalog, long eventId, MetaDataFilter metaDataFilter, int eventsBatchSize, String ... eventTypes) throws MetastoreNotificationFetchException {
        Preconditions.checkArgument(eventsBatchSize > 0);
        List<NotificationEvent> result = new ArrayList<>();
        IMetaStoreClient.NotificationFilter filter = metaDataFilter.getNotificationFilter();
        try (MetaStoreClientPool.MetaStoreClient msc = catalog.getMetaStoreClient()) {
            long toEventId = msc.getHiveClient().getCurrentNotificationEventId().getEventId();
            if (toEventId <= eventId) {
                return result;
            }
            List<String> eventTypeSkipList = catalog.getDefaultSkippedHmsEventTypes();
            String typeStr = "";
            if (eventTypes != null && eventTypes.length > 0) {
                // Skip every common event type except the ones explicitly requested.
                List<String> requestedTypes = Arrays.asList(eventTypes);
                eventTypeSkipList = Lists.newArrayList(catalog.getCommonHmsEventTypes());
                eventTypeSkipList.removeIf(requestedTypes::contains);
                typeStr = String.join(",", eventTypes) + " ";
            }
            LOG.info("Fetching {}events started from id {} to {}. Gap: {}", typeStr, eventId, toEventId, toEventId - eventId);
            long currentEventId = eventId;
            int numFilteredEvents = 0;
            while (currentEventId < toEventId) {
                // Take the minimum in long arithmetic: narrowing the gap to int first would
                // overflow when more than Integer.MAX_VALUE events are outstanding.
                int batchSize = (int) Math.min((long) eventsBatchSize, toEventId - currentEventId);
                NotificationEventRequest eventRequest = new NotificationEventRequest();
                eventRequest.setMaxEvents(batchSize);
                eventRequest.setLastEvent(currentEventId);
                MetastoreShim.setNotificationEventRequestWithFilter(eventRequest, metaDataFilter);
                NotificationEventResponse response = MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, eventTypeSkipList);
                if (response.getEvents().isEmpty()) {
                    // Nothing more to read even though toEventId was not reached.
                    break;
                }
                for (NotificationEvent event : response.getEvents()) {
                    if (filter == null || filter.accept(event)) {
                        result.add(event);
                    } else {
                        ++numFilteredEvents;
                    }
                    // Advance past every event seen, accepted or not.
                    currentEventId = event.getEventId();
                }
            }
            if (numFilteredEvents > 0) {
                LOG.info("Got {} events and filtered out {} locally from {} events start from id {}", result.size(), numFilteredEvents, toEventId - eventId, eventId + 1L);
            }
            return result;
        }
        catch (MetastoreClientInstantiationException | TException e) {
            throw new MetastoreNotificationFetchException(String.format("Error making '%s' RPC to Hive Metastore: ", "getNextNotification"), e);
        }
    }

    /**
     * Applies to {@code tbl} the metastore events that came after the table's last synced
     * event id, then advances that id.
     *
     * <p>The table must be loaded (and not an {@code IncompleteTable}), its write lock must be
     * held by the calling thread, and its last synced event id must be positive. For
     * transactional tables the events are fetched with only the local notification filter;
     * otherwise the default catalog, database and table names are also passed to the fetch.
     * Drop events are recorded in the delete event log. Processing returns immediately after
     * a {@code DropTableEvent}, leaving the last synced event id untouched.
     *
     * @throws CatalogException declared; presumably raised while processing an event
     * @throws MetastoreNotificationException if events cannot be fetched or created
     */
    public static void syncToLatestEventId(CatalogServiceCatalog catalog, Table tbl, EventFactory eventFactory, Metrics metrics) throws CatalogException, MetastoreNotificationException {
        Preconditions.checkArgument(tbl != null, "tbl is null");
        boolean completeAndLoaded = !(tbl instanceof IncompleteTable) && tbl.isLoaded();
        Preconditions.checkState(completeAndLoaded, "table %s is either incomplete or not loaded", tbl.getFullName());
        Preconditions.checkState(tbl.isWriteLockedByCurrentThread(), String.format("Write lock is not held on table %s by current thread", tbl.getFullName()));
        long lastEventId = tbl.getLastSyncedEventId();
        Preconditions.checkArgument(lastEventId > 0L, "lastEvent  Id %s for table %s should be greater than 0", lastEventId, tbl.getFullName());
        String annotation = String.format("sync table %s to latest HMS event id", tbl.getFullName());
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
            MetaDataFilter metaDataFilter;
            if (AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
                metaDataFilter = new MetaDataFilter(getTableNotificationEventFilter(tbl));
            } else {
                metaDataFilter = new MetaDataFilter(getTableNotificationEventFilter(tbl), MetastoreShim.getDefaultCatalogName(), tbl.getDb().getName(), tbl.getName());
            }
            List<NotificationEvent> events = getNextMetastoreEventsWithFilterInBatches(catalog, lastEventId, metaDataFilter, EVENTS_BATCH_SIZE_PER_RPC, new String[0]);
            if (events.isEmpty()) {
                LOG.debug("table {} synced till event id {}. No new HMS events to process from event id: {}", tbl.getFullName(), lastEventId, lastEventId + 1L);
                return;
            }
            MetastoreEvents.MetastoreEvent lastProcessed = null;
            for (NotificationEvent rawEvent : events) {
                lastProcessed = eventFactory.get(rawEvent, metrics);
                LOG.trace("for table {}, processing event {}", tbl.getFullName(), lastProcessed);
                lastProcessed.processIfEnabled();
                if (lastProcessed.isDropEvent()) {
                    Preconditions.checkNotNull(lastProcessed.getDbName());
                    Preconditions.checkNotNull(lastProcessed.getTableName());
                    String key = DeleteEventLog.getTblKey(lastProcessed.getDbName(), lastProcessed.getTableName());
                    catalog.getMetastoreEventProcessor().getDeleteEventLog().addRemovedObject(lastProcessed.getEventId(), key);
                }
                if (lastProcessed instanceof MetastoreEvents.DropTableEvent) {
                    // The table was dropped; stop without advancing its synced event id.
                    return;
                }
            }
            // Only move the synced id forward, never backwards.
            long newestEventId = lastProcessed.getEventId();
            if (newestEventId > tbl.getLastSyncedEventId()) {
                tbl.setLastSyncedEventId(newestEventId);
            }
            LOG.info("Synced table {} till HMS event:  {}", tbl.getFullName(), tbl.getLastSyncedEventId());
        }
    }

    /**
     * Applies to {@code db} the metastore events that came after the database's last synced
     * event id, then sets that id to the id of the last processed event.
     *
     * <p>The database's last synced event id must be positive and the calling thread must
     * hold the lock on {@code db}. Events are fetched for the default catalog and this
     * database name. When a drop event is processed it must be a {@code DropDatabaseEvent};
     * it is recorded in the delete event log and the method returns without updating the
     * last synced event id.
     *
     * @throws IllegalArgumentException if {@code db} is null or its last synced id is not positive
     * @throws IllegalStateException if the db lock is not held by the calling thread, or a
     *         drop event of an unexpected type is encountered
     * @throws CatalogException declared; presumably raised while processing an event
     * @throws MetastoreNotificationException if events cannot be fetched or created
     */
    public static void syncToLatestEventId(CatalogServiceCatalog catalog, Db db, EventFactory eventFactory, Metrics metrics) throws CatalogException, MetastoreNotificationException {
        Preconditions.checkArgument(db != null, "db is null");
        long lastEventId = db.getLastSyncedEventId();
        Preconditions.checkArgument(lastEventId > 0L, "Invalid last synced event ID %s for db %s ", lastEventId, db.getName());
        Preconditions.checkState(db.isLockHeldByCurrentThread(), "Current thread does not hold lock on db: %s", db.getName());
        String annotation = String.format("sync db %s to latest HMS event id", db.getName());
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
            MetaDataFilter metaDataFilter = new MetaDataFilter(getDbNotificationEventFilter(db), MetastoreShim.getDefaultCatalogName(), db.getName());
            List<NotificationEvent> events = getNextMetastoreEventsWithFilterInBatches(catalog, lastEventId, metaDataFilter, EVENTS_BATCH_SIZE_PER_RPC, new String[0]);
            if (events.isEmpty()) {
                LOG.debug("db {} already synced till event id: {}, no new hms events from event id: {}", db.getName(), lastEventId, lastEventId + 1L);
                return;
            }
            MetastoreEvents.MetastoreEvent currentEvent = null;
            for (NotificationEvent event : events) {
                currentEvent = eventFactory.get(event, metrics);
                LOG.trace("for db {}, processing event: {}", db.getName(), currentEvent);
                currentEvent.processIfEnabled();
                if (!currentEvent.isDropEvent()) {
                    continue;
                }
                // Guava's Preconditions only substitutes %s placeholders (not SLF4J-style {}).
                Preconditions.checkState(currentEvent instanceof MetastoreEvents.DropDatabaseEvent, "invalid drop event %s ", currentEvent);
                Preconditions.checkNotNull(currentEvent.getDbName());
                String key = DeleteEventLog.getDbKey(currentEvent.getDbName());
                catalog.getMetastoreEventProcessor().getDeleteEventLog().addRemovedObject(currentEvent.getEventId(), key);
                // The database was dropped; stop without advancing its synced event id.
                return;
            }
            db.setLastSyncedEventId(currentEvent.getEventId());
            LOG.info("Synced db {} till HMS event {}", db.getName(), currentEvent);
        }
    }

    /**
     * Builds a filter for events relevant to {@code tbl}. An event that carries both a
     * database and a table name is accepted only when both match the table
     * (case-insensitively). Any other event is accepted only when it has no database name.
     */
    private static IMetaStoreClient.NotificationFilter getTableNotificationEventFilter(final Table tbl) {
        return event -> {
            boolean hasDbAndTable = event.getDbName() != null && event.getTableName() != null;
            if (!hasDbAndTable) {
                return event.getDbName() == null;
            }
            if (!tbl.getDb().getName().equalsIgnoreCase(event.getDbName())) {
                return false;
            }
            return tbl.getName().equalsIgnoreCase(event.getTableName());
        };
    }

    /**
     * Builds a filter for database-level events relevant to {@code db}. Events that carry a
     * table name are rejected. Of the remaining events, those without a database name are
     * accepted and those with one are accepted only when it matches {@code db}
     * (case-insensitively).
     */
    @VisibleForTesting
    public static IMetaStoreClient.NotificationFilter getDbNotificationEventFilter(final Db db) {
        return event -> {
            if (event.getTableName() != null) {
                return false;
            }
            String eventDbName = event.getDbName();
            return eventDbName == null || db.getName().equalsIgnoreCase(eventDbName);
        };
    }

    /**
     * Creates an events processor that will resume from {@code startSyncFromId}.
     *
     * <p>The constructor validates the metastore configuration, records the starting event id
     * together with that event's time as reported by HMS, registers the metrics and creates
     * the event factory. It does not start the schedulers; see {@code start()}.
     *
     * @param catalogOpExecutor executor whose catalog (must be non-null) backs this processor
     * @param startSyncFromId event id stored as the initial last-synced event id
     * @param pollingFrequencyInSec polling period in seconds; must be positive
     * @throws IllegalStateException if {@code pollingFrequencyInSec} is not positive
     * @throws CatalogException if the metastore configuration cannot be read or is invalid
     */
    @VisibleForTesting
    MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long startSyncFromId, long pollingFrequencyInSec) throws CatalogException {
        Preconditions.checkState((pollingFrequencyInSec > 0L ? 1 : 0) != 0);
        this.catalog_ = (CatalogServiceCatalog)Preconditions.checkNotNull((Object)catalogOpExecutor.getCatalog());
        // Needs catalog_ to be set: configuration values are read through a metastore client.
        this.validateConfigs();
        this.lastSyncedEventId_.set(startSyncFromId);
        // Event time is 0 when the event cannot be fetched (see getEventTimeFromHMS).
        this.lastSyncedEventTimeSecs_.set(this.getEventTimeFromHMS(startSyncFromId));
        this.initMetrics();
        this.metastoreEventFactory_ = new MetastoreEvents.MetastoreEventFactory(catalogOpExecutor);
        this.pollingFrequencyInSec_ = pollingFrequencyInSec;
    }

    /**
     * Reads each configuration listed by {@code getEventProcessorConfigsToValidate()} from the
     * metastore (using an empty string as the default value) and validates it.
     *
     * <p>All validation failures are collected and logged before a single exception is
     * thrown, so one run reports every misconfiguration.
     *
     * @throws CatalogException if a configuration value cannot be fetched from the metastore
     *         (the underlying {@code TException} is attached as the cause), or if at least
     *         one configuration is invalid
     */
    @VisibleForTesting
    void validateConfigs() throws CatalogException {
        List<ConfigValidator.ValidationResult> validationErrors = new ArrayList<>();
        for (MetastoreEventProcessorConfig config : getEventProcessorConfigsToValidate()) {
            String configKey = config.getValidator().getConfigKey();
            try {
                String value = this.getConfigValueFromMetastore(configKey, "");
                ConfigValidator.ValidationResult result = config.validate(value);
                if (!result.isValid()) {
                    validationErrors.add(result);
                }
            }
            catch (TException e) {
                String msg = String.format("Unable to get configuration %s from metastore. Check if metastore is accessible", configKey);
                LOG.error(msg, e);
                // Keep the original failure as the cause so callers can see why HMS was unreachable.
                throw new CatalogException(msg, e);
            }
        }
        if (validationErrors.isEmpty()) {
            return;
        }
        LOG.error("Found {} incorrect metastore configuration(s).", validationErrors.size());
        for (ConfigValidator.ValidationResult invalidConfig : validationErrors) {
            LOG.error(invalidConfig.getReason());
        }
        throw new CatalogException(String.format("Found %d incorrect metastore configuration(s). Events processor cannot start. See ERROR log for more details.", validationErrors.size()));
    }

    /** Returns the delete event log owned by this processor. */
    @Override
    public DeleteEventLog getDeleteEventLog() {
        return deleteEventLog_;
    }

    /**
     * Returns the metastore configurations checked by {@code validateConfigs()}:
     * {@code FIRE_EVENTS_FOR_DML} and {@code METASTORE_DEFAULT_CATALOG_NAME}, in that order.
     * The returned list is fixed-size.
     */
    public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
        return Arrays.asList(
                MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
                MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME);
    }

    /**
     * Registers the timers, meters, counters and gauges of this processor with
     * {@code metrics_}. The gauges read live values from this processor's atomic fields, its
     * status and the delete event log.
     */
    private void initMetrics() {
        metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC);
        metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
        metrics_.addMeter(EVENTS_RECEIVED_METRIC);
        metrics_.addCounter(EVENTS_SKIPPED_METRIC);
        metrics_.addGauge(STATUS_METRIC, () -> getStatus().toString());
        metrics_.addGauge(LAST_SYNCED_ID_METRIC, () -> lastSyncedEventId_.get());
        metrics_.addGauge(LAST_SYNCED_EVENT_TIME, () -> lastSyncedEventTimeSecs_.get());
        metrics_.addGauge(LATEST_EVENT_ID, () -> latestEventId_.get());
        metrics_.addGauge(LATEST_EVENT_TIME, () -> latestEventTimeSecs_.get());
        // Delay = time of the latest event minus time of the last synced event.
        metrics_.addGauge(EVENT_PROCESSING_DELAY, () -> latestEventTimeSecs_.get() - lastSyncedEventTimeSecs_.get());
        metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
        metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
        metrics_.addCounter(NUMBER_OF_TABLES_ADDED);
        metrics_.addCounter(NUMBER_OF_TABLES_REMOVED);
        metrics_.addCounter(NUMBER_OF_DATABASES_ADDED);
        metrics_.addCounter(NUMBER_OF_DATABASES_REMOVED);
        metrics_.addCounter(NUMBER_OF_PARTITIONS_ADDED);
        metrics_.addCounter(NUMBER_OF_PARTITIONS_REMOVED);
        metrics_.addGauge(DELETE_EVENT_LOG_SIZE, () -> deleteEventLog_.size());
        metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
        metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS);
    }

    /**
     * Starts event processing: resets the progress info, schedules the periodic tasks and
     * marks the processor {@code ACTIVE}.
     *
     * @throws IllegalStateException if the processor is already {@code ACTIVE}
     */
    @Override
    public synchronized void start() {
        boolean notYetActive = eventProcessorStatus_ != EventProcessorStatus.ACTIVE;
        Preconditions.checkState(notYetActive);
        resetProgressInfo();
        startScheduler();
        updateStatus(EventProcessorStatus.ACTIVE);
        String startedMsg = String.format("Successfully started metastore event processing. Polling interval: %d seconds.", pollingFrequencyInSec_);
        LOG.info(startedMsg);
    }

    /** Returns the current status of this events processor. */
    public EventProcessorStatus getStatus() {
        return eventProcessorStatus_;
    }

    /**
     * Looks up configuration {@code config} in the metastore through
     * {@code MetaStoreUtil.getMetastoreConfigValue}, passing {@code defaultVal} as the default.
     * The metastore client is borrowed from the catalog and closed before returning.
     *
     * @throws TException if the metastore call fails
     */
    @VisibleForTesting
    public String getConfigValueFromMetastore(String config, String defaultVal) throws TException {
        try (MetaStoreClientPool.MetaStoreClient pooledClient = catalog_.getMetaStoreClient()) {
            return MetaStoreUtil.getMetastoreConfigValue(pooledClient.getHiveClient(), config, defaultVal);
        }
    }

    /** Returns the current value of the last synced event id. */
    @VisibleForTesting
    public long getLastSyncedEventId() {
        return lastSyncedEventId_.get();
    }

    /** Returns the current value of the latest event id tracked by this processor. */
    @VisibleForTesting
    public long getLatestEventId() {
        return latestEventId_.get();
    }

    /**
     * Schedules {@code processEvents} and {@code updateLatestEventId} at a fixed rate on their
     * respective single-threaded schedulers. Both use the polling frequency as initial delay
     * and as period.
     *
     * @throws IllegalStateException if the polling frequency is not positive
     */
    @VisibleForTesting
    void startScheduler() {
        final long periodSecs = pollingFrequencyInSec_;
        Preconditions.checkState(periodSecs > 0L);
        LOG.info(String.format("Starting metastore event polling with interval %d seconds.", periodSecs));
        processEventsScheduler_.scheduleAtFixedRate(this::processEvents, periodSecs, periodSecs, TimeUnit.SECONDS);
        updateEventIdScheduler_.scheduleAtFixedRate(this::updateLatestEventId, periodSecs, periodSecs, TimeUnit.SECONDS);
    }

    /**
     * Marks the processor {@code PAUSED} and logs the last synced event id. Does nothing if
     * the processor is already paused.
     */
    @Override
    public synchronized void pause() {
        if (eventProcessorStatus_ != EventProcessorStatus.PAUSED) {
            updateStatus(EventProcessorStatus.PAUSED);
            LOG.info(String.format("Event processing is paused. Last synced event id is %d", lastSyncedEventId_.get()));
        }
    }

    /**
     * Returns the metastore's current notification event id, using a client borrowed from
     * the catalog.
     *
     * @throws MetastoreNotificationFetchException if a client cannot be created or the RPC fails
     */
    @Override
    public long getCurrentEventId() throws MetastoreNotificationFetchException {
        try (MetaStoreClientPool.MetaStoreClient pooledClient = catalog_.getMetaStoreClient()) {
            CurrentNotificationEventId current = pooledClient.getHiveClient().getCurrentNotificationEventId();
            return current.getEventId();
        }
        catch (MetastoreClientInstantiationException | TException e) {
            throw new MetastoreNotificationFetchException("Unable to fetch the current notification event id. Check if metastore service is accessible", e);
        }
    }

    /**
     * Returns the metastore's current notification event id, or {@code -1} if the RPC fails
     * with a {@code TException}. In that case a warning with the exception message is logged.
     */
    public static long getCurrentEventIdNoThrow(IMetaStoreClient client) {
        try {
            return client.getCurrentNotificationEventId().getEventId();
        }
        catch (TException exception) {
            LOG.warn(String.format("Unable to fetch latest event id from HMS: %s", exception.getMessage()));
            return -1L;
        }
    }

    /**
     * Asks the metastore for one event following id {@code eventId - 1}.
     *
     * @return the first event in the response, or {@code null} if the response is empty or
     *         the RPC fails with a {@code TException}; both cases are logged as warnings
     */
    private NotificationEvent getEventFromHMS(MetaStoreClientPool.MetaStoreClient msClient, long eventId) {
        NotificationEventRequest singleEventRequest = new NotificationEventRequest();
        // Request starts after (eventId - 1) so that the first result is the wanted event.
        singleEventRequest.setLastEvent(eventId - 1L);
        singleEventRequest.setMaxEvents(1);
        try {
            NotificationEventResponse response = MetastoreShim.getNextNotification(msClient.getHiveClient(), singleEventRequest, null);
            Iterator<NotificationEvent> events = response.getEventsIterator();
            if (events.hasNext()) {
                return events.next();
            }
            LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId);
            return null;
        }
        catch (TException e) {
            LOG.warn("Unable to fetch event {}", eventId, e);
            return null;
        }
    }

    /**
     * Returns the event time the metastore reports for event {@code eventId}.
     *
     * @return the event time, or {@code 0} if the event cannot be fetched or a metastore
     *         client cannot be created (the latter is logged as an error)
     */
    @VisibleForTesting
    public int getEventTimeFromHMS(long eventId) {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
            NotificationEvent event = getEventFromHMS(msClient, eventId);
            return event == null ? 0 : event.getEventTime();
        }
        catch (MetastoreClientInstantiationException e) {
            LOG.error("Failed to get event time from HMS for event {}", eventId, e);
            return 0;
        }
    }

    /**
     * Restarts event processing from {@code fromEventId}: clears the stored error message and
     * the progress info, sets the last synced event id (and its HMS event time) and marks the
     * processor {@code ACTIVE}.
     *
     * <p>Nothing happens if the processor is already {@code ACTIVE} and has already synced to
     * {@code fromEventId} or beyond. This method does not schedule any tasks itself.
     *
     * @throws IllegalArgumentException if {@code fromEventId} is negative
     */
    @Override
    public synchronized void start(long fromEventId) {
        Preconditions.checkArgument(fromEventId >= 0L);
        long prevLastSyncedEventId = lastSyncedEventId_.get();
        boolean alreadyActive = eventProcessorStatus_ == EventProcessorStatus.ACTIVE;
        if (alreadyActive && prevLastSyncedEventId >= fromEventId) {
            return;
        }
        eventProcessorErrorMsg_ = null;
        resetProgressInfo();
        lastSyncedEventId_.set(fromEventId);
        lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
        updateStatus(EventProcessorStatus.ACTIVE);
        String restartedMsg = String.format("Metastore event processing restarted. Last synced event id was updated from %d to %d", prevLastSyncedEventId, lastSyncedEventId_.get());
        LOG.info(restartedMsg);
    }

    /**
     * Stops event processing: shuts down both schedulers, waiting for each to terminate, and
     * marks the processor {@code STOPPED}.
     *
     * @throws IllegalStateException if the processor is already {@code STOPPED}
     */
    @Override
    public synchronized void shutdown() {
        boolean stillRunning = eventProcessorStatus_ != EventProcessorStatus.STOPPED;
        Preconditions.checkState(stillRunning, "Event processing is already stopped");
        shutdownAndAwaitTermination(processEventsScheduler_);
        shutdownAndAwaitTermination(updateEventIdScheduler_);
        updateStatus(EventProcessorStatus.STOPPED);
        LOG.info("Metastore event processing stopped.");
    }

    /**
     * Shuts down {@code ses} gracefully and waits up to {@code SCHEDULER_SHUTDOWN_TIMEOUT}
     * seconds for it to terminate. If it does not terminate in time, or the wait is
     * interrupted, the running tasks are stopped with {@code shutdownNow()}.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before returning so that callers further up the stack can still observe it.
     *
     * @param ses the scheduler to stop; must not be null
     */
    private synchronized void shutdownAndAwaitTermination(ScheduledExecutorService ses) {
        Preconditions.checkNotNull(ses);
        ses.shutdown();
        try {
            if (!ses.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                LOG.info(String.format("Scheduler pool did not terminate within %d seconds. Attempting to stop currently running tasks", SCHEDULER_SHUTDOWN_TIMEOUT));
                ses.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            LOG.info("Received interruptedException. Terminating currently running tasks.", e);
            ses.shutdownNow();
            // Catching InterruptedException clears the flag; restore it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * PLACEHOLDER - the real body of this method was NOT recovered by the decompiler.
     *
     * <p>As written here the method unconditionally throws {@link IllegalStateException}
     * ("Decompilation failed"); it never returns events. The other
     * {@code getNextMetastoreEvents} overloads in this file delegate to it, so they inherit
     * that behaviour until the original implementation is restored from the real sources.
     *
     * <p>What the signature alone tells us (semantics are assumptions - TODO confirm against
     * the original source):
     * <ul>
     *   <li>{@code eventId} - the single-argument overload passes the last synced event id
     *       here;</li>
     *   <li>{@code currentEventId} - the single-argument overload forwards its own
     *       {@code currentEventId} argument;</li>
     *   <li>{@code getAllEvents} - the single-argument overload passes {@code false};</li>
     *   <li>{@code filter} - optional ({@code @Nullable}) notification filter; the
     *       single-argument overload passes {@code null}.</li>
     * </ul>
     *
     * @throws IllegalStateException always, in this decompiled form
     * @throws MetastoreNotificationFetchException declared by the signature; never thrown by
     *         this stub
     */
    public List<NotificationEvent> getNextMetastoreEvents(long eventId, long currentEventId, boolean getAllEvents, @Nullable IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Convenience overload: looks up the current event id via {@code getCurrentEventId()}
     * and fetches the next batch relative to it.
     *
     * @return the events returned by the single-argument overload
     * @throws MetastoreNotificationFetchException if the fetch fails
     */
    @VisibleForTesting
    protected List<NotificationEvent> getNextMetastoreEvents() throws MetastoreNotificationFetchException {
        long currentId = this.getCurrentEventId();
        return this.getNextMetastoreEvents(currentId);
    }

    /**
     * Fetches the next batch of events starting from the last synced event id, with
     * {@code getAllEvents} disabled and no notification filter.
     *
     * @param currentEventId forwarded unchanged to the four-argument overload
     * @return the events returned by the four-argument overload
     * @throws MetastoreNotificationFetchException if the fetch fails
     */
    @VisibleForTesting
    public List<NotificationEvent> getNextMetastoreEvents(long currentEventId) throws MetastoreNotificationFetchException {
        long lastSyncedId = this.lastSyncedEventId_.get();
        return this.getNextMetastoreEvents(lastSyncedId, currentEventId, false, null);
    }

    /**
     * Runs one polling cycle: when the processor is {@code ACTIVE}, fetches the next batch
     * of events and processes it; otherwise logs that the cycle is skipped and gives the
     * automatic global-invalidate recovery a chance to run.
     *
     * <p>No exception escapes this method. Fetch failures are only logged (the next cycle
     * retries). A {@code MetastoreNotificationNeedsInvalidateException} moves the processor
     * to {@code NEEDS_INVALIDATE}; any other exception moves it to {@code ERROR}. In both
     * cases a timestamped error message with the full stack trace is stored and automatic
     * recovery is attempted.
     */
    @Override
    public void processEvents() {
        try {
            EventProcessorStatus status = this.eventProcessorStatus_;
            if (status == EventProcessorStatus.ACTIVE) {
                long latestEventId = this.getCurrentEventId();
                List<NotificationEvent> batch = this.getNextMetastoreEvents(latestEventId);
                this.processEvents(latestEventId, batch);
            } else {
                LOG.warn(String.format("Event processing is skipped since status is %s. Last synced event id is %d", status, this.lastSyncedEventId_.get()));
                this.tryAutoGlobalInvalidateOnFailure();
            }
        }
        catch (MetastoreNotificationFetchException fetchFailure) {
            // Status is left untouched so the next scheduled cycle simply tries again.
            LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore may be unavailable. Will retry.", fetchFailure);
        }
        catch (MetastoreNotificationNeedsInvalidateException invalidateNeeded) {
            this.updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
            String description = "Event processing needs a invalidate command to resolve the state";
            LOG.error(description, invalidateNeeded);
            this.eventProcessorErrorMsg_ = LocalDateTime.now() + "\n" + description + "\n" + ExceptionUtils.getFullStackTrace(invalidateNeeded);
            this.tryAutoGlobalInvalidateOnFailure();
        }
        catch (Exception unexpected) {
            this.updateStatus(EventProcessorStatus.ERROR);
            String description = "Unexpected exception received while processing event";
            LOG.error(description, unexpected);
            this.eventProcessorErrorMsg_ = LocalDateTime.now() + "\n" + description + "\n" + ExceptionUtils.getFullStackTrace(unexpected);
            // Append the details of the event that was in flight to the stored message.
            this.dumpEventInfoToLog(this.currentEvent_);
            this.tryAutoGlobalInvalidateOnFailure();
        }
    }

    /**
     * Attempts an automatic global invalidate (catalog reset) when the backend flag for it
     * is enabled and the processor is in {@code ERROR} or {@code NEEDS_INVALIDATE}.
     * On success the stored error message is cleared; a failure of the reset itself is
     * logged and otherwise ignored.
     */
    private void tryAutoGlobalInvalidateOnFailure() {
        EventProcessorStatus status = this.eventProcessorStatus_;
        if (!BackendConfig.INSTANCE.isInvalidateGlobalMetadataOnEventProcessFailureEnabled()) {
            return;
        }
        if (status != EventProcessorStatus.ERROR && status != EventProcessorStatus.NEEDS_INVALIDATE) {
            return;
        }
        try {
            LOG.error("Triggering auto global invalidation");
            this.catalog_.reset(NoOpEventSequence.INSTANCE);
            this.eventProcessorErrorMsg_ = null;
        }
        catch (Exception resetFailure) {
            LOG.error("Automatic global invalidate metadata failed", resetFailure);
        }
    }

    /**
     * Refreshes the cached "latest event in HMS" id and time.
     *
     * <p>Does nothing when the processor is {@code DISABLED}, when HMS reports an event id
     * that is not newer than the cached one, or when the event for that id cannot be
     * obtained. Otherwise the cached latest id/time are updated and logged, and a lag
     * warning is emitted if the latest event is newer than the last synced one.
     * Any exception is logged and swallowed.
     */
    @VisibleForTesting
    public void updateLatestEventId() {
        if (this.eventProcessorStatus_ == EventProcessorStatus.DISABLED) {
            return;
        }
        try (MetaStoreClientPool.MetaStoreClient client = this.catalog_.getMetaStoreClient()) {
            long hmsEventId = client.getHiveClient().getCurrentNotificationEventId().getEventId();
            // Only look the event up when HMS has moved past what we have cached.
            NotificationEvent hmsEvent = hmsEventId > this.latestEventId_.get() ? this.getEventFromHMS(client, hmsEventId) : null;
            if (hmsEvent != null) {
                long syncedId = this.lastSyncedEventId_.get();
                long syncedTime = this.lastSyncedEventTimeSecs_.get();
                long hmsEventTime = hmsEvent.getEventTime();
                this.latestEventId_.set(hmsEventId);
                this.latestEventTimeSecs_.set(hmsEventTime);
                LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={}, time={}.", hmsEventId, hmsEventTime, syncedId, syncedTime);
                if (hmsEventTime > syncedTime) {
                    long lagMs = (hmsEventTime - syncedTime) * 1000L;
                    long pendingEvents = hmsEventId - syncedId;
                    LOG.warn("Lag: {}. {} events pending to be processed.", PrintUtils.printTimeMs(lagMs), pendingEvents);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Unable to update current notification event id. Last value: {}", this.latestEventId_, e);
        }
    }

    /**
     * Builds a metrics snapshot of the event processor.
     *
     * <p>Status and last synced event id are always populated. All remaining fields
     * (event ids/times, received/skipped counts, fetch and process duration statistics,
     * receive rates) are filled in only while the processor is {@code ACTIVE}.
     * Durations are divided by 1e9 before being reported (Dropwizard timers report
     * nanoseconds, so the reported values are seconds).
     *
     * @return the populated metrics object
     */
    @Override
    public TEventProcessorMetrics getEventProcessorMetrics() {
        TEventProcessorMetrics result = new TEventProcessorMetrics();
        EventProcessorStatus status = this.getStatus();
        result.setStatus(status.toString());
        result.setLast_synced_event_id(this.getLastSyncedEventId());
        if (status != EventProcessorStatus.ACTIVE) {
            return result;
        }
        final double nanosPerSecond = 1.0E9;

        result.setLast_synced_event_time(this.lastSyncedEventTimeSecs_.get());
        result.setLatest_event_id(this.latestEventId_.get());
        result.setLatest_event_time(this.latestEventTimeSecs_.get());

        // Totals.
        long receivedCount = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
        long skippedCount = this.metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
        result.setEvents_received(receivedCount);
        result.setEvents_skipped(skippedCount);

        // Fetch duration statistics.
        Snapshot fetchSnapshot = this.metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot();
        double fetchMeanSecs = fetchSnapshot.getMean() / nanosPerSecond;
        double fetchP75Secs = fetchSnapshot.get75thPercentile() / nanosPerSecond;
        double fetchP95Secs = fetchSnapshot.get95thPercentile() / nanosPerSecond;
        double fetchP99Secs = fetchSnapshot.get99thPercentile() / nanosPerSecond;
        result.setEvents_fetch_duration_mean(fetchMeanSecs);
        result.setEvents_fetch_duration_p75(fetchP75Secs);
        result.setEvents_fetch_duration_p95(fetchP95Secs);
        result.setEvents_fetch_duration_p99(fetchP99Secs);

        // Process duration statistics.
        Snapshot processSnapshot = this.metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot();
        double processMeanSecs = processSnapshot.getMean() / nanosPerSecond;
        double processP75Secs = processSnapshot.get75thPercentile() / nanosPerSecond;
        double processP95Secs = processSnapshot.get95thPercentile() / nanosPerSecond;
        double processP99Secs = processSnapshot.get99thPercentile() / nanosPerSecond;
        result.setEvents_process_duration_mean(processMeanSecs);
        result.setEvents_process_duration_p75(processP75Secs);
        result.setEvents_process_duration_p95(processP95Secs);
        result.setEvents_process_duration_p99(processP99Secs);
        result.setLast_events_process_duration((double)this.lastEventProcessDurationNs_.get() / nanosPerSecond);

        // Receive rates.
        double receivedRate1Min = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
        double receivedRate5Min = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
        double receivedRate15Min = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();
        result.setEvents_received_1min_rate(receivedRate1Min);
        result.setEvents_received_5min_rate(receivedRate5Min);
        result.setEvents_received_15min_rate(receivedRate15Min);

        LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process duration: {} Events received rate (1min) : {}", receivedCount, skippedCount, fetchMeanSecs, processMeanSecs, receivedRate1Min);
        return result;
    }

    /**
     * Builds a human-oriented summary of the event processor: the metrics registry rendered
     * as text, the stored error message (if any) and progress information about the batch
     * currently being processed.
     *
     * <p>This method is not synchronized, while the progress fields it reads are rewritten
     * (and nulled by {@code resetProgressInfo()}) during event processing. Every shared
     * reference that is null-checked before use is therefore copied into a local first, so
     * that a concurrent reset between the check and the use cannot cause a
     * {@link NullPointerException}. The resulting snapshot is best effort: the individual
     * fields are not guaranteed to be mutually consistent.
     *
     * @return the summary response; batch-specific progress fields are only filled in when
     *         a non-empty event batch is currently recorded
     */
    @Override
    public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
        TEventProcessorMetricsSummaryResponse summaryResponse = new TEventProcessorMetricsSummaryResponse();
        summaryResponse.setSummary(this.metrics_.toString());
        // Read once: the field may be cleared concurrently after the null check.
        String errorMsg = this.eventProcessorErrorMsg_;
        if (errorMsg != null) {
            summaryResponse.setError_msg(errorMsg);
        }
        TEventBatchProgressInfo progressInfo = new TEventBatchProgressInfo();
        progressInfo.last_synced_event_id = this.lastSyncedEventId_.get();
        progressInfo.last_synced_event_time_s = this.lastSyncedEventTimeSecs_.get();
        progressInfo.latest_event_id = this.latestEventId_.get();
        progressInfo.latest_event_time_s = this.latestEventTimeSecs_.get();
        // Local copies so the lists cannot be swapped out underneath us while we read them.
        List<NotificationEvent> eventBatch = this.currentEventBatch_;
        List<MetastoreEvents.MetastoreEvent> filteredEvents = this.currentFilteredEvents_;
        if (eventBatch != null && !eventBatch.isEmpty()) {
            int numHmsEvents = eventBatch.size();
            progressInfo.num_hms_events = numHmsEvents;
            NotificationEvent firstEvent = eventBatch.get(0);
            progressInfo.min_event_id = firstEvent.getEventId();
            progressInfo.min_event_time_s = firstEvent.getEventTime();
            NotificationEvent lastEvent = eventBatch.get(numHmsEvents - 1);
            progressInfo.max_event_id = lastEvent.getEventId();
            progressInfo.max_event_time_s = lastEvent.getEventTime();
            progressInfo.current_batch_start_time_ms = this.currentBatchStartTimeMs_;
            progressInfo.current_event_start_time_ms = this.currentEventStartTimeMs_;
            if (filteredEvents != null) {
                progressInfo.num_filtered_events = filteredEvents.size();
            }
            progressInfo.current_event_index = this.currentEventIndex_;
            // Same reasoning as for the lists: check and dereference the same reference.
            MetastoreEvents.MetastoreEvent filteredEvent = this.currentFilteredEvent_;
            progressInfo.current_event_batch_size = filteredEvent != null ? filteredEvent.getNumberOfEvents() : 0;
            progressInfo.current_event = this.currentEvent_;
        }
        summaryResponse.setProgress(progressInfo);
        return summaryResponse;
    }

    /**
     * Exposes the metrics registry of this processor to tests.
     *
     * @return the same {@code Metrics} instance this processor records into
     */
    @VisibleForTesting
    Metrics getMetrics() {
        return metrics_;
    }

    /**
     * Clears all per-batch progress state: the current event, the raw and filtered batches,
     * the current filtered event, both start timestamps and the event index.
     */
    private void resetProgressInfo() {
        // References to the event/batch in flight.
        currentEvent_ = null;
        currentEventBatch_ = null;
        currentFilteredEvent_ = null;
        currentFilteredEvents_ = null;
        // Timestamps and position within the batch.
        currentBatchStartTimeMs_ = 0L;
        currentEventStartTimeMs_ = 0L;
        currentEventIndex_ = 0;
    }

    /**
     * Processes one batch of HMS notification events and advances the last synced event
     * id/time as events are consumed.
     *
     * <p>Flow, as visible in this method:
     * <ol>
     *   <li>The raw batch is recorded in {@code currentEventBatch_} and counted in the
     *       events-received meter.</li>
     *   <li>An empty batch only moves the sync point forward to {@code currentEventId}
     *       (when it is behind) and returns.</li>
     *   <li>Otherwise the batch is turned into filtered events by the event factory. If
     *       nothing is left after filtering, the sync point jumps to the last raw event.</li>
     *   <li>Each filtered event is processed while holding this object's monitor, one
     *       event per lock acquisition. Processing stops early if the status is no longer
     *       {@code ACTIVE}.</li>
     *   <li>The batch timer is always stopped and the timing report is always emitted
     *       (finally block), including on the empty-after-filtering return and on failure.</li>
     * </ol>
     *
     * <p>NOTE(decompiler): CFR reported "WARNING - Removed try catching itself - possible
     * behaviour change." for this method, so the exception structure shown here may differ
     * slightly from the original source.
     *
     * @param currentEventId event id used as the new sync point when {@code events} is empty
     * @param events raw notification events to process; must not be null
     * @throws MetastoreNotificationException when a {@code CatalogException} is raised while
     *         processing; the message names the event that was in flight
     */
    @VisibleForTesting
    protected void processEvents(long currentEventId, List<NotificationEvent> events) throws MetastoreNotificationException {
        // Record the raw batch; getEventProcessorSummary() reads this field for progress reporting.
        this.currentEventBatch_ = events;
        this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark((long)events.size());
        if (events.isEmpty()) {
            // Nothing to process: just catch the sync point up to currentEventId if it is behind.
            if (this.lastSyncedEventId_.get() < currentEventId) {
                this.lastSyncedEventId_.set(currentEventId);
                this.lastSyncedEventTimeSecs_.set(this.getEventTimeFromHMS(currentEventId));
            }
            return;
        }
        // Times the whole batch; stopped in the finally block below.
        Timer.Context context = this.metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
        this.currentBatchStartTimeMs_ = System.currentTimeMillis();
        // Per-event elapsed milliseconds, consumed by logEventMetrics().
        HashMap<MetastoreEvents.MetastoreEvent, Long> eventProcessingTime = new HashMap<MetastoreEvents.MetastoreEvent, Long>();
        try {
            this.currentFilteredEvents_ = this.metastoreEventFactory_.getFilteredEvents(events, this.metrics_);
            if (this.currentFilteredEvents_.isEmpty()) {
                // Every raw event was filtered out: skip ahead to the last raw event of the batch.
                NotificationEvent e = events.get(events.size() - 1);
                this.lastSyncedEventId_.set(e.getEventId());
                this.lastSyncedEventTimeSecs_.set(e.getEventTime());
                this.resetProgressInfo();
                return;
            }
            for (MetastoreEvents.MetastoreEvent event : this.currentFilteredEvents_) {
                MetastoreEventsProcessor metastoreEventsProcessor = this;
                // The monitor is taken per event rather than around the whole loop, so other
                // synchronized methods of this object can run between two events.
                synchronized (metastoreEventsProcessor) {
                    // Re-checked under the lock for every event; leaves the loop (remaining
                    // events of the batch are not processed) once the status is not ACTIVE.
                    if (this.eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
                        break;
                    }
                    this.currentEvent_ = event.metastoreNotificationEvent_;
                    this.currentFilteredEvent_ = event;
                    String targetName = event.getTargetName();
                    String desc = String.format("Processing %s on %s, eventId=%d", new Object[]{event.getEventType(), targetName, event.getEventId()});
                    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc);){
                        this.currentEventStartTimeMs_ = System.currentTimeMillis();
                        event.processIfEnabled();
                        // Only reached when processIfEnabled() did not throw, so failed
                        // events have no entry in eventProcessingTime.
                        long elapsedTimeMs = System.currentTimeMillis() - this.currentEventStartTimeMs_;
                        eventProcessingTime.put(event, elapsedTimeMs);
                    }
                    catch (Exception processingEx) {
                        // Give the event a chance to handle its own failure. When onFailure()
                        // returns true the exception is dropped and the bookkeeping below runs
                        // as if the event had succeeded.
                        try {
                            if (!event.onFailure(processingEx)) {
                                event.errorLog("Unable to handle event processing failure", new Object[0]);
                                // This throw is inside the inner try, so it is caught by the
                                // catch right below: both error messages are logged before the
                                // original exception finally propagates.
                                throw processingEx;
                            }
                        }
                        catch (Exception onFailureEx) {
                            event.errorLog("Failed to handle event processing failure", onFailureEx);
                            // Always surface the original processing failure, not the
                            // exception raised by the failure handler.
                            throw processingEx;
                        }
                    }
                    ++this.currentEventIndex_;
                    // Passes the processed event id to the delete-event log's garbageCollect().
                    this.deleteEventLog_.garbageCollect(event.getEventId());
                    this.lastSyncedEventId_.set(event.getEventId());
                    this.lastSyncedEventTimeSecs_.set(event.getEventTime());
                    // Delay = wall-clock now (in seconds) minus the event's own timestamp.
                    this.metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update(System.currentTimeMillis() / 1000L - event.getEventTime(), TimeUnit.SECONDS);
                }
            }
            this.resetProgressInfo();
        }
        catch (CatalogException e) {
            // Progress info is intentionally not reset on this path: currentEvent_ still
            // identifies the failing event for the message below and for the caller.
            throw new MetastoreNotificationException(String.format("Unable to process event %d of type %s. Event processing will be stopped.", this.currentEvent_.getEventId(), this.currentEvent_.getEventType()), e);
        }
        finally {
            // Runs on every exit after the timer was started (normal, early return, exception).
            long elapsedNs = context.stop();
            this.lastEventProcessDurationNs_.set(elapsedNs);
            this.logEventMetrics(eventProcessingTime, elapsedNs);
        }
    }

    /**
     * Logs how long a batch took and, for slow batches, which events and targets were the
     * most expensive.
     *
     * <p>The elapsed time is always logged at INFO. When the batch took at least 5 seconds
     * (5,000,000,000 ns) a WARN report follows, listing up to 10 events with the largest
     * individual durations and up to 10 target names with the largest summed durations,
     * both in descending order.
     *
     * @param eventProcessingTime per-event processing time in milliseconds
     * @param elapsedNs total time spent on the batch, in nanoseconds
     */
    private void logEventMetrics(Map<MetastoreEvents.MetastoreEvent, Long> eventProcessingTime, long elapsedNs) {
        LOG.info("Time elapsed in processing event batch: {}", PrintUtils.printTimeNs(elapsedNs));
        if (elapsedNs < 5000000000L) {
            return;
        }
        // Most expensive individual events first. The explicit type witness is required:
        // without it the chained reversed() call cannot infer the entry type.
        List<Map.Entry<MetastoreEvents.MetastoreEvent, Long>> eventsByCost = new ArrayList<>(eventProcessingTime.entrySet());
        eventsByCost.sort(Map.Entry.<MetastoreEvents.MetastoreEvent, Long>comparingByValue().reversed());
        int num = Math.min(10, eventsByCost.size());
        StringBuilder report = new StringBuilder("Top " + num + " expensive events: ");
        for (Map.Entry<MetastoreEvents.MetastoreEvent, Long> entry : eventsByCost.subList(0, num)) {
            MetastoreEvents.MetastoreEvent event = entry.getKey();
            report.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d) ", event.getEventType(), event.getEventId(), event.getTargetName(), entry.getValue()));
        }
        // Sum the per-event durations by target name.
        Map<String, Long> durationPerTable = new HashMap<>();
        for (Map.Entry<MetastoreEvents.MetastoreEvent, Long> entry : eventProcessingTime.entrySet()) {
            durationPerTable.merge(entry.getKey().getTargetName(), entry.getValue(), Long::sum);
        }
        List<Map.Entry<String, Long>> targetsByCost = new ArrayList<>(durationPerTable.entrySet());
        targetsByCost.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        num = Math.min(10, targetsByCost.size());
        report.append("\nTop ").append(num).append(" targets in event processing: ");
        for (Map.Entry<String, Long> entry : targetsByCost.subList(0, num)) {
            report.append(String.format("(target=%s, duration_ms=%d) ", entry.getKey(), entry.getValue()));
        }
        LOG.warn(report.toString());
    }

    /**
     * Replaces the processor status while holding this object's monitor.
     *
     * @param toStatus the status to switch to
     */
    @VisibleForTesting
    public synchronized void updateStatus(EventProcessorStatus toStatus) {
        eventProcessorStatus_ = toStatus;
    }

    /**
     * Logs the details of the given notification event at ERROR level and appends the same
     * text, on a new line, to the stored event-processor error message.
     *
     * @param event the event to describe; when null, a fixed "event is null" note is logged
     *        and appended instead
     */
    private void dumpEventInfoToLog(NotificationEvent event) {
        String details;
        if (event == null) {
            details = "Notification event is null";
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append("Event id: ").append(event.getEventId()).append("\n");
            sb.append("Event Type: ").append(event.getEventType()).append("\n");
            sb.append("Event time: ").append(event.getEventTime()).append("\n");
            sb.append("Database name: ").append(event.getDbName()).append("\n");
            // The table line is omitted when the event carries no table name.
            String tableName = event.getTableName();
            if (tableName != null) {
                sb.append("Table name: ").append(tableName).append("\n");
            }
            sb.append("Event message: ").append(event.getMessage()).append("\n");
            details = sb.toString();
        }
        LOG.error(details);
        this.eventProcessorErrorMsg_ = this.eventProcessorErrorMsg_ + "\n" + details;
    }

    /**
     * Returns the shared events processor, creating it on the first call.
     *
     * <p>The arguments are only used when the instance does not exist yet; later calls
     * return the existing instance and ignore them.
     *
     * @param catalogOpExecutor passed to the constructor on first creation
     * @param startSyncFromId passed to the constructor on first creation
     * @param eventPollingInterval passed to the constructor on first creation
     * @return the shared processor instance
     * @throws CatalogException declared by the signature; NOTE(review): presumably raised by
     *         the constructor - confirm
     */
    public static synchronized ExternalEventsProcessor getInstance(CatalogOpExecutor catalogOpExecutor, long startSyncFromId, long eventPollingInterval) throws CatalogException {
        if (instance == null) {
            instance = new MetastoreEventsProcessor(catalogOpExecutor, startSyncFromId, eventPollingInterval);
        }
        return instance;
    }

    /**
     * @return the event factory this processor uses to build filtered events
     */
    @Override
    public MetastoreEvents.MetastoreEventFactory getEventsFactory() {
        return metastoreEventFactory_;
    }

    /**
     * @return the class-wide {@code MessageDeserializer} constant
     */
    public static MessageDeserializer getMessageDeserializer() {
        return MetastoreEventsProcessor.MESSAGE_DESERIALIZER;
    }

    /**
     * Bundles a notification filter with optional catalog, database and table names.
     *
     * <p>Which names are set depends on the constructor used: the one-argument constructor
     * leaves all names null; the three- and four-argument constructors require non-null
     * catalog and database names; only the four-argument constructor sets a table name
     * (which may be null).
     */
    public static class MetaDataFilter {
        public IMetaStoreClient.NotificationFilter filter_;
        public String catName_;
        public String dbName_;
        public String tableName_;

        /** Creates a filter holder with no catalog, database or table name. */
        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter) {
            filter_ = notificationFilter;
        }

        /**
         * Creates a filter holder scoped to a database; the table name stays null.
         *
         * @throws NullPointerException if {@code catName} or {@code dbName} is null
         */
        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter, String catName, String dbName) {
            this(notificationFilter, catName, dbName, null);
        }

        /**
         * Creates a filter holder scoped to a table.
         *
         * @param tblName table name; not validated, may be null
         * @throws NullPointerException if {@code catName} or {@code databaseName} is null
         */
        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter, String catName, String databaseName, String tblName) {
            this(notificationFilter);
            catName_ = Preconditions.checkNotNull(catName);
            dbName_ = Preconditions.checkNotNull(databaseName);
            tableName_ = tblName;
        }

        public void setNotificationFilter(IMetaStoreClient.NotificationFilter notificationFilter) {
            filter_ = notificationFilter;
        }

        public IMetaStoreClient.NotificationFilter getNotificationFilter() {
            return filter_;
        }

        public String getCatName() {
            return catName_;
        }

        public String getDbName() {
            return dbName_;
        }

        public String getTableName() {
            return tableName_;
        }
    }

    /**
     * Lifecycle states of the events processor. The declaration order is part of the
     * enum's contract ({@code ordinal()}/{@code values()}) and is kept as is.
     */
    public enum EventProcessorStatus {
        /** NOTE(review): not assigned in the visible part of this file; presumably a temporary suspension - confirm. */
        PAUSED,
        /** The only state in which {@code processEvents()} fetches and processes batches. */
        ACTIVE,
        /** Set by {@code processEvents()} after an unexpected exception. */
        ERROR,
        /** Set by {@code processEvents()} after a {@code MetastoreNotificationNeedsInvalidateException}. */
        NEEDS_INVALIDATE,
        /** Set by {@code shutdown()} once both schedulers have been shut down. */
        STOPPED,
        /** In this state {@code updateLatestEventId()} returns without contacting HMS. */
        DISABLED
    }
}

