package org.apache.impala.catalog.events;

import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetastoreClientInstantiationException;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.events.ConfigValidator;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TEventBatchProgressInfo;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.HiveMetadataFormatUtils;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/impala/catalog/events/MetastoreEventsProcessor.class */
public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = "hive.metastore.notifications.add.thrift.objects";
    private static final Logger LOG = LoggerFactory.getLogger(MetastoreEventsProcessor.class);
    private static final MessageDeserializer MESSAGE_DESERIALIZER = MetastoreShim.getMessageDeserializer();
    private static MetastoreEventsProcessor instance;
    private static final int EVENTS_BATCH_SIZE_PER_RPC = 1000;
    private static final int SCHEDULER_SHUTDOWN_TIMEOUT = 10;
    public static final String EVENTS_FETCH_DURATION_METRIC = "events-fetch-duration";
    public static final String EVENTS_PROCESS_DURATION_METRIC = "events-apply-duration";
    public static final String EVENTS_RECEIVED_METRIC = "events-received";
    public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
    public static final String STATUS_METRIC = "status";
    public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
    public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time";
    public static final String LATEST_EVENT_ID = "latest-event-id";
    public static final String LATEST_EVENT_TIME = "latest-event-time";
    public static final String EVENT_PROCESSING_DELAY = "event-processing-delay";
    public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
    public static final String NUMBER_OF_PARTITION_REFRESHES = "partitions-refreshed";
    public static final String NUMBER_OF_TABLES_ADDED = "tables-added";
    public static final String NUMBER_OF_TABLES_REMOVED = "tables-removed";
    public static final String NUMBER_OF_DATABASES_ADDED = "databases-added";
    public static final String NUMBER_OF_DATABASES_REMOVED = "databases-removed";
    public static final String NUMBER_OF_PARTITIONS_ADDED = "partitions-added";
    public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed";
    public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
    public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
    public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming-delay";
    private static final long SECOND_IN_NANOS = 1000000000;
    private final MetastoreEvents.MetastoreEventFactory metastoreEventFactory_;
    private NotificationEvent currentEvent_;
    private List<NotificationEvent> currentEventBatch_;
    private MetastoreEvents.MetastoreEvent currentFilteredEvent_;
    private List<MetastoreEvents.MetastoreEvent> currentFilteredEvents_;
    private final long pollingFrequencyInSec_;
    protected final CatalogServiceCatalog catalog_;
    private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED;
    private String eventProcessorErrorMsg_ = null;
    private long currentBatchStartTimeMs_ = 0;
    private long currentEventStartTimeMs_ = 0;
    private int currentEventIndex_ = 0;
    private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
    private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0);
    private final AtomicLong latestEventId_ = new AtomicLong(0);
    private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0);
    private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0);
    private final ScheduledExecutorService processEventsScheduler_ = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MetastoreEventsProcessor-ProcessEvents").build());
    private final ScheduledExecutorService updateEventIdScheduler_ = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MetastoreEventsProcessor-UpdateEventId").build());
    private final Metrics metrics_ = new Metrics();
    private final DeleteEventLog deleteEventLog_ = new DeleteEventLog();

    /* loaded from: input_file:org/apache/impala/catalog/events/MetastoreEventsProcessor$EventProcessorStatus.class */
    public enum EventProcessorStatus {
        PAUSED,
        ACTIVE,
        ERROR,
        NEEDS_INVALIDATE,
        STOPPED,
        DISABLED
    }

    /* loaded from: input_file:org/apache/impala/catalog/events/MetastoreEventsProcessor$MetaDataFilter.class */
    public static class MetaDataFilter {
        public IMetaStoreClient.NotificationFilter filter_;
        public String catName_;
        public String dbName_;
        public String tableName_;

        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter) {
            this.filter_ = notificationFilter;
        }

        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter, String str, String str2) {
            this(notificationFilter);
            this.catName_ = (String) Preconditions.checkNotNull(str);
            this.dbName_ = (String) Preconditions.checkNotNull(str2);
        }

        public MetaDataFilter(IMetaStoreClient.NotificationFilter notificationFilter, String str, String str2, String str3) {
            this(notificationFilter, str, str2);
            this.tableName_ = str3;
        }

        public void setNotificationFilter(IMetaStoreClient.NotificationFilter notificationFilter) {
            this.filter_ = notificationFilter;
        }

        public IMetaStoreClient.NotificationFilter getNotificationFilter() {
            return this.filter_;
        }

        public String getCatName() {
            return this.catName_;
        }

        public String getDbName() {
            return this.dbName_;
        }

        public String getTableName() {
            return this.tableName_;
        }
    }

    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(CatalogServiceCatalog catalogServiceCatalog, long j, String str, String str2) throws MetastoreNotificationException {
        return getNextMetastoreEventsInBatchesForDb(catalogServiceCatalog, j, MetastoreShim.getDefaultCatalogName(), str, str2);
    }

    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(CatalogServiceCatalog catalogServiceCatalog, long j, String str, String str2, String str3) throws MetastoreNotificationException {
        Preconditions.checkNotNull(str3, "eventType is null in fetching db events");
        Preconditions.checkNotNull(str, "catName is null in fetching db events");
        Preconditions.checkNotNull(str2, "dbName is null in fetching db events");
        return getNextMetastoreEventsWithFilterInBatches(catalogServiceCatalog, j, new MetaDataFilter(notificationEvent -> {
            return str3.equals(notificationEvent.getEventType()) && str.equalsIgnoreCase(notificationEvent.getCatName()) && str2.equalsIgnoreCase(notificationEvent.getDbName());
        }, str, str2), 1000, str3);
    }

    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable(CatalogServiceCatalog catalogServiceCatalog, long j, String str, String str2, String str3) throws MetastoreNotificationException {
        return getNextMetastoreEventsInBatchesForTable(catalogServiceCatalog, j, MetastoreShim.getDefaultCatalogName(), str, str2, str3);
    }

    public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable(CatalogServiceCatalog catalogServiceCatalog, long j, String str, String str2, String str3, String str4) throws MetastoreNotificationException {
        Preconditions.checkNotNull(str4, "eventType is null in fetching table events");
        Preconditions.checkNotNull(str, "catName is null in fetching table events");
        Preconditions.checkNotNull(str2, "dbName is null in fetching table events");
        Preconditions.checkNotNull(str3, "tblName is null in fetching table events");
        return getNextMetastoreEventsWithFilterInBatches(catalogServiceCatalog, j, new MetaDataFilter(notificationEvent -> {
            return str4.equals(notificationEvent.getEventType()) && str.equalsIgnoreCase(notificationEvent.getCatName()) && str2.equalsIgnoreCase(notificationEvent.getDbName()) && str3.equalsIgnoreCase(notificationEvent.getTableName());
        }, str, str2, str3), 1000, str4);
    }

    public static List<NotificationEvent> getNextMetastoreEventsInBatches(CatalogServiceCatalog catalogServiceCatalog, long j, IMetaStoreClient.NotificationFilter notificationFilter, String... strArr) throws MetastoreNotificationFetchException {
        return getNextMetastoreEventsInBatches(catalogServiceCatalog, j, notificationFilter, 1000, strArr);
    }

    @VisibleForTesting
    public static List<NotificationEvent> getNextMetastoreEventsInBatches(CatalogServiceCatalog catalogServiceCatalog, long j, IMetaStoreClient.NotificationFilter notificationFilter, int i, String... strArr) throws MetastoreNotificationFetchException {
        return getNextMetastoreEventsWithFilterInBatches(catalogServiceCatalog, j, new MetaDataFilter(notificationFilter), i, strArr);
    }

    @VisibleForTesting
    public static List<NotificationEvent> getNextMetastoreEventsWithFilterInBatches(CatalogServiceCatalog catalogServiceCatalog, long j, MetaDataFilter metaDataFilter, int i, String... strArr) throws MetastoreNotificationFetchException {
        Preconditions.checkArgument(i > 0);
        ArrayList arrayList = new ArrayList();
        IMetaStoreClient.NotificationFilter notificationFilter = metaDataFilter.getNotificationFilter();
        try {
            MetaStoreClientPool.MetaStoreClient metaStoreClient = catalogServiceCatalog.getMetaStoreClient();
            Throwable th = null;
            try {
                try {
                    long eventId = metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
                    if (eventId <= j) {
                        if (metaStoreClient != null) {
                            if (0 != 0) {
                                try {
                                    metaStoreClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                metaStoreClient.close();
                            }
                        }
                        return arrayList;
                    }
                    long j2 = j;
                    List<String> defaultSkippedHmsEventTypes = catalogServiceCatalog.getDefaultSkippedHmsEventTypes();
                    String str = null;
                    if (strArr != null && strArr.length > 0) {
                        defaultSkippedHmsEventTypes = Lists.newArrayList(catalogServiceCatalog.getCommonHmsEventTypes());
                        defaultSkippedHmsEventTypes.removeIf(str2 -> {
                            return Arrays.asList(strArr).contains(str2);
                        });
                        str = String.join(",", strArr) + " ";
                    }
                    Logger logger = LOG;
                    Object[] objArr = new Object[4];
                    objArr[0] = str == null ? "" : str;
                    objArr[1] = Long.valueOf(j);
                    objArr[2] = Long.valueOf(eventId);
                    objArr[3] = Long.valueOf(eventId - j);
                    logger.info("Fetching {}events started from id {} to {}. Gap: {}", objArr);
                    int i2 = 0;
                    while (j2 < eventId) {
                        int min = Math.min(i, (int) (eventId - j2));
                        NotificationEventRequest notificationEventRequest = new NotificationEventRequest();
                        notificationEventRequest.setMaxEvents(min);
                        notificationEventRequest.setLastEvent(j2);
                        MetastoreShim.setNotificationEventRequestWithFilter(notificationEventRequest, metaDataFilter);
                        NotificationEventResponse nextNotification = MetastoreShim.getNextNotification(metaStoreClient.getHiveClient(), notificationEventRequest, defaultSkippedHmsEventTypes);
                        if (nextNotification.getEvents().isEmpty()) {
                            break;
                        }
                        for (NotificationEvent notificationEvent : nextNotification.getEvents()) {
                            if (notificationFilter == null || notificationFilter.accept(notificationEvent)) {
                                arrayList.add(notificationEvent);
                            } else {
                                i2++;
                            }
                            j2 = notificationEvent.getEventId();
                        }
                    }
                    if (i2 > 0) {
                        LOG.info("Got {} events and filtered out {} locally from {} events start from id {}", new Object[]{Integer.valueOf(arrayList.size()), Integer.valueOf(i2), Long.valueOf(eventId - j), Long.valueOf(j + 1)});
                    }
                    if (metaStoreClient != null) {
                        if (0 != 0) {
                            try {
                                metaStoreClient.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            metaStoreClient.close();
                        }
                    }
                    return arrayList;
                } finally {
                }
            } catch (Throwable th4) {
                if (metaStoreClient != null) {
                    if (th != null) {
                        try {
                            metaStoreClient.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        metaStoreClient.close();
                    }
                }
                throw th4;
            }
        } catch (MetastoreClientInstantiationException | TException e) {
            throw new MetastoreNotificationFetchException(String.format(CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
        }
    }

    public static void syncToLatestEventId(CatalogServiceCatalog catalogServiceCatalog, Table table, EventFactory eventFactory, Metrics metrics) throws CatalogException, MetastoreNotificationException {
        Preconditions.checkArgument(table != null, "tbl is null");
        Preconditions.checkState(!(table instanceof IncompleteTable) && table.isLoaded(), "table %s is either incomplete or not loaded", table.getFullName());
        Preconditions.checkState(table.isWriteLockedByCurrentThread(), String.format("Write lock is not held on table %s by current thread", table.getFullName()));
        long lastSyncedEventId = table.getLastSyncedEventId();
        Preconditions.checkArgument(lastSyncedEventId > 0, "lastEvent  Id %s for table %s should be greater than 0", lastSyncedEventId, table.getFullName());
        ThreadNameAnnotator threadNameAnnotator = new ThreadNameAnnotator(String.format("sync table %s to latest HMS event id", table.getFullName()));
        Throwable th = null;
        try {
            List<NotificationEvent> nextMetastoreEventsWithFilterInBatches = getNextMetastoreEventsWithFilterInBatches(catalogServiceCatalog, lastSyncedEventId, AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters()) ? new MetaDataFilter(getTableNotificationEventFilter(table)) : new MetaDataFilter(getTableNotificationEventFilter(table), MetastoreShim.getDefaultCatalogName(), table.getDb().getName(), table.getName()), 1000, new String[0]);
            if (nextMetastoreEventsWithFilterInBatches.isEmpty()) {
                LOG.debug("table {} synced till event id {}. No new HMS events to process from event id: {}", new Object[]{table.getFullName(), Long.valueOf(lastSyncedEventId), Long.valueOf(lastSyncedEventId + 1)});
                if (threadNameAnnotator != null) {
                    if (0 == 0) {
                        threadNameAnnotator.close();
                        return;
                    }
                    try {
                        threadNameAnnotator.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            MetastoreEvents.MetastoreEvent metastoreEvent = null;
            Iterator<NotificationEvent> it = nextMetastoreEventsWithFilterInBatches.iterator();
            while (it.hasNext()) {
                metastoreEvent = eventFactory.get(it.next(), metrics);
                LOG.trace("for table {}, processing event {}", table.getFullName(), metastoreEvent);
                metastoreEvent.processIfEnabled();
                if (metastoreEvent.isDropEvent()) {
                    Preconditions.checkNotNull(metastoreEvent.getDbName());
                    Preconditions.checkNotNull(metastoreEvent.getTableName());
                    catalogServiceCatalog.getMetastoreEventProcessor().getDeleteEventLog().addRemovedObject(metastoreEvent.getEventId(), DeleteEventLog.getTblKey(metastoreEvent.getDbName(), metastoreEvent.getTableName()));
                }
                if (metastoreEvent instanceof MetastoreEvents.DropTableEvent) {
                    if (threadNameAnnotator != null) {
                        if (0 == 0) {
                            threadNameAnnotator.close();
                            return;
                        }
                        try {
                            threadNameAnnotator.close();
                            return;
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                            return;
                        }
                    }
                    return;
                }
            }
            if (metastoreEvent.getEventId() > table.getLastSyncedEventId()) {
                table.setLastSyncedEventId(metastoreEvent.getEventId());
            }
            LOG.info("Synced table {} till HMS event:  {}", table.getFullName(), Long.valueOf(table.getLastSyncedEventId()));
            if (threadNameAnnotator != null) {
                if (0 == 0) {
                    threadNameAnnotator.close();
                    return;
                }
                try {
                    threadNameAnnotator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            }
        } catch (Throwable th5) {
            if (threadNameAnnotator != null) {
                if (0 != 0) {
                    try {
                        threadNameAnnotator.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    threadNameAnnotator.close();
                }
            }
            throw th5;
        }
    }

    public static void syncToLatestEventId(CatalogServiceCatalog catalogServiceCatalog, Db db, EventFactory eventFactory, Metrics metrics) throws CatalogException, MetastoreNotificationException {
        Preconditions.checkArgument(db != null, "db is null");
        long lastSyncedEventId = db.getLastSyncedEventId();
        Preconditions.checkArgument(lastSyncedEventId > 0, "Invalid last synced event ID %s for db %s ", lastSyncedEventId, db.getName());
        Preconditions.checkState(db.isLockHeldByCurrentThread(), "Current thread does not hold lock on db: %s", db.getName());
        ThreadNameAnnotator threadNameAnnotator = new ThreadNameAnnotator(String.format("sync db %s to latest HMS event id", db.getName()));
        Throwable th = null;
        try {
            List<NotificationEvent> nextMetastoreEventsWithFilterInBatches = getNextMetastoreEventsWithFilterInBatches(catalogServiceCatalog, lastSyncedEventId, new MetaDataFilter(getDbNotificationEventFilter(db), MetastoreShim.getDefaultCatalogName(), db.getName()), 1000, new String[0]);
            if (nextMetastoreEventsWithFilterInBatches.isEmpty()) {
                LOG.debug("db {} already synced till event id: {}, no new hms events from event id: {}", new Object[]{db.getName(), Long.valueOf(lastSyncedEventId), Long.valueOf(lastSyncedEventId + 1)});
                if (threadNameAnnotator != null) {
                    if (0 == 0) {
                        threadNameAnnotator.close();
                        return;
                    }
                    try {
                        threadNameAnnotator.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            MetastoreEvents.MetastoreEvent metastoreEvent = null;
            Iterator<NotificationEvent> it = nextMetastoreEventsWithFilterInBatches.iterator();
            while (it.hasNext()) {
                metastoreEvent = eventFactory.get(it.next(), metrics);
                LOG.trace("for db {}, processing event: {}", db.getName(), metastoreEvent);
                metastoreEvent.processIfEnabled();
                if (metastoreEvent.isDropEvent()) {
                    Preconditions.checkState(metastoreEvent instanceof MetastoreEvents.DropDatabaseEvent, "invalid drop event {} ", metastoreEvent);
                    Preconditions.checkNotNull(metastoreEvent.getDbName());
                    catalogServiceCatalog.getMetastoreEventProcessor().getDeleteEventLog().addRemovedObject(metastoreEvent.getEventId(), DeleteEventLog.getDbKey(metastoreEvent.getDbName()));
                    if (threadNameAnnotator != null) {
                        if (0 == 0) {
                            threadNameAnnotator.close();
                            return;
                        }
                        try {
                            threadNameAnnotator.close();
                            return;
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                            return;
                        }
                    }
                    return;
                }
            }
            db.setLastSyncedEventId(metastoreEvent.getEventId());
            LOG.info("Synced db {} till HMS event {}", db.getName(), metastoreEvent);
            if (threadNameAnnotator != null) {
                if (0 == 0) {
                    threadNameAnnotator.close();
                    return;
                }
                try {
                    threadNameAnnotator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            }
        } catch (Throwable th5) {
            if (threadNameAnnotator != null) {
                if (0 != 0) {
                    try {
                        threadNameAnnotator.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    threadNameAnnotator.close();
                }
            }
            throw th5;
        }
    }

    private static IMetaStoreClient.NotificationFilter getTableNotificationEventFilter(final Table table) {
        return new IMetaStoreClient.NotificationFilter() { // from class: org.apache.impala.catalog.events.MetastoreEventsProcessor.1
            public boolean accept(NotificationEvent notificationEvent) {
                return (notificationEvent.getDbName() == null || notificationEvent.getTableName() == null) ? notificationEvent.getDbName() == null : Table.this.getDb().getName().equalsIgnoreCase(notificationEvent.getDbName()) && Table.this.getName().equalsIgnoreCase(notificationEvent.getTableName());
            }
        };
    }

    @VisibleForTesting
    public static IMetaStoreClient.NotificationFilter getDbNotificationEventFilter(final Db db) {
        return new IMetaStoreClient.NotificationFilter() { // from class: org.apache.impala.catalog.events.MetastoreEventsProcessor.2
            public boolean accept(NotificationEvent notificationEvent) {
                return (notificationEvent.getDbName() == null || notificationEvent.getTableName() != null) ? notificationEvent.getTableName() == null : Db.this.getName().equalsIgnoreCase(notificationEvent.getDbName());
            }
        };
    }

    @VisibleForTesting
    MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long j, long j2) throws CatalogException {
        Preconditions.checkState(j2 > 0);
        this.catalog_ = (CatalogServiceCatalog) Preconditions.checkNotNull(catalogOpExecutor.getCatalog());
        validateConfigs();
        this.lastSyncedEventId_.set(j);
        this.lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(j));
        initMetrics();
        this.metastoreEventFactory_ = new MetastoreEvents.MetastoreEventFactory(catalogOpExecutor);
        this.pollingFrequencyInSec_ = j2;
    }

    @VisibleForTesting
    void validateConfigs() throws CatalogException {
        ArrayList arrayList = new ArrayList();
        for (MetastoreEventProcessorConfig metastoreEventProcessorConfig : getEventProcessorConfigsToValidate()) {
            String configKey = metastoreEventProcessorConfig.getValidator().getConfigKey();
            try {
                ConfigValidator.ValidationResult validate = metastoreEventProcessorConfig.validate(getConfigValueFromMetastore(configKey, ""));
                if (!validate.isValid()) {
                    arrayList.add(validate);
                }
            } catch (TException e) {
                String format = String.format("Unable to get configuration %s from metastore. Check if metastore is accessible", configKey);
                LOG.error(format, e);
                throw new CatalogException(format);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        LOG.error("Found {} incorrect metastore configuration(s).", Integer.valueOf(arrayList.size()));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            LOG.error(((ConfigValidator.ValidationResult) it.next()).getReason());
        }
        throw new CatalogException(String.format("Found %d incorrect metastore configuration(s). Events processor cannot start. See ERROR log for more details.", Integer.valueOf(arrayList.size())));
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public DeleteEventLog getDeleteEventLog() {
        return this.deleteEventLog_;
    }

    public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
        return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME);
    }

    private void initMetrics() {
        this.metrics_.addTimer(EVENTS_FETCH_DURATION_METRIC);
        this.metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
        this.metrics_.addMeter(EVENTS_RECEIVED_METRIC);
        this.metrics_.addCounter(EVENTS_SKIPPED_METRIC);
        this.metrics_.addGauge("status", () -> {
            return getStatus().toString();
        });
        Metrics metrics = this.metrics_;
        AtomicLong atomicLong = this.lastSyncedEventId_;
        atomicLong.getClass();
        metrics.addGauge(LAST_SYNCED_ID_METRIC, atomicLong::get);
        Metrics metrics2 = this.metrics_;
        AtomicLong atomicLong2 = this.lastSyncedEventTimeSecs_;
        atomicLong2.getClass();
        metrics2.addGauge(LAST_SYNCED_EVENT_TIME, atomicLong2::get);
        Metrics metrics3 = this.metrics_;
        AtomicLong atomicLong3 = this.latestEventId_;
        atomicLong3.getClass();
        metrics3.addGauge(LATEST_EVENT_ID, atomicLong3::get);
        Metrics metrics4 = this.metrics_;
        AtomicLong atomicLong4 = this.latestEventTimeSecs_;
        atomicLong4.getClass();
        metrics4.addGauge(LATEST_EVENT_TIME, atomicLong4::get);
        this.metrics_.addGauge(EVENT_PROCESSING_DELAY, () -> {
            return Long.valueOf(this.latestEventTimeSecs_.get() - this.lastSyncedEventTimeSecs_.get());
        });
        this.metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
        this.metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
        this.metrics_.addCounter(NUMBER_OF_TABLES_ADDED);
        this.metrics_.addCounter(NUMBER_OF_TABLES_REMOVED);
        this.metrics_.addCounter(NUMBER_OF_DATABASES_ADDED);
        this.metrics_.addCounter(NUMBER_OF_DATABASES_REMOVED);
        this.metrics_.addCounter(NUMBER_OF_PARTITIONS_ADDED);
        this.metrics_.addCounter(NUMBER_OF_PARTITIONS_REMOVED);
        Metrics metrics5 = this.metrics_;
        DeleteEventLog deleteEventLog = this.deleteEventLog_;
        deleteEventLog.getClass();
        metrics5.addGauge(DELETE_EVENT_LOG_SIZE, deleteEventLog::size);
        this.metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
        this.metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS);
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public synchronized void start() {
        Preconditions.checkState(this.eventProcessorStatus_ != EventProcessorStatus.ACTIVE);
        resetProgressInfo();
        startScheduler();
        updateStatus(EventProcessorStatus.ACTIVE);
        LOG.info(String.format("Successfully started metastore event processing. Polling interval: %d seconds.", Long.valueOf(this.pollingFrequencyInSec_)));
    }

    public EventProcessorStatus getStatus() {
        return this.eventProcessorStatus_;
    }

    @VisibleForTesting
    public String getConfigValueFromMetastore(String str, String str2) throws TException {
        MetaStoreClientPool.MetaStoreClient metaStoreClient = this.catalog_.getMetaStoreClient();
        Throwable th = null;
        try {
            try {
                String metastoreConfigValue = MetaStoreUtil.getMetastoreConfigValue(metaStoreClient.getHiveClient(), str, str2);
                if (metaStoreClient != null) {
                    if (0 != 0) {
                        try {
                            metaStoreClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        metaStoreClient.close();
                    }
                }
                return metastoreConfigValue;
            } finally {
            }
        } catch (Throwable th3) {
            if (metaStoreClient != null) {
                if (th != null) {
                    try {
                        metaStoreClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    metaStoreClient.close();
                }
            }
            throw th3;
        }
    }

    @VisibleForTesting
    public long getLastSyncedEventId() {
        return this.lastSyncedEventId_.get();
    }

    @VisibleForTesting
    public long getLatestEventId() {
        return this.latestEventId_.get();
    }

    @VisibleForTesting
    void startScheduler() {
        Preconditions.checkState(this.pollingFrequencyInSec_ > 0);
        LOG.info(String.format("Starting metastore event polling with interval %d seconds.", Long.valueOf(this.pollingFrequencyInSec_)));
        this.processEventsScheduler_.scheduleAtFixedRate(this::processEvents, this.pollingFrequencyInSec_, this.pollingFrequencyInSec_, TimeUnit.SECONDS);
        this.updateEventIdScheduler_.scheduleAtFixedRate(this::updateLatestEventId, this.pollingFrequencyInSec_, this.pollingFrequencyInSec_, TimeUnit.SECONDS);
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public synchronized void pause() {
        if (this.eventProcessorStatus_ == EventProcessorStatus.PAUSED) {
            return;
        }
        updateStatus(EventProcessorStatus.PAUSED);
        LOG.info(String.format("Event processing is paused. Last synced event id is %d", Long.valueOf(this.lastSyncedEventId_.get())));
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public long getCurrentEventId() throws MetastoreNotificationFetchException {
        try {
            MetaStoreClientPool.MetaStoreClient metaStoreClient = this.catalog_.getMetaStoreClient();
            Throwable th = null;
            try {
                long eventId = metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
                if (metaStoreClient != null) {
                    if (0 != 0) {
                        try {
                            metaStoreClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        metaStoreClient.close();
                    }
                }
                return eventId;
            } finally {
            }
        } catch (MetastoreClientInstantiationException | TException e) {
            throw new MetastoreNotificationFetchException("Unable to fetch the current notification event id. Check if metastore service is accessible", e);
        }
    }

    public static long getCurrentEventIdNoThrow(IMetaStoreClient iMetaStoreClient) {
        long j = -1;
        try {
            j = iMetaStoreClient.getCurrentNotificationEventId().getEventId();
        } catch (TException e) {
            LOG.warn(String.format("Unable to fetch latest event id from HMS: %s", e.getMessage()));
        }
        return j;
    }

    private NotificationEvent getEventFromHMS(MetaStoreClientPool.MetaStoreClient metaStoreClient, long j) {
        NotificationEventRequest notificationEventRequest = new NotificationEventRequest();
        notificationEventRequest.setLastEvent(j - 1);
        notificationEventRequest.setMaxEvents(1);
        try {
            Iterator eventsIterator = MetastoreShim.getNextNotification(metaStoreClient.getHiveClient(), notificationEventRequest, null).getEventsIterator();
            if (eventsIterator.hasNext()) {
                return (NotificationEvent) eventsIterator.next();
            }
            LOG.warn("Unable to fetch event {}. It has been cleaned up", Long.valueOf(j));
            return null;
        } catch (TException e) {
            LOG.warn("Unable to fetch event {}", Long.valueOf(j), e);
            return null;
        }
    }

    @VisibleForTesting
    public int getEventTimeFromHMS(long j) {
        try {
            MetaStoreClientPool.MetaStoreClient metaStoreClient = this.catalog_.getMetaStoreClient();
            Throwable th = null;
            try {
                try {
                    NotificationEvent eventFromHMS = getEventFromHMS(metaStoreClient, j);
                    if (eventFromHMS == null) {
                        if (metaStoreClient != null) {
                            if (0 != 0) {
                                try {
                                    metaStoreClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                metaStoreClient.close();
                            }
                        }
                        return 0;
                    }
                    int eventTime = eventFromHMS.getEventTime();
                    if (metaStoreClient != null) {
                        if (0 != 0) {
                            try {
                                metaStoreClient.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            metaStoreClient.close();
                        }
                    }
                    return eventTime;
                } catch (Throwable th4) {
                    th = th4;
                    throw th4;
                }
            } finally {
            }
        } catch (MetastoreClientInstantiationException e) {
            LOG.error("Failed to get event time from HMS for event {}", Long.valueOf(j), e);
            return 0;
        }
        LOG.error("Failed to get event time from HMS for event {}", Long.valueOf(j), e);
        return 0;
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public synchronized void start(long j) {
        Preconditions.checkArgument(j >= 0);
        EventProcessorStatus eventProcessorStatus = this.eventProcessorStatus_;
        long j2 = this.lastSyncedEventId_.get();
        if (eventProcessorStatus != EventProcessorStatus.ACTIVE || j2 < j) {
            this.eventProcessorErrorMsg_ = null;
            resetProgressInfo();
            this.lastSyncedEventId_.set(j);
            this.lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(j));
            updateStatus(EventProcessorStatus.ACTIVE);
            LOG.info(String.format("Metastore event processing restarted. Last synced event id was updated from %d to %d", Long.valueOf(j2), Long.valueOf(this.lastSyncedEventId_.get())));
        }
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public synchronized void shutdown() {
        Preconditions.checkState(this.eventProcessorStatus_ != EventProcessorStatus.STOPPED, "Event processing is already stopped");
        shutdownAndAwaitTermination(this.processEventsScheduler_);
        shutdownAndAwaitTermination(this.updateEventIdScheduler_);
        updateStatus(EventProcessorStatus.STOPPED);
        LOG.info("Metastore event processing stopped.");
    }

    private synchronized void shutdownAndAwaitTermination(ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(scheduledExecutorService);
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                LOG.info(String.format("Scheduler pool did not terminate within %d seconds. Attempting to stop currently running tasks", 10));
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOG.info("Received interruptedException. Terminating currently running tasks.", e);
            scheduledExecutorService.shutdownNow();
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x0151: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:64:0x0151 */
    /* JADX WARN: Not initialized variable reg: 15, insn: 0x0156: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:66:0x0156 */
    /* JADX WARN: Type inference failed for: r14v0, types: [org.apache.impala.catalog.MetaStoreClientPool$MetaStoreClient] */
    /* JADX WARN: Type inference failed for: r15v0, types: [java.lang.Throwable] */
    public List<NotificationEvent> getNextMetastoreEvents(long j, long j2, boolean z, @Nullable IMetaStoreClient.NotificationFilter notificationFilter) throws MetastoreNotificationFetchException {
        ?? r14;
        ?? r15;
        if (j2 <= j) {
            return Collections.emptyList();
        }
        Timer.Context time = this.metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
        try {
            try {
                try {
                    MetaStoreClientPool.MetaStoreClient metaStoreClient = this.catalog_.getMetaStoreClient();
                    Throwable th = null;
                    int i = z ? -1 : 1000;
                    NotificationEventRequest notificationEventRequest = new NotificationEventRequest();
                    notificationEventRequest.setLastEvent(j);
                    notificationEventRequest.setMaxEvents(i);
                    NotificationEventResponse nextNotification = MetastoreShim.getNextNotification(metaStoreClient.getHiveClient(), notificationEventRequest, this.catalog_.getDefaultSkippedHmsEventTypes());
                    LOG.info("Received {} events. First event id: {}.", Integer.valueOf(nextNotification.getEvents().size()), nextNotification.getEvents().size() > 0 ? Long.valueOf(((NotificationEvent) nextNotification.getEvents().get(0)).getEventId()) : "none");
                    if (notificationFilter == null) {
                        List<NotificationEvent> events = nextNotification.getEvents();
                        if (metaStoreClient != null) {
                            if (0 != 0) {
                                try {
                                    metaStoreClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                metaStoreClient.close();
                            }
                        }
                        return events;
                    }
                    ArrayList arrayList = new ArrayList();
                    for (NotificationEvent notificationEvent : nextNotification.getEvents()) {
                        if (notificationFilter.accept(notificationEvent)) {
                            arrayList.add(notificationEvent);
                        }
                    }
                    if (metaStoreClient != null) {
                        if (0 != 0) {
                            try {
                                metaStoreClient.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            metaStoreClient.close();
                        }
                    }
                    time.stop();
                    return arrayList;
                } catch (MetastoreClientInstantiationException | TException e) {
                    throw new MetastoreNotificationFetchException("Unable to fetch notifications from metastore. Last synced event id is " + j, e);
                }
            } catch (Throwable th4) {
                if (r14 != 0) {
                    if (r15 != 0) {
                        try {
                            r14.close();
                        } catch (Throwable th5) {
                            r15.addSuppressed(th5);
                        }
                    } else {
                        r14.close();
                    }
                }
                throw th4;
            }
        } finally {
            time.stop();
        }
    }

    @VisibleForTesting
    protected List<NotificationEvent> getNextMetastoreEvents() throws MetastoreNotificationFetchException {
        return getNextMetastoreEvents(getCurrentEventId());
    }

    @VisibleForTesting
    public List<NotificationEvent> getNextMetastoreEvents(long j) throws MetastoreNotificationFetchException {
        return getNextMetastoreEvents(this.lastSyncedEventId_.get(), j, false, null);
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public void processEvents() {
        try {
            EventProcessorStatus eventProcessorStatus = this.eventProcessorStatus_;
            if (eventProcessorStatus != EventProcessorStatus.ACTIVE) {
                LOG.warn(String.format("Event processing is skipped since status is %s. Last synced event id is %d", eventProcessorStatus, Long.valueOf(this.lastSyncedEventId_.get())));
                tryAutoGlobalInvalidateOnFailure();
            } else {
                long currentEventId = getCurrentEventId();
                processEvents(currentEventId, getNextMetastoreEvents(currentEventId));
            }
        } catch (MetastoreNotificationFetchException e) {
            LOG.error("Unable to fetch the next batch of metastore events. Hive Metastore may be unavailable. Will retry.", e);
        } catch (MetastoreNotificationNeedsInvalidateException e2) {
            updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
            LOG.error("Event processing needs a invalidate command to resolve the state", e2);
            this.eventProcessorErrorMsg_ = LocalDateTime.now().toString() + "\nEvent processing needs a invalidate command to resolve the state\n" + ExceptionUtils.getFullStackTrace(e2);
            tryAutoGlobalInvalidateOnFailure();
        } catch (Exception e3) {
            updateStatus(EventProcessorStatus.ERROR);
            LOG.error("Unexpected exception received while processing event", e3);
            this.eventProcessorErrorMsg_ = LocalDateTime.now().toString() + "\nUnexpected exception received while processing event\n" + ExceptionUtils.getFullStackTrace(e3);
            dumpEventInfoToLog(this.currentEvent_);
            tryAutoGlobalInvalidateOnFailure();
        }
    }

    private void tryAutoGlobalInvalidateOnFailure() {
        EventProcessorStatus eventProcessorStatus = this.eventProcessorStatus_;
        if (BackendConfig.INSTANCE.isInvalidateGlobalMetadataOnEventProcessFailureEnabled()) {
            if (eventProcessorStatus == EventProcessorStatus.ERROR || eventProcessorStatus == EventProcessorStatus.NEEDS_INVALIDATE) {
                try {
                    LOG.error("Triggering auto global invalidation");
                    this.catalog_.reset(NoOpEventSequence.INSTANCE);
                    this.eventProcessorErrorMsg_ = null;
                } catch (Exception e) {
                    LOG.error("Automatic global invalidate metadata failed", e);
                }
            }
        }
    }

    @VisibleForTesting
    public void updateLatestEventId() {
        if (this.eventProcessorStatus_ == EventProcessorStatus.DISABLED) {
            return;
        }
        try {
            MetaStoreClientPool.MetaStoreClient metaStoreClient = this.catalog_.getMetaStoreClient();
            Throwable th = null;
            try {
                try {
                    long eventId = metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
                    if (eventId <= this.latestEventId_.get()) {
                        if (metaStoreClient != null) {
                            if (0 == 0) {
                                metaStoreClient.close();
                                return;
                            }
                            try {
                                metaStoreClient.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                    NotificationEvent eventFromHMS = getEventFromHMS(metaStoreClient, eventId);
                    if (eventFromHMS == null) {
                        if (metaStoreClient != null) {
                            if (0 == 0) {
                                metaStoreClient.close();
                                return;
                            }
                            try {
                                metaStoreClient.close();
                                return;
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                                return;
                            }
                        }
                        return;
                    }
                    long j = this.lastSyncedEventId_.get();
                    long j2 = this.lastSyncedEventTimeSecs_.get();
                    long eventTime = eventFromHMS.getEventTime();
                    this.latestEventId_.set(eventId);
                    this.latestEventTimeSecs_.set(eventTime);
                    LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={}, time={}.", new Object[]{Long.valueOf(eventId), Long.valueOf(eventTime), Long.valueOf(j), Long.valueOf(j2)});
                    if (eventTime > j2) {
                        LOG.warn("Lag: {}. {} events pending to be processed.", PrintUtils.printTimeMs((eventTime - j2) * 1000), Long.valueOf(eventId - j));
                    }
                    if (metaStoreClient != null) {
                        if (0 != 0) {
                            try {
                                metaStoreClient.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            metaStoreClient.close();
                        }
                    }
                    return;
                } catch (Throwable th5) {
                    th = th5;
                    throw th5;
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.error("Unable to update current notification event id. Last value: {}", this.latestEventId_, e);
        }
        LOG.error("Unable to update current notification event id. Last value: {}", this.latestEventId_, e);
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public TEventProcessorMetrics getEventProcessorMetrics() {
        TEventProcessorMetrics tEventProcessorMetrics = new TEventProcessorMetrics();
        EventProcessorStatus status = getStatus();
        tEventProcessorMetrics.setStatus(status.toString());
        tEventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
        if (status != EventProcessorStatus.ACTIVE) {
            return tEventProcessorMetrics;
        }
        tEventProcessorMetrics.setLast_synced_event_time(this.lastSyncedEventTimeSecs_.get());
        tEventProcessorMetrics.setLatest_event_id(this.latestEventId_.get());
        tEventProcessorMetrics.setLatest_event_time(this.latestEventTimeSecs_.get());
        long count = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
        long count2 = this.metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
        tEventProcessorMetrics.setEvents_received(count);
        tEventProcessorMetrics.setEvents_skipped(count2);
        Snapshot snapshot = this.metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot();
        double mean = snapshot.getMean() / 1.0E9d;
        double d = snapshot.get75thPercentile() / 1.0E9d;
        double d2 = snapshot.get95thPercentile() / 1.0E9d;
        double d3 = snapshot.get99thPercentile() / 1.0E9d;
        tEventProcessorMetrics.setEvents_fetch_duration_mean(mean);
        tEventProcessorMetrics.setEvents_fetch_duration_p75(d);
        tEventProcessorMetrics.setEvents_fetch_duration_p95(d2);
        tEventProcessorMetrics.setEvents_fetch_duration_p99(d3);
        Snapshot snapshot2 = this.metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot();
        double mean2 = snapshot2.getMean() / 1.0E9d;
        double d4 = snapshot2.get75thPercentile() / 1.0E9d;
        double d5 = snapshot2.get95thPercentile() / 1.0E9d;
        double d6 = snapshot2.get99thPercentile() / 1.0E9d;
        tEventProcessorMetrics.setEvents_process_duration_mean(mean2);
        tEventProcessorMetrics.setEvents_process_duration_p75(d4);
        tEventProcessorMetrics.setEvents_process_duration_p95(d5);
        tEventProcessorMetrics.setEvents_process_duration_p99(d6);
        tEventProcessorMetrics.setLast_events_process_duration(this.lastEventProcessDurationNs_.get() / 1.0E9d);
        double oneMinuteRate = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
        double fiveMinuteRate = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
        double fifteenMinuteRate = this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();
        tEventProcessorMetrics.setEvents_received_1min_rate(oneMinuteRate);
        tEventProcessorMetrics.setEvents_received_5min_rate(fiveMinuteRate);
        tEventProcessorMetrics.setEvents_received_15min_rate(fifteenMinuteRate);
        LOG.trace("Events Received: {} Events skipped: {} Avg fetch duration: {} Avg process duration: {} Events received rate (1min) : {}", new Object[]{Long.valueOf(count), Long.valueOf(count2), Double.valueOf(mean), Double.valueOf(mean2), Double.valueOf(oneMinuteRate)});
        return tEventProcessorMetrics;
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
        TEventProcessorMetricsSummaryResponse tEventProcessorMetricsSummaryResponse = new TEventProcessorMetricsSummaryResponse();
        tEventProcessorMetricsSummaryResponse.setSummary(this.metrics_.toString());
        if (this.eventProcessorErrorMsg_ != null) {
            tEventProcessorMetricsSummaryResponse.setError_msg(this.eventProcessorErrorMsg_);
        }
        TEventBatchProgressInfo tEventBatchProgressInfo = new TEventBatchProgressInfo();
        tEventBatchProgressInfo.last_synced_event_id = this.lastSyncedEventId_.get();
        tEventBatchProgressInfo.last_synced_event_time_s = this.lastSyncedEventTimeSecs_.get();
        tEventBatchProgressInfo.latest_event_id = this.latestEventId_.get();
        tEventBatchProgressInfo.latest_event_time_s = this.latestEventTimeSecs_.get();
        List<NotificationEvent> list = this.currentEventBatch_;
        List<MetastoreEvents.MetastoreEvent> list2 = this.currentFilteredEvents_;
        if (list != null && !list.isEmpty()) {
            int size = list.size();
            tEventBatchProgressInfo.num_hms_events = size;
            tEventBatchProgressInfo.min_event_id = list.get(0).getEventId();
            tEventBatchProgressInfo.min_event_time_s = list.get(0).getEventTime();
            tEventBatchProgressInfo.max_event_id = list.get(size - 1).getEventId();
            tEventBatchProgressInfo.max_event_time_s = r0.getEventTime();
            tEventBatchProgressInfo.current_batch_start_time_ms = this.currentBatchStartTimeMs_;
            tEventBatchProgressInfo.current_event_start_time_ms = this.currentEventStartTimeMs_;
            if (list2 != null) {
                tEventBatchProgressInfo.num_filtered_events = list2.size();
            }
            tEventBatchProgressInfo.current_event_index = this.currentEventIndex_;
            tEventBatchProgressInfo.current_event_batch_size = this.currentFilteredEvent_ != null ? this.currentFilteredEvent_.getNumberOfEvents() : 0;
            tEventBatchProgressInfo.current_event = this.currentEvent_;
        }
        tEventProcessorMetricsSummaryResponse.setProgress(tEventBatchProgressInfo);
        return tEventProcessorMetricsSummaryResponse;
    }

    @VisibleForTesting
    Metrics getMetrics() {
        return this.metrics_;
    }

    private void resetProgressInfo() {
        this.currentEvent_ = null;
        this.currentEventBatch_ = null;
        this.currentFilteredEvent_ = null;
        this.currentFilteredEvents_ = null;
        this.currentBatchStartTimeMs_ = 0L;
        this.currentEventStartTimeMs_ = 0L;
        this.currentEventIndex_ = 0;
    }

    @VisibleForTesting
    protected void processEvents(long j, List<NotificationEvent> list) throws MetastoreNotificationException {
        this.currentEventBatch_ = list;
        this.metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(list.size());
        if (list.isEmpty()) {
            if (this.lastSyncedEventId_.get() < j) {
                this.lastSyncedEventId_.set(j);
                this.lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(j));
                return;
            }
            return;
        }
        Timer.Context time = this.metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
        this.currentBatchStartTimeMs_ = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        try {
            try {
                this.currentFilteredEvents_ = this.metastoreEventFactory_.getFilteredEvents(list, this.metrics_);
                if (this.currentFilteredEvents_.isEmpty()) {
                    this.lastSyncedEventId_.set(list.get(list.size() - 1).getEventId());
                    this.lastSyncedEventTimeSecs_.set(r0.getEventTime());
                    resetProgressInfo();
                    long stop = time.stop();
                    this.lastEventProcessDurationNs_.set(stop);
                    logEventMetrics(hashMap, stop);
                    return;
                }
                Iterator<MetastoreEvents.MetastoreEvent> it = this.currentFilteredEvents_.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    MetastoreEvents.MetastoreEvent next = it.next();
                    synchronized (this) {
                        if (this.eventProcessorStatus_ == EventProcessorStatus.ACTIVE) {
                            this.currentEvent_ = next.metastoreNotificationEvent_;
                            this.currentFilteredEvent_ = next;
                            try {
                                ThreadNameAnnotator threadNameAnnotator = new ThreadNameAnnotator(String.format("Processing %s on %s, eventId=%d", next.getEventType(), next.getTargetName(), Long.valueOf(next.getEventId())));
                                Throwable th = null;
                                try {
                                    try {
                                        this.currentEventStartTimeMs_ = System.currentTimeMillis();
                                        next.processIfEnabled();
                                        hashMap.put(next, Long.valueOf(System.currentTimeMillis() - this.currentEventStartTimeMs_));
                                        if (threadNameAnnotator != null) {
                                            if (0 != 0) {
                                                try {
                                                    threadNameAnnotator.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                threadNameAnnotator.close();
                                            }
                                        }
                                    } catch (Throwable th3) {
                                        th = th3;
                                        throw th3;
                                        break;
                                    }
                                } catch (Throwable th4) {
                                    if (threadNameAnnotator != null) {
                                        if (th != null) {
                                            try {
                                                threadNameAnnotator.close();
                                            } catch (Throwable th5) {
                                                th.addSuppressed(th5);
                                            }
                                        } else {
                                            threadNameAnnotator.close();
                                        }
                                    }
                                    throw th4;
                                    break;
                                }
                            } catch (Exception e) {
                                try {
                                    if (!next.onFailure(e)) {
                                        next.errorLog("Unable to handle event processing failure", new Object[0]);
                                        throw e;
                                    }
                                } catch (Exception e2) {
                                    next.errorLog("Failed to handle event processing failure", e2);
                                    throw e;
                                }
                            }
                            this.currentEventIndex_++;
                            this.deleteEventLog_.garbageCollect(next.getEventId());
                            this.lastSyncedEventId_.set(next.getEventId());
                            this.lastSyncedEventTimeSecs_.set(next.getEventTime());
                            this.metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update((System.currentTimeMillis() / 1000) - next.getEventTime(), TimeUnit.SECONDS);
                        }
                    }
                    break;
                }
                resetProgressInfo();
                long stop2 = time.stop();
                this.lastEventProcessDurationNs_.set(stop2);
                logEventMetrics(hashMap, stop2);
            } catch (CatalogException e3) {
                throw new MetastoreNotificationException(String.format("Unable to process event %d of type %s. Event processing will be stopped.", Long.valueOf(this.currentEvent_.getEventId()), this.currentEvent_.getEventType()), e3);
            }
        } catch (Throwable th6) {
            long stop3 = time.stop();
            this.lastEventProcessDurationNs_.set(stop3);
            logEventMetrics(hashMap, stop3);
            throw th6;
        }
    }

    private void logEventMetrics(Map<MetastoreEvents.MetastoreEvent, Long> map, long j) {
        LOG.info("Time elapsed in processing event batch: {}", PrintUtils.printTimeNs(j));
        if (j < HdfsTable.LOADING_WARNING_TIME_NS) {
            return;
        }
        ArrayList arrayList = new ArrayList(map.entrySet());
        arrayList.sort(Map.Entry.comparingByValue().reversed());
        int min = Math.min(10, arrayList.size());
        StringBuilder sb = new StringBuilder("Top " + min + " expensive events: ");
        for (Map.Entry entry : arrayList.subList(0, min)) {
            MetastoreEvents.MetastoreEvent metastoreEvent = (MetastoreEvents.MetastoreEvent) entry.getKey();
            sb.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d) ", metastoreEvent.getEventType(), Long.valueOf(metastoreEvent.getEventId()), metastoreEvent.getTargetName(), Long.valueOf(((Long) entry.getValue()).longValue())));
        }
        HashMap hashMap = new HashMap();
        for (MetastoreEvents.MetastoreEvent metastoreEvent2 : map.keySet()) {
            String targetName = metastoreEvent2.getTargetName();
            hashMap.put(targetName, Long.valueOf(((Long) hashMap.getOrDefault(targetName, 0L)).longValue() + map.get(metastoreEvent2).longValue()));
        }
        ArrayList arrayList2 = new ArrayList(hashMap.entrySet());
        arrayList2.sort(Map.Entry.comparingByValue().reversed());
        int min2 = Math.min(10, arrayList2.size());
        sb.append("\nTop ").append(min2).append(" targets in event processing: ");
        for (Map.Entry entry2 : arrayList2.subList(0, min2)) {
            sb.append(String.format("(target=%s, duration_ms=%d) ", (String) entry2.getKey(), Long.valueOf(((Long) entry2.getValue()).longValue())));
        }
        LOG.warn(sb.toString());
    }

    @VisibleForTesting
    public synchronized void updateStatus(EventProcessorStatus eventProcessorStatus) {
        this.eventProcessorStatus_ = eventProcessorStatus;
    }

    private void dumpEventInfoToLog(NotificationEvent notificationEvent) {
        if (notificationEvent == null) {
            LOG.error("Notification event is null");
            this.eventProcessorErrorMsg_ += "\nNotification event is null";
            return;
        }
        StringBuilder append = new StringBuilder().append("Event id: ").append(notificationEvent.getEventId()).append(HiveMetadataFormatUtils.LINE_DELIM).append("Event Type: ").append(notificationEvent.getEventType()).append(HiveMetadataFormatUtils.LINE_DELIM).append("Event time: ").append(notificationEvent.getEventTime()).append(HiveMetadataFormatUtils.LINE_DELIM).append("Database name: ").append(notificationEvent.getDbName()).append(HiveMetadataFormatUtils.LINE_DELIM);
        if (notificationEvent.getTableName() != null) {
            append.append("Table name: ").append(notificationEvent.getTableName()).append(HiveMetadataFormatUtils.LINE_DELIM);
        }
        append.append("Event message: ").append(notificationEvent.getMessage()).append(HiveMetadataFormatUtils.LINE_DELIM);
        String sb = append.toString();
        LOG.error(sb);
        this.eventProcessorErrorMsg_ += '\n' + sb;
    }

    public static synchronized ExternalEventsProcessor getInstance(CatalogOpExecutor catalogOpExecutor, long j, long j2) throws CatalogException {
        if (instance != null) {
            return instance;
        }
        instance = new MetastoreEventsProcessor(catalogOpExecutor, j, j2);
        return instance;
    }

    @Override // org.apache.impala.catalog.events.ExternalEventsProcessor
    public MetastoreEvents.MetastoreEventFactory getEventsFactory() {
        return this.metastoreEventFactory_;
    }

    public static MessageDeserializer getMessageDeserializer() {
        return MESSAGE_DESERIALIZER;
    }
}
