/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.EntityCorrelationManager;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.impexp.AsyncImporter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasMetricsCounter;
import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.LruCache;
import org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
import org.springframework.stereotype.Component;

@Component
@Order(value=5)
@DependsOn(value={"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
public class NotificationHookConsumer
implements Service,
ActiveStateChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
    private static final Logger FAILED_LOG = LoggerFactory.getLogger((String)"FAILED");
    private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger((String)"LARGE_MESSAGES");
    public static final String DUMMY_DATABASE = "_dummy_database";
    public static final String DUMMY_TABLE = "_dummy_table";
    public static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
    public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
    public static final String CONSUMER_COMMIT_BATCH_SIZE = "atlas.notification.consumer.commit.batch.size";
    public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
    public static final String CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.type.ignore.pattern";
    public static final String CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.ignore.pattern";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
    public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
    public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
    public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
    public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
    public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
    public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
    public static final String CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES = "atlas.notification.consumer.preprocess.spark_process.attributes";
    public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
    public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
    private static final int SC_OK = 200;
    private static final int SC_BAD_REQUEST = 400;
    private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
    private static final String ATTRIBUTE_INPUTS = "inputs";
    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
    private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
    private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
    private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
    private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME = "atlas-hook-consumer-thread";
    private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME = "atlas-hook-unsorted-consumer-thread";
    private static final String ATLAS_IMPORT_CONSUMER_THREAD_PREFIX = "atlas-import-consumer-thread-";
    private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
    private final AtlasEntityStore atlasEntityStore;
    private final ServiceState serviceState;
    private final AtlasInstanceConverter instanceConverter;
    private final AtlasTypeRegistry typeRegistry;
    private final AtlasMetricsUtil metricsUtil;
    private final int maxRetries;
    private final int failedMsgCacheSize;
    private final int minWaitDuration;
    private final int maxWaitDuration;
    private final int commitBatchSize;
    private final boolean skipHiveColumnLineageHive20633;
    private final int skipHiveColumnLineageHive20633InputsThreshold;
    private final boolean updateHiveProcessNameWithQualifiedName;
    private final int largeMessageProcessingTimeThresholdMs;
    private final boolean consumerDisabled;
    private final List<Pattern> entityTypesToIgnore = new ArrayList<Pattern>();
    private final List<Pattern> entitiesToIgnore = new ArrayList<Pattern>();
    private final List<Pattern> hiveTablesToIgnore = new ArrayList<Pattern>();
    private final List<Pattern> hiveTablesToPrune = new ArrayList<Pattern>();
    private final List<String> hiveDummyDatabasesToIgnore;
    private final List<String> hiveDummyTablesToIgnore;
    private final List<String> hiveTablePrefixesToIgnore;
    private final Map<String, PreprocessorContext.PreprocessAction> hiveTablesCache;
    private final boolean hiveTypesRemoveOwnedRefAttrs;
    private final boolean rdbmsTypesRemoveOwnedRefAttrs;
    private final boolean s3V2DirectoryPruneObjectPrefix;
    private final boolean preprocessEnabled;
    private final boolean createShellEntityForNonExistingReference;
    private final boolean authorizeUsingMessageUser;
    private final boolean sparkProcessAttributes;
    private final Map<String, Authentication> authnCache;
    private final NotificationInterface notificationInterface;
    private final Configuration applicationProperties;
    private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
    private final EntityCorrelationManager entityCorrelationManager;
    private final long consumerMsgBufferingIntervalMS;
    private final int consumerMsgBufferingBatchSize;
    private final AsyncImporter asyncImporter;
    private ExecutorService executors;
    private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime((Instant)Instant.now());
    @VisibleForTesting
    final int consumerRetryInterval;
    @VisibleForTesting
    List<HookConsumer> consumers;

    @Inject
    public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter instanceConverter, AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil, EntityCorrelationStore entityCorrelationStore, @Lazy AsyncImporter asyncImporter) throws AtlasException {
        this.notificationInterface = notificationInterface;
        this.atlasEntityStore = atlasEntityStore;
        this.serviceState = serviceState;
        this.instanceConverter = instanceConverter;
        this.typeRegistry = typeRegistry;
        this.applicationProperties = ApplicationProperties.get();
        this.metricsUtil = metricsUtil;
        this.lastCommittedPartitionOffset = new HashMap<TopicPartition, Long>();
        this.asyncImporter = asyncImporter;
        this.maxRetries = this.applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
        this.failedMsgCacheSize = this.applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
        this.consumerRetryInterval = this.applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
        this.minWaitDuration = this.applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, this.consumerRetryInterval);
        this.maxWaitDuration = this.applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, this.minWaitDuration * 60);
        this.commitBatchSize = this.applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
        this.skipHiveColumnLineageHive20633 = this.applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
        this.skipHiveColumnLineageHive20633InputsThreshold = this.applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15);
        this.updateHiveProcessNameWithQualifiedName = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, true);
        this.consumerDisabled = this.applicationProperties.getBoolean(CONSUMER_DISABLED, false);
        this.largeMessageProcessingTimeThresholdMs = this.applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60000);
        this.createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
        this.authorizeUsingMessageUser = this.applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
        this.consumerMsgBufferingIntervalMS = (long)AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL.getInt() * 1000L;
        this.consumerMsgBufferingBatchSize = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE.getInt();
        int authnCacheTtlSeconds = this.applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
        this.authnCache = this.authorizeUsingMessageUser && authnCacheTtlSeconds > 0 ? new PassiveExpiringMap((long)authnCacheTtlSeconds * 1000L) : null;
        String[] patternEntityTypesToIgnore = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN);
        String[] patternEntitiesToIgnore = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN);
        String[] patternHiveTablesToIgnore = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
        String[] patternHiveTablesToPrune = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
        if (patternEntityTypesToIgnore != null) {
            for (String pattern : patternEntityTypesToIgnore) {
                try {
                    this.entityTypesToIgnore.add(Pattern.compile(pattern));
                }
                catch (Throwable t) {
                    LOG.warn("failed to compile pattern {}", (Object)pattern, (Object)t);
                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", (Object)CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, (Object)pattern);
                }
            }
            LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, this.entityTypesToIgnore);
        }
        if (patternEntitiesToIgnore != null) {
            for (String pattern : patternEntitiesToIgnore) {
                try {
                    this.entitiesToIgnore.add(Pattern.compile(pattern));
                }
                catch (Throwable t) {
                    LOG.warn("failed to compile pattern {}", (Object)pattern, (Object)t);
                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", (Object)CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, (Object)pattern);
                }
            }
            LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, this.entitiesToIgnore);
        }
        if (patternHiveTablesToIgnore != null) {
            for (String pattern : patternHiveTablesToIgnore) {
                try {
                    this.hiveTablesToIgnore.add(Pattern.compile(pattern));
                    LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, (Object)pattern);
                }
                catch (Throwable t) {
                    LOG.warn("failed to compile pattern {}", (Object)pattern, (Object)t);
                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, (Object)pattern);
                }
            }
        }
        if (patternHiveTablesToPrune != null) {
            for (String pattern : patternHiveTablesToPrune) {
                try {
                    this.hiveTablesToPrune.add(Pattern.compile(pattern));
                    LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, (Object)pattern);
                }
                catch (Throwable t) {
                    LOG.warn("failed to compile pattern {}", (Object)pattern, (Object)t);
                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, (Object)pattern);
                }
            }
        }
        this.hiveTablesCache = !this.hiveTablesToIgnore.isEmpty() || !this.hiveTablesToPrune.isEmpty() ? new LruCache(this.applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0) : Collections.emptyMap();
        boolean hiveDbIgnoreDummyEnabled = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, true);
        boolean hiveTableIgnoreDummyEnabled = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, true);
        boolean hiveTableIgnoreNamePrefixEnabled = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, true);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, (Object)hiveDbIgnoreDummyEnabled);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, (Object)hiveTableIgnoreDummyEnabled);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, (Object)hiveTableIgnoreNamePrefixEnabled);
        if (hiveDbIgnoreDummyEnabled) {
            String[] dummyDatabaseNames = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
            this.hiveDummyDatabasesToIgnore = this.trimAndPurge(dummyDatabaseNames, DUMMY_DATABASE);
            LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, (Object)StringUtils.join(this.hiveDummyDatabasesToIgnore, (char)','));
        } else {
            this.hiveDummyDatabasesToIgnore = Collections.emptyList();
        }
        if (hiveTableIgnoreDummyEnabled) {
            String[] dummyTableNames = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
            this.hiveDummyTablesToIgnore = this.trimAndPurge(dummyTableNames, DUMMY_TABLE);
            LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, (Object)StringUtils.join(this.hiveDummyTablesToIgnore, (char)','));
        } else {
            this.hiveDummyTablesToIgnore = Collections.emptyList();
        }
        if (hiveTableIgnoreNamePrefixEnabled) {
            String[] ignoreNamePrefixes = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
            this.hiveTablePrefixesToIgnore = this.trimAndPurge(ignoreNamePrefixes, VALUES_TMP_TABLE_NAME_PREFIX);
            LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, (Object)StringUtils.join(this.hiveTablePrefixesToIgnore, (char)','));
        } else {
            this.hiveTablePrefixesToIgnore = Collections.emptyList();
        }
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, (Object)this.updateHiveProcessNameWithQualifiedName);
        this.hiveTypesRemoveOwnedRefAttrs = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
        this.rdbmsTypesRemoveOwnedRefAttrs = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
        this.s3V2DirectoryPruneObjectPrefix = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
        this.sparkProcessAttributes = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES, false);
        this.preprocessEnabled = this.skipHiveColumnLineageHive20633 || this.updateHiveProcessNameWithQualifiedName || this.hiveTypesRemoveOwnedRefAttrs || this.rdbmsTypesRemoveOwnedRefAttrs || this.s3V2DirectoryPruneObjectPrefix || !this.hiveTablesToIgnore.isEmpty() || !this.hiveTablesToPrune.isEmpty() || !this.hiveDummyDatabasesToIgnore.isEmpty() || !this.hiveDummyTablesToIgnore.isEmpty() || !this.hiveTablePrefixesToIgnore.isEmpty() || this.sparkProcessAttributes;
        this.entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
        LOG.info("{}={}", (Object)CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, (Object)this.skipHiveColumnLineageHive20633);
        LOG.info("{}={}", (Object)CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, (Object)this.skipHiveColumnLineageHive20633InputsThreshold);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, (Object)this.hiveTypesRemoveOwnedRefAttrs);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, (Object)this.rdbmsTypesRemoveOwnedRefAttrs);
        LOG.info("{}={}", (Object)CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, (Object)this.s3V2DirectoryPruneObjectPrefix);
        LOG.info("{}={}", (Object)CONSUMER_COMMIT_BATCH_SIZE, (Object)this.commitBatchSize);
        LOG.info("{}={}", (Object)CONSUMER_DISABLED, (Object)this.consumerDisabled);
    }

    public void start() throws AtlasException {
        this.startInternal(this.applicationProperties, null);
    }

    public void stop() {
        try {
            if (this.consumerDisabled && this.consumers.isEmpty()) {
                return;
            }
            this.stopConsumerThreads();
            if (this.executors != null) {
                this.executors.shutdown();
                if (!this.executors.awaitTermination(30000L, TimeUnit.MILLISECONDS)) {
                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
                this.executors = null;
            }
            this.notificationInterface.close();
        }
        catch (InterruptedException e) {
            LOG.error("Failure in shutting down consumers");
        }
    }

    public void instanceIsActive() {
        if (this.executors == null) {
            this.executors = this.createExecutor();
            LOG.info("Executors initialized (Instance is active)");
        }
        if (this.consumerDisabled) {
            return;
        }
        LOG.info("Reacting to active state: initializing Kafka consumers");
        this.startHookConsumers();
    }

    public void instanceIsPassive() {
        if (this.consumerDisabled && this.consumers.isEmpty()) {
            return;
        }
        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
        this.stop();
    }

    public int getHandlerOrder() {
        return ActiveStateChangeHandler.HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closeImportConsumer(String importId, String topic) {
        try {
            LOG.info("==> closeImportConsumer(importId={}, topic={})", (Object)importId, (Object)topic);
            String consumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + importId;
            ListIterator<HookConsumer> consumersIterator = this.consumers.listIterator();
            while (consumersIterator.hasNext()) {
                HookConsumer consumer = consumersIterator.next();
                if (!consumer.getName().startsWith(consumerName)) continue;
                consumer.shutdown();
                consumersIterator.remove();
            }
            this.notificationInterface.closeConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, topic);
            this.notificationInterface.deleteTopic(NotificationInterface.NotificationType.ASYNC_IMPORT, topic);
            this.lastCommittedPartitionOffset.entrySet().removeIf(entry -> topic.equals(((TopicPartition)entry.getKey()).topic()));
        }
        catch (Exception e) {
            LOG.error("Could not cleanup consumers for importId: {}", (Object)importId, (Object)e);
        }
        finally {
            LOG.info("<== closeImportConsumer(importId={}, topic={})", (Object)importId, (Object)topic);
        }
    }

    @VisibleForTesting
    void startInternal(Configuration configuration, ExecutorService executorService) {
        if (this.consumers == null) {
            this.consumers = new ArrayList<HookConsumer>();
        }
        if (executorService != null) {
            this.executors = executorService;
        }
        if (!HAConfiguration.isHAEnabled((Configuration)configuration)) {
            if (this.executors == null) {
                this.executors = this.createExecutor();
                LOG.info("Executors initialized (HA is disabled)");
            }
            if (this.consumerDisabled) {
                LOG.info("No hook messages will be processed. {} = {}", (Object)CONSUMER_DISABLED, (Object)this.consumerDisabled);
                return;
            }
            LOG.info("HA is disabled, starting consumers inline.");
            this.startHookConsumers();
        }
    }

    @VisibleForTesting
    void startHookConsumers() {
        int numThreads = this.applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
        HashMap<NotificationConsumer, NotificationInterface.NotificationType> notificationConsumersByType = new HashMap<NotificationConsumer, NotificationInterface.NotificationType>();
        List notificationConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
        for (NotificationConsumer notificationConsumer : notificationConsumers) {
            notificationConsumersByType.put(notificationConsumer, NotificationInterface.NotificationType.HOOK);
        }
        if (AtlasHook.isHookMsgsSortEnabled) {
            List unsortedNotificationConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK_UNSORTED, numThreads);
            for (NotificationConsumer unsortedNotificationConsumer : unsortedNotificationConsumers) {
                notificationConsumersByType.put(unsortedNotificationConsumer, NotificationInterface.NotificationType.HOOK_UNSORTED);
            }
        }
        ArrayList<HookConsumer> hookConsumers = new ArrayList<HookConsumer>();
        for (NotificationConsumer consumer : notificationConsumersByType.keySet()) {
            String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME;
            if (((NotificationInterface.NotificationType)notificationConsumersByType.get(consumer)).equals((Object)NotificationInterface.NotificationType.HOOK_UNSORTED)) {
                hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
            }
            HookConsumer hookConsumer = new HookConsumer(hookConsumerName, (NotificationConsumer<HookNotification>)consumer);
            hookConsumers.add(hookConsumer);
        }
        this.startConsumers(hookConsumers);
    }

    public void startAsyncImportConsumer(NotificationInterface.NotificationType notificationType, String importId, String topic) throws AtlasBaseException {
        if (topic != null) {
            this.notificationInterface.addTopicToNotificationType(notificationType, topic);
        }
        List notificationConsumers = this.notificationInterface.createConsumers(notificationType, 1);
        ArrayList<HookConsumer> hookConsumers = new ArrayList<HookConsumer>();
        for (NotificationConsumer consumer : notificationConsumers) {
            String hookConsumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + importId;
            HookConsumer hookConsumer = new HookConsumer(hookConsumerName, (NotificationConsumer<HookNotification>)consumer);
            hookConsumers.add(hookConsumer);
        }
        this.startConsumers(hookConsumers);
    }

    @VisibleForTesting
    protected ExecutorService createExecutor() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
    }

    private void startConsumers(List<HookConsumer> hookConsumers) {
        if (this.consumers == null) {
            this.consumers = new ArrayList<HookConsumer>();
        }
        if (this.executors == null) {
            throw new IllegalStateException("Executors must be initialized before starting consumers.");
        }
        for (HookConsumer consumer : hookConsumers) {
            this.consumers.add(consumer);
            this.executors.submit(consumer);
        }
    }

    private void stopConsumerThreads() {
        LOG.info("==> stopConsumerThreads()");
        if (this.consumers != null) {
            Iterator<HookConsumer> iterator = this.consumers.iterator();
            while (iterator.hasNext()) {
                HookConsumer consumer = iterator.next();
                consumer.shutdown();
                iterator.remove();
            }
            this.consumers.clear();
        }
        LOG.info("<== stopConsumerThreads()");
    }

    private List<String> trimAndPurge(String[] values, String defaultValue) {
        List<String> ret;
        if (values != null && values.length > 0) {
            ret = new ArrayList<String>(values.length);
            for (String val : values) {
                if (!StringUtils.isNotBlank((String)val)) continue;
                ret.add(val.trim());
            }
        } else {
            ret = StringUtils.isNotBlank((String)defaultValue) ? Collections.singletonList(defaultValue.trim()) : Collections.emptyList();
        }
        return ret;
    }

    private void preprocessEntities(PreprocessorContext context) {
        Map<String, AtlasEntity> referredEntities;
        GenericEntityPreprocessor genericEntityPreprocessor = new GenericEntityPreprocessor(this.entityTypesToIgnore, this.entitiesToIgnore);
        List<AtlasEntity> entities = context.getEntities();
        if (entities != null) {
            for (int i = 0; i < entities.size(); ++i) {
                AtlasEntity entity = entities.get(i);
                genericEntityPreprocessor.preprocess(entity, context);
                if (!context.isIgnoredEntity(entity.getGuid())) continue;
                entities.remove(i--);
            }
        }
        if ((referredEntities = context.getReferredEntities()) != null) {
            Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator();
            while (iterator.hasNext()) {
                AtlasEntity entity = iterator.next().getValue();
                genericEntityPreprocessor.preprocess(entity, context);
                if (!context.isIgnoredEntity(entity.getGuid())) continue;
                iterator.remove();
            }
        }
    }

    private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
        PreprocessorContext context = null;
        if (this.preprocessEnabled) {
            context = new PreprocessorContext(kafkaMsg, this.typeRegistry, this.hiveTablesToIgnore, this.hiveTablesToPrune, this.hiveTablesCache, this.hiveDummyDatabasesToIgnore, this.hiveDummyTablesToIgnore, this.hiveTablePrefixesToIgnore, this.hiveTypesRemoveOwnedRefAttrs, this.rdbmsTypesRemoveOwnedRefAttrs, this.s3V2DirectoryPruneObjectPrefix, this.updateHiveProcessNameWithQualifiedName, this.entityCorrelationManager);
            if (CollectionUtils.isNotEmpty(this.entityTypesToIgnore) || CollectionUtils.isNotEmpty(this.entitiesToIgnore)) {
                this.preprocessEntities(context);
            }
            if (context.isHivePreprocessEnabled()) {
                this.preprocessHiveTypes(context);
            }
            if (this.skipHiveColumnLineageHive20633) {
                this.skipHiveColumnLineage(context);
            }
            if (this.rdbmsTypesRemoveOwnedRefAttrs) {
                this.rdbmsTypeRemoveOwnedRefAttrs(context);
            }
            if (this.s3V2DirectoryPruneObjectPrefix) {
                this.pruneObjectPrefixForS3V2Directory(context);
            }
            if (this.sparkProcessAttributes) {
                this.preprocessSparkProcessAttributes(context);
            }
            context.moveRegisteredReferredEntities();
            if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
                List<AtlasEntity> entities = context.getEntities();
                int count = entities.size();
                for (int i = 0; i < count; ++i) {
                    AtlasEntity entity = entities.get(i);
                    switch (entity.getTypeName()) {
                        case "hive_process": 
                        case "hive_column_lineage": {
                            entities.remove(i--);
                            entities.add(entity);
                            --count;
                        }
                    }
                }
                if (entities.size() - count > 0) {
                    LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", new Object[]{entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition()});
                }
            }
        }
        return context;
    }

    private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
        List<AtlasEntity> entities = context.getEntities();
        if (entities != null) {
            for (int i = 0; i < entities.size(); ++i) {
                AtlasEntity entity = entities.get(i);
                EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
                if (preprocessor == null) continue;
                preprocessor.preprocess(entity, context);
            }
        }
    }

    private void pruneObjectPrefixForS3V2Directory(PreprocessorContext context) {
        ArrayList<AtlasEntity> entities = new ArrayList<AtlasEntity>();
        if (CollectionUtils.isNotEmpty(context.getEntities())) {
            entities.addAll(context.getEntities());
        }
        if (MapUtils.isNotEmpty(context.getReferredEntities())) {
            entities.addAll(context.getReferredEntities().values());
        }
        if (CollectionUtils.isNotEmpty(entities)) {
            for (AtlasEntity entity : entities) {
                EntityPreprocessor preprocessor = EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName());
                if (preprocessor == null) continue;
                preprocessor.preprocess(entity, context);
            }
        }
    }

    private void preprocessHiveTypes(PreprocessorContext context) {
        List<AtlasEntity> entities = context.getEntities();
        if (entities != null) {
            for (int i = 0; i < entities.size(); ++i) {
                AtlasEntity entity = entities.get(i);
                EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
                if (preprocessor == null) continue;
                preprocessor.preprocess(entity, context);
                if (!context.isIgnoredEntity(entity.getGuid())) continue;
                entities.remove(i--);
            }
            Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
            if (referredEntities != null) {
                Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator();
                while (iter.hasNext()) {
                    AtlasEntity entity = iter.next().getValue();
                    EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
                    if (preprocessor == null) continue;
                    preprocessor.preprocess(entity, context);
                    if (!context.isIgnoredEntity(entity.getGuid())) continue;
                    iter.remove();
                }
            }
            int ignoredEntities = context.getIgnoredEntities().size();
            int prunedEntities = context.getPrunedEntities().size();
            if (ignoredEntities > 0 || prunedEntities > 0) {
                LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", new Object[]{ignoredEntities, prunedEntities, context.getKafkaMessageOffset(), context.getKafkaPartition()});
            }
        }
    }

    private void preprocessSparkProcessAttributes(PreprocessorContext context) {
        List<AtlasEntity> entities = context.getEntities();
        if (entities != null) {
            for (int i = 0; i < entities.size(); ++i) {
                AtlasEntity entity = entities.get(i);
                EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
                if (preprocessor == null) continue;
                preprocessor.preprocess(entity, context);
            }
        }
    }

    private void skipHiveColumnLineage(PreprocessorContext context) {
        List<AtlasEntity> entities = context.getEntities();
        if (entities != null) {
            float avgInputsCount;
            int lineageCount = 0;
            int lineageInputsCount = 0;
            int numRemovedEntities = 0;
            HashSet<String> lineageQNames = new HashSet<String>();
            for (int i = 0; i < entities.size(); ++i) {
                AtlasEntity entity = entities.get(i);
                if (!StringUtils.equals((String)entity.getTypeName(), (String)TYPE_HIVE_COLUMN_LINEAGE)) continue;
                Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
                if (qName != null) {
                    String qualifiedName = qName.toString();
                    if (lineageQNames.contains(qualifiedName)) {
                        entities.remove(i--);
                        LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", new Object[]{qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition()});
                        ++numRemovedEntities;
                        continue;
                    }
                    lineageQNames.add(qualifiedName);
                }
                ++lineageCount;
                Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
                if (!(objInputs instanceof Collection)) continue;
                Collection inputs = (Collection)objInputs;
                lineageInputsCount += inputs.size();
            }
            float f = avgInputsCount = lineageCount > 0 ? (float)lineageInputsCount / (float)lineageCount : 0.0f;
            if (avgInputsCount > (float)this.skipHiveColumnLineageHive20633InputsThreshold) {
                for (int i = 0; i < entities.size(); ++i) {
                    AtlasEntity entity = entities.get(i);
                    if (!StringUtils.equals((String)entity.getTypeName(), (String)TYPE_HIVE_COLUMN_LINEAGE)) continue;
                    entities.remove(i--);
                    ++numRemovedEntities;
                }
            }
            if (numRemovedEntities > 0) {
                LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", new Object[]{numRemovedEntities, Float.valueOf(avgInputsCount), this.skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition()});
            }
        }
    }

    private boolean isEmptyMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
        boolean ret;
        HookNotification message = (HookNotification)kafkaMsg.getMessage();
        switch (message.getType()) {
            case ENTITY_CREATE_V2: {
                AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityCreateRequestV2)message).getEntities();
                ret = entities == null || CollectionUtils.isEmpty((Collection)entities.getEntities());
                break;
            }
            case ENTITY_FULL_UPDATE_V2: {
                AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityUpdateRequestV2)message).getEntities();
                ret = entities == null || CollectionUtils.isEmpty((Collection)entities.getEntities());
                break;
            }
            default: {
                ret = false;
            }
        }
        return ret;
    }

    private void recordProcessedEntities(EntityMutationResponse mutationResponse, AtlasMetricsUtil.NotificationStat stats, PreprocessorContext context) {
        if (mutationResponse != null) {
            if (stats != null) {
                stats.updateStats(mutationResponse);
            }
            if (context != null) {
                if (MapUtils.isNotEmpty((Map)mutationResponse.getGuidAssignments())) {
                    context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
                }
                if (CollectionUtils.isNotEmpty((Collection)mutationResponse.getCreatedEntities())) {
                    for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
                        if (entity == null || entity.getGuid() == null) continue;
                        context.getCreatedEntities().add(entity.getGuid());
                    }
                }
                if (CollectionUtils.isNotEmpty((Collection)mutationResponse.getDeletedEntities())) {
                    for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
                        if (entity == null || entity.getGuid() == null) continue;
                        context.getDeletedEntities().add(entity.getGuid());
                    }
                }
            }
        }
    }

    private void updateProcessedEntityReferences(List<AtlasEntity> entities, Map<String, String> guidAssignments) {
        if (CollectionUtils.isNotEmpty(entities) && MapUtils.isNotEmpty(guidAssignments)) {
            for (AtlasEntity entity : entities) {
                AtlasEntityType entityType = this.typeRegistry.getEntityTypeByName(entity.getTypeName());
                if (entityType == null) continue;
                if (MapUtils.isNotEmpty((Map)entity.getAttributes())) {
                    for (Map.Entry entry : entity.getAttributes().entrySet()) {
                        String attrName = (String)entry.getKey();
                        Object attrValue = entry.getValue();
                        if (attrValue == null) continue;
                        AtlasStructType.AtlasAttribute attribute = entityType.getAttribute(attrName);
                        if (attribute == null) {
                            attribute = entityType.getRelationshipAttribute(attrName, null);
                        }
                        if (attribute == null || !attribute.isObjectRef()) continue;
                        this.updateProcessedEntityReferences(attrValue, guidAssignments);
                    }
                }
                if (!MapUtils.isNotEmpty((Map)entity.getRelationshipAttributes())) continue;
                for (Map.Entry entry : entity.getRelationshipAttributes().entrySet()) {
                    Object attrValue = entry.getValue();
                    if (attrValue == null) continue;
                    this.updateProcessedEntityReferences(attrValue, guidAssignments);
                }
            }
        }
    }

    private void updateProcessedEntityReferences(Object objVal, Map<String, String> guidAssignments) {
        if (objVal instanceof AtlasObjectId) {
            this.updateProcessedEntityReferences((AtlasObjectId)objVal, guidAssignments);
        } else if (objVal instanceof Collection) {
            this.updateProcessedEntityReferences((Collection)objVal, guidAssignments);
        } else if (objVal instanceof Map) {
            this.updateProcessedEntityReferences((Map)objVal, guidAssignments);
        }
    }

    private void updateProcessedEntityReferences(AtlasObjectId objId, Map<String, String> guidAssignments) {
        String guid = objId.getGuid();
        if (guid != null && guidAssignments.containsKey(guid)) {
            String assignedGuid = guidAssignments.get(guid);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", new Object[]{objId.getTypeName(), guid, assignedGuid});
            }
            objId.setGuid(assignedGuid);
            objId.setTypeName(null);
            objId.setUniqueAttributes(null);
        }
    }

    private void updateProcessedEntityReferences(Map objId, Map<String, String> guidAssignments) {
        Object guid = objId.get("guid");
        if (guid != null && guidAssignments.containsKey(guid)) {
            String assignedGuid = guidAssignments.get(guid);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", new Object[]{objId.get("typeName"), guid, assignedGuid});
            }
            objId.put("guid", assignedGuid);
            objId.remove("typeName");
            objId.remove("uniqueAttributes");
        }
    }

    private void updateProcessedEntityReferences(Collection<?> objIds, Map<String, String> guidAssignments) {
        for (Object objId : objIds) {
            this.updateProcessedEntityReferences(objId, guidAssignments);
        }
    }

    private void setCurrentUser(String userName) {
        Authentication authentication = this.getAuthenticationForUser(userName);
        if (LOG.isDebugEnabled()) {
            if (authentication != null) {
                LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", (Object)userName);
            } else {
                LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", (Object)userName);
            }
        }
        SecurityContextHolder.getContext().setAuthentication(authentication);
    }

    private Authentication getAuthenticationForUser(String userName) {
        Authentication ret = null;
        if (StringUtils.isNotBlank((String)userName)) {
            Authentication authentication = ret = this.authnCache != null ? this.authnCache.get(userName) : null;
            if (ret == null) {
                List<GrantedAuthority> grantedAuths = AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI(userName);
                User principal = new User(userName, "", grantedAuths);
                ret = new UsernamePasswordAuthenticationToken((Object)principal, (Object)"");
                if (this.authnCache != null) {
                    this.authnCache.put(userName, ret);
                }
            }
        }
        return ret;
    }

    static /* synthetic */ boolean access$700(NotificationHookConsumer x0) {
        return x0.authorizeUsingMessageUser;
    }

    static /* synthetic */ void access$800(NotificationHookConsumer x0, String x1) {
        x0.setCurrentUser(x1);
    }

    static /* synthetic */ Logger access$900() {
        return PERF_LOG;
    }

    static /* synthetic */ AtlasInstanceConverter access$1000(NotificationHookConsumer x0) {
        return x0.instanceConverter;
    }

    static /* synthetic */ PreprocessorContext access$1100(NotificationHookConsumer x0, AtlasKafkaMessage x1) {
        return x0.preProcessNotificationMessage((AtlasKafkaMessage<HookNotification>)x1);
    }

    static /* synthetic */ boolean access$1200(NotificationHookConsumer x0, AtlasKafkaMessage x1) {
        return x0.isEmptyMessage((AtlasKafkaMessage<HookNotification>)x1);
    }

    static /* synthetic */ int access$1300(NotificationHookConsumer x0) {
        return x0.maxRetries;
    }

    static /* synthetic */ boolean access$1400(NotificationHookConsumer x0) {
        return x0.createShellEntityForNonExistingReference;
    }

    static /* synthetic */ String access$1500() {
        return THREADNAME_PREFIX;
    }

    static /* synthetic */ AtlasTypeRegistry access$1600(NotificationHookConsumer x0) {
        return x0.typeRegistry;
    }

    static /* synthetic */ AsyncImporter access$1900(NotificationHookConsumer x0) {
        return x0.asyncImporter;
    }

    static /* synthetic */ int access$2000(NotificationHookConsumer x0) {
        return x0.failedMsgCacheSize;
    }

    static /* synthetic */ AtlasMetricsUtil access$2100(NotificationHookConsumer x0) {
        return x0.metricsUtil;
    }

    static /* synthetic */ int access$2200(NotificationHookConsumer x0) {
        return x0.largeMessageProcessingTimeThresholdMs;
    }

    static /* synthetic */ Logger access$2300() {
        return LARGE_MESSAGES_LOG;
    }

    static /* synthetic */ Instant access$2400(NotificationHookConsumer x0) {
        return x0.nextStatsLogTime;
    }

    static /* synthetic */ Instant access$2402(NotificationHookConsumer x0, Instant x1) {
        x0.nextStatsLogTime = x1;
        return x0.nextStatsLogTime;
    }

    static /* synthetic */ ServiceState access$2500(NotificationHookConsumer x0) {
        return x0.serviceState;
    }

    @VisibleForTesting
    class HookConsumer
    extends Thread {
        private final NotificationConsumer<HookNotification> consumer;
        private final AtomicBoolean shouldRun;
        private final List<String> failedMessages;
        private final AdaptiveWaiter adaptiveWaiter;
        private int duplicateKeyCounter;

        public HookConsumer(NotificationConsumer<HookNotification> consumer) {
            super(NotificationHookConsumer.ATLAS_HOOK_CONSUMER_THREAD_NAME);
            this.shouldRun = new AtomicBoolean(false);
            this.failedMessages = new ArrayList<String>();
            this.adaptiveWaiter = new AdaptiveWaiter(NotificationHookConsumer.this.minWaitDuration, NotificationHookConsumer.this.maxWaitDuration, NotificationHookConsumer.this.minWaitDuration);
            this.duplicateKeyCounter = 1;
            this.consumer = consumer;
        }

        public HookConsumer(String consumerThreadName, NotificationConsumer<HookNotification> consumer) {
            super(consumerThreadName);
            this.shouldRun = new AtomicBoolean(false);
            this.failedMessages = new ArrayList<String>();
            this.adaptiveWaiter = new AdaptiveWaiter(NotificationHookConsumer.this.minWaitDuration, NotificationHookConsumer.this.maxWaitDuration, NotificationHookConsumer.this.minWaitDuration);
            this.duplicateKeyCounter = 1;
            this.consumer = consumer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            LOG.info("==> HookConsumer run()");
            this.shouldRun.set(true);
            if (!this.serverAvailable(new Timer())) {
                return;
            }
            try {
                while (this.shouldRun.get()) {
                    try {
                        if (StringUtils.equals((String)NotificationHookConsumer.ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME, (String)this.getName())) {
                            long msgBufferingStartTime = System.currentTimeMillis();
                            TreeMap<String, AtlasKafkaMessage<HookNotification>> msgBuffer = new TreeMap<String, AtlasKafkaMessage<HookNotification>>();
                            this.sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
                            continue;
                        }
                        List messages = this.consumer.receiveWithCheckedCommit(NotificationHookConsumer.this.lastCommittedPartitionOffset);
                        for (AtlasKafkaMessage msg : messages) {
                            this.handleMessage((AtlasKafkaMessage<HookNotification>)msg);
                        }
                    }
                    catch (IllegalStateException ex) {
                        this.adaptiveWaiter.pause(ex);
                    }
                    catch (Exception e) {
                        if (!this.shouldRun.get()) return;
                        LOG.warn("Exception in NotificationHookConsumer", (Throwable)e);
                        this.adaptiveWaiter.pause(e);
                    }
                }
                return;
            }
            finally {
                if (this.consumer != null) {
                    LOG.info("closing NotificationConsumer");
                    this.consumer.close();
                }
                LOG.info("<== HookConsumer run()");
            }
        }

        public void shutdown() {
            LOG.info("==> HookConsumer shutdown()");
            if (!this.shouldRun.compareAndSet(true, false)) {
                return;
            }
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
            LOG.info("<== HookConsumer shutdown()");
        }

        void sortAndPublishMsgsToAtlasHook(long msgBufferingStartTime, Map<String, AtlasKafkaMessage<HookNotification>> msgBuffer) throws NotificationException {
            List messages = this.consumer.receiveRawRecordsWithCheckedCommit(NotificationHookConsumer.this.lastCommittedPartitionOffset);
            AtlasKafkaMessage<HookNotification> maxOffsetMsg = null;
            long maxOffsetProcessed = 0L;
            messages.forEach(x -> this.sortMessages((AtlasKafkaMessage<HookNotification>)x, msgBuffer));
            if (msgBuffer.size() < NotificationHookConsumer.this.consumerMsgBufferingBatchSize && System.currentTimeMillis() - msgBufferingStartTime < NotificationHookConsumer.this.consumerMsgBufferingIntervalMS) {
                this.sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
                return;
            }
            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
                String hookTopic;
                String string = hookTopic = StringUtils.isNotEmpty((String)msg.getTopic()) ? msg.getTopic().split("_UNSORTED")[0] : KafkaNotification.ATLAS_HOOK_TOPIC;
                if (maxOffsetProcessed == 0L || maxOffsetProcessed < msg.getOffset()) {
                    maxOffsetMsg = msg;
                    maxOffsetProcessed = msg.getOffset();
                }
                ((KafkaNotification)NotificationHookConsumer.this.notificationInterface).sendInternal(hookTopic, StringUtils.isNotEmpty((String)msg.getRawRecordData()) ? Collections.singletonList(msg.getRawRecordData()) : Collections.singletonList(((HookNotification)msg.getMessage()).toString()));
            }
            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
                this.commit(msg);
            }
            if (maxOffsetMsg != null) {
                this.commit(maxOffsetMsg);
            }
            msgBuffer.clear();
            this.resetDuplicateKeyCounter();
        }

        /*
         * Exception decompiling
         */
        @VisibleForTesting
        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [48[FORLOOP], 39[CATCHBLOCK]], but top level block is 12[TRYBLOCK]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        /*
         * Exception decompiling
         */
        boolean serverAvailable(Timer timer) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        private void resetDuplicateKeyCounter() {
            this.duplicateKeyCounter = 1;
        }

        private String getKey(String msgCreated, String source) {
            return String.format("%s_%s", msgCreated, source);
        }

        private void sortMessages(AtlasKafkaMessage<HookNotification> msg, Map<String, AtlasKafkaMessage<HookNotification>> msgBuffer) {
            String key = this.getKey(Long.toString(msg.getMsgCreated()), msg.getSource());
            if (msgBuffer.containsKey(key)) {
                key = this.getKey(key, Integer.toString(this.duplicateKeyCounter));
                ++this.duplicateKeyCounter;
            }
            msgBuffer.put(key, msg);
        }

        private void createOrUpdate(AtlasEntity.AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, AtlasMetricsUtil.NotificationStat stats, PreprocessorContext context) throws AtlasBaseException {
            List entitiesList = entities.getEntities();
            AtlasEntityStream entityStream = new AtlasEntityStream(entities);
            if (NotificationHookConsumer.this.commitBatchSize <= 0 || entitiesList.size() <= NotificationHookConsumer.this.commitBatchSize) {
                EntityMutationResponse response = NotificationHookConsumer.this.atlasEntityStore.createOrUpdate((EntityStream)entityStream, isPartialUpdate);
                NotificationHookConsumer.this.recordProcessedEntities(response, stats, context);
            } else {
                for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += NotificationHookConsumer.this.commitBatchSize) {
                    int toIndex = fromIdx + NotificationHookConsumer.this.commitBatchSize;
                    if (toIndex > entitiesList.size()) {
                        toIndex = entitiesList.size();
                    }
                    ArrayList entitiesBatch = new ArrayList(entitiesList.subList(fromIdx, toIndex));
                    NotificationHookConsumer.this.updateProcessedEntityReferences(entitiesBatch, (Map<String, String>)context.getGuidAssignments());
                    AtlasEntity.AtlasEntitiesWithExtInfo batch = new AtlasEntity.AtlasEntitiesWithExtInfo(entitiesBatch);
                    AtlasEntityStream batchStream = new AtlasEntityStream(batch, (EntityStream)entityStream);
                    EntityMutationResponse response = NotificationHookConsumer.this.atlasEntityStore.createOrUpdate((EntityStream)batchStream, isPartialUpdate);
                    NotificationHookConsumer.this.recordProcessedEntities(response, stats, context);
                    RequestContext.get().resetEntityGuidUpdates();
                    NotificationHookConsumer.this.entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities());
                    RequestContext.get().clearCache();
                }
            }
            if (context != null) {
                context.prepareForPostUpdate();
                List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities();
                if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate((EntityStream)new AtlasEntityStream(postUpdateEntities), true);
                }
            }
        }

        private void recordFailedMessages() {
            for (String message : this.failedMessages) {
                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", (Object)message);
            }
            this.failedMessages.clear();
        }

        private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
            this.recordFailedMessages();
            long commitOffset = kafkaMessage.getOffset() + 1L;
            NotificationHookConsumer.this.lastCommittedPartitionOffset.put(kafkaMessage.getTopicPartition(), commitOffset);
            this.consumer.commit(kafkaMessage.getTopicPartition(), commitOffset);
        }
    }

    static class AdaptiveWaiter {
        private final long increment;
        private final long maxDuration;
        private final long minDuration;
        private final long resetInterval;
        @VisibleForTesting
        long waitDuration;
        private long lastWaitAt;

        public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
            this.minDuration = minDuration;
            this.maxDuration = maxDuration;
            this.increment = increment;
            this.waitDuration = minDuration;
            this.lastWaitAt = 0L;
            this.resetInterval = maxDuration * 2L;
        }

        public void pause(Throwable ex) {
            block3: {
                this.setWaitDurations();
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", new Object[]{ex.getClass().getName(), this.waitDuration, ex});
                    }
                    Thread.sleep(this.waitDuration);
                }
                catch (InterruptedException e) {
                    if (!LOG.isDebugEnabled()) break block3;
                    LOG.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", (Object)ex.getClass().getName(), (Object)e);
                }
            }
        }

        private void setWaitDurations() {
            long timeSinceLastWait = this.lastWaitAt == 0L ? 0L : System.currentTimeMillis() - this.lastWaitAt;
            this.lastWaitAt = System.currentTimeMillis();
            if (timeSinceLastWait > this.resetInterval) {
                this.waitDuration = this.minDuration;
            } else {
                this.waitDuration += this.increment;
                if (this.waitDuration > this.maxDuration) {
                    this.waitDuration = this.maxDuration;
                }
            }
        }
    }

    static class Timer {
        Timer() {
        }

        public void sleep(int interval) throws InterruptedException {
            Thread.sleep(interval);
        }
    }
}

