/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.metastore.metrics;

import com.google.common.annotations.VisibleForTesting;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.metrics.MetricsMBeanImpl;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataRequest;
import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataStruct;
import org.apache.hadoop.hive.metastore.api.CompactionMetricsMetricType;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.CompactionMetricData;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
import org.apache.hadoop.hive.metastore.txn.MetricsInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AcidMetricService
implements MetastoreTaskThread {
    private static final Logger LOG = LoggerFactory.getLogger(AcidMetricService.class);
    public static final String OBJECT_NAME_PREFIX = "metrics:type=compaction,name=";
    private static boolean metricsEnabled;
    private MetricsMBeanImpl deltaObject;
    private MetricsMBeanImpl smallDeltaObject;
    private MetricsMBeanImpl obsoleteDeltaObject;
    private Configuration conf;
    private TxnStore txnHandler;
    private int maxCacheSize;

    @Override
    public long runFrequency(TimeUnit unit) {
        return MetastoreConf.getTimeVar(this.conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_CHECK_INTERVAL, unit);
    }

    @Override
    public void run() {
        LOG.debug("Starting AcidMetricService thread");
        try {
            if (!metricsEnabled) {
                return;
            }
            long startedAt = System.currentTimeMillis();
            try {
                this.updateMetrics();
                this.updateDeltaMetrics();
            }
            catch (Exception ex) {
                LOG.error("Caught exception in AcidMetricService loop", (Throwable)ex);
            }
            long elapsedTime = System.currentTimeMillis() - startedAt;
            LOG.debug("AcidMetricService thread finished one loop in {} seconds.", (Object)(elapsedTime / 1000L));
        }
        catch (Throwable t) {
            LOG.error("Caught an exception in the main loop of AcidMetricService, exiting ", t);
        }
    }

    public static void updateMetricsFromInitiator(String dbName, String tableName, String partitionName, Configuration conf, TxnStore txnHandler, long baseSize, Map<Path, Long> activeDeltaSizes, List<Path> obsoleteDeltaPaths) {
        if (!metricsEnabled) {
            LOG.debug("Acid metric collection is not enabled. To turn it on, \"metastore.acidmetrics.thread.on\" and \"metastore.metrics.enabled\" must be set to true and HMS restarted.");
            return;
        }
        LOG.debug("Updating delta file metrics from initiator");
        double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
        int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
        int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
        try {
            int numDeltas = activeDeltaSizes.size();
            int numSmallDeltas = 0;
            for (long deltaSize : activeDeltaSizes.values()) {
                if (baseSize == 0L || !((double)((float)deltaSize / (float)baseSize) < deltaPctThreshold)) continue;
                ++numSmallDeltas;
            }
            int numObsoleteDeltas = AcidMetricService.filterOutBaseAndOriginalFiles(obsoleteDeltaPaths).size();
            AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas, deltasThreshold, txnHandler);
            AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, txnHandler);
            AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, txnHandler);
            LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}", new Object[]{deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas});
        }
        catch (Throwable t) {
            LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
        }
    }

    public static void updateMetricsFromWorker(String dbName, String tableName, String partitionName, CompactionType type, int preWorkerActiveDeltaCount, int preWorkerDeleteDeltaCount, Configuration conf, IMetaStoreClient client) {
        if (!MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) || !MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_THREAD_ON)) {
            LOG.debug("Acid metric collection is not enabled. To turn it on, \"metastore.acidmetrics.thread.on\" and \"metastore.metrics.enabled\" must be set to true and HS2/HMS restarted.");
            return;
        }
        LOG.debug("Updating delta file metrics from worker");
        int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
        int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
        try {
            AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS, preWorkerActiveDeltaCount, obsoleteDeltasThreshold, client);
            AcidMetricService.removeDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS, client);
            if (type == CompactionType.MAJOR) {
                AcidMetricService.removeDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, client);
            } else {
                int numNewDeltas = 0;
                if (preWorkerDeleteDeltaCount > 0) {
                    ++numNewDeltas;
                }
                if (preWorkerActiveDeltaCount > preWorkerDeleteDeltaCount) {
                    ++numNewDeltas;
                }
                AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, numNewDeltas, deltasThreshold, client);
            }
            LOG.debug("Finished updating delta file metrics from worker.\n deltasThreshold = {}, obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}", new Object[]{deltasThreshold, obsoleteDeltasThreshold, preWorkerActiveDeltaCount});
        }
        catch (Throwable t) {
            LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
        }
    }

    public static void updateMetricsFromCleaner(String dbName, String tableName, String partitionName, List<Path> deletedFiles, Configuration conf, TxnStore txnHandler) {
        if (!metricsEnabled) {
            LOG.debug("Acid metric collection is not enabled. To turn it on, \"metastore.acidmetrics.thread.on\" and \"metastore.metrics.enabled\" must be set to true and HMS restarted.");
            return;
        }
        LOG.debug("Updating delta file metrics from cleaner");
        int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
        try {
            CompactionMetricsData prevObsoleteDelta = txnHandler.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
            int numObsoleteDeltas = 0;
            if (prevObsoleteDelta != null) {
                numObsoleteDeltas = prevObsoleteDelta.getMetricValue() - AcidMetricService.filterOutBaseAndOriginalFiles(deletedFiles).size();
                AcidMetricService.updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, txnHandler);
            }
            LOG.debug("Finished updating delta file metrics from cleaner.\n obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}", (Object)obsoleteDeltasThreshold, (Object)numObsoleteDeltas);
        }
        catch (Throwable t) {
            LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
        }
    }

    private void updateDeltaMetrics() {
        try {
            LOG.debug("Called reporting task.");
            List<CompactionMetricsData> deltas = this.txnHandler.getTopCompactionMetricsDataPerType(this.maxCacheSize);
            Map<String, Integer> deltasMap = deltas.stream().filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_DELTAS).collect(Collectors.toMap(item -> AcidMetricService.getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()), CompactionMetricsData::getMetricValue));
            this.updateDeltaMBeanAndMetric(this.deltaObject, "compaction_num_active_deltas", deltasMap);
            Map<String, Integer> smallDeltasMap = deltas.stream().filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_SMALL_DELTAS).collect(Collectors.toMap(item -> AcidMetricService.getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()), CompactionMetricsData::getMetricValue));
            this.updateDeltaMBeanAndMetric(this.smallDeltaObject, "compaction_num_small_deltas", smallDeltasMap);
            Map<String, Integer> obsoleteDeltasMap = deltas.stream().filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS).collect(Collectors.toMap(item -> AcidMetricService.getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()), CompactionMetricsData::getMetricValue));
            this.updateDeltaMBeanAndMetric(this.obsoleteDeltaObject, "compaction_num_obsolete_deltas", obsoleteDeltasMap);
        }
        catch (Throwable e) {
            LOG.warn("Caught exception while trying to fetch compaction metrics from metastore backend db.", e);
        }
    }

    private void updateDeltaMBeanAndMetric(MetricsMBeanImpl mbean, String metricName, Map<String, Integer> update) {
        mbean.updateAll(update);
        Metrics.getOrCreateMapMetrics(metricName).update(update);
    }

    private void updateMetrics() throws MetaException {
        ShowCompactResponse currentCompactions = this.txnHandler.showCompact(new ShowCompactRequest());
        AcidMetricService.updateMetricsFromShowCompact(currentCompactions);
        this.updateDBMetrics();
    }

    private void updateDBMetrics() throws MetaException {
        MetricsInfo metrics = this.txnHandler.getMetricsInfo();
        Metrics.getOrCreateGauge("compaction_num_txn_to_writeid").set(metrics.getTxnToWriteIdCount());
        Metrics.getOrCreateGauge("compaction_num_completed_txn_components").set(metrics.getCompletedTxnsCount());
        Metrics.getOrCreateGauge("num_open_repl_transactions").set(metrics.getOpenReplTxnsCount());
        Metrics.getOrCreateGauge("oldest_open_repl_txn_id").set(metrics.getOldestOpenReplTxnId());
        Metrics.getOrCreateGauge("oldest_open_repl_txn_age_in_sec").set(metrics.getOldestOpenReplTxnAge());
        Metrics.getOrCreateGauge("num_open_non_repl_transactions").set(metrics.getOpenNonReplTxnsCount());
        Metrics.getOrCreateGauge("oldest_open_non_repl_txn_id").set(metrics.getOldestOpenNonReplTxnId());
        Metrics.getOrCreateGauge("oldest_open_non_repl_txn_age_in_sec").set(metrics.getOldestOpenNonReplTxnAge());
        Metrics.getOrCreateGauge("num_aborted_transactions").set(metrics.getAbortedTxnsCount());
        Metrics.getOrCreateGauge("oldest_aborted_txn_id").set(metrics.getOldestAbortedTxnId());
        Metrics.getOrCreateGauge("oldest_aborted_txn_age_in_sec").set(metrics.getOldestAbortedTxnAge());
        Metrics.getOrCreateGauge("num_locks").set(metrics.getLocksCount());
        Metrics.getOrCreateGauge("oldest_lock_age_in_sec").set(metrics.getOldestLockAge());
        Metrics.getOrCreateGauge("tables_with_x_aborted_transactions").set(metrics.getTablesWithXAbortedTxnsCount());
        Metrics.getOrCreateGauge("oldest_ready_for_cleaning_age_in_sec").set(metrics.getOldestReadyForCleaningAge());
    }

    @VisibleForTesting
    public static void updateMetricsFromShowCompact(ShowCompactResponse showCompactResponse) {
        CompactionMetricData metricData = CompactionMetricData.of(showCompactResponse.getCompacts());
        Map<String, Long> counts = metricData.getStateCount();
        for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
            String key = "compaction_num_" + AcidMetricService.replaceWhitespace(TxnStore.COMPACTION_STATES[i]);
            Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
            if (count != null) {
                Metrics.getOrCreateGauge(key).set(count.intValue());
                continue;
            }
            Metrics.getOrCreateGauge(key).set(0);
        }
        Metrics.getOrCreateMapMetrics("compaction_pools_initiated_item_count").update(metricData.getInitiatedCountPerPool());
        Metrics.getOrCreateMapMetrics("compaction_pools_working_item_count").update(metricData.getWorkingCountPerPool());
        Metrics.getOrCreateMapMetrics("compaction_pools_oldest_enqueue_age_in_sec").update(metricData.getLongestEnqueueDurationPerPool());
        Metrics.getOrCreateMapMetrics("compaction_pools_oldest_working_age_in_sec").update(metricData.getLongestWorkingDurationPerPool());
        AcidMetricService.updateOldestCompactionMetric("compaction_oldest_enqueue_age_in_sec", metricData.getOldestEnqueueTime());
        AcidMetricService.updateOldestCompactionMetric("compaction_oldest_working_age_in_sec", metricData.getOldestWorkingTime());
        AcidMetricService.updateOldestCompactionMetric("compaction_oldest_cleaning_age_in_sec", metricData.getOldestCleaningTime());
        Metrics.getOrCreateGauge("compaction_num_initiators").set((int)metricData.getInitiatorsCount());
        Metrics.getOrCreateGauge("compaction_num_workers").set((int)metricData.getWorkersCount());
        Metrics.getOrCreateGauge("compaction_num_initiator_versions").set((int)metricData.getInitiatorVersionsCount());
        Metrics.getOrCreateGauge("compaction_num_worker_versions").set((int)metricData.getWorkerVersionsCount());
    }

    private static void updateOldestCompactionMetric(String metricName, Long oldestTime) {
        if (oldestTime == null) {
            Metrics.getOrCreateGauge(metricName).set(0);
        } else {
            int oldestAge = (int)((System.currentTimeMillis() - oldestTime) / 1000L);
            Metrics.getOrCreateGauge(metricName).set(oldestAge);
        }
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
        this.txnHandler = TxnUtils.getTxnStore(this.conf);
        metricsEnabled = MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.METRICS_ENABLED) && MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_THREAD_ON);
        try {
            if (metricsEnabled) {
                this.maxCacheSize = MetastoreConf.getIntVar(this.conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
                this.initObjectsForMetrics();
            }
        }
        catch (Exception e) {
            LOG.error("Cannot initialize delta file metrics mbean server. AcidMetricService initialization aborted.", (Throwable)e);
        }
    }

    public Configuration getConf() {
        return this.conf;
    }

    @VisibleForTesting
    public static String replaceWhitespace(String input) {
        if (input == null) {
            return input;
        }
        return input.replaceAll("\\s+", "_");
    }

    private void initObjectsForMetrics() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        this.obsoleteDeltaObject = new MetricsMBeanImpl();
        mbs.registerMBean(this.obsoleteDeltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_obsolete_deltas"));
        this.deltaObject = new MetricsMBeanImpl();
        mbs.registerMBean(this.deltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_active_deltas"));
        this.smallDeltaObject = new MetricsMBeanImpl();
        mbs.registerMBean(this.smallDeltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_small_deltas"));
    }

    static String getDeltaCountKey(String dbName, String tableName, String partitionName) {
        StringBuilder key = new StringBuilder();
        if (dbName == null || dbName.isEmpty()) {
            key.append(tableName);
        } else {
            key.append(dbName).append(".").append(tableName);
        }
        if (partitionName != null && !partitionName.isEmpty()) {
            key.append("/");
            if (partitionName.startsWith("{") && partitionName.endsWith("}")) {
                key.append(partitionName, 1, partitionName.length() - 1);
            } else {
                key.append(partitionName);
            }
        }
        return key.toString();
    }

    private static List<Path> filterOutBaseAndOriginalFiles(List<Path> paths) {
        return paths.stream().filter(p -> p.getName().startsWith("delta_") || p.getName().startsWith("delete_delta_")).collect(Collectors.toList());
    }

    private static void updateDeltaMetrics(String dbName, String tblName, String partitionName, CompactionMetricsData.MetricType type, int numDeltas, int deltasThreshold, TxnStore txnHandler) throws MetaException {
        CompactionMetricsData data = new CompactionMetricsData.Builder().dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type).metricValue(numDeltas).version(0).threshold(deltasThreshold).build();
        if (!txnHandler.updateCompactionMetricsData(data)) {
            LOG.warn("Compaction metric data cannot be updated because of version mismatch.");
        }
    }

    private static void updateDeltaMetrics(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type, int numDeltas, int deltasThreshold, IMetaStoreClient client) throws TException {
        CompactionMetricsDataStruct struct = new CompactionMetricsDataStruct();
        struct.setDbname(dbName);
        struct.setTblname(tblName);
        struct.setPartitionname(partitionName);
        struct.setType(type);
        struct.setMetricvalue(numDeltas);
        struct.setVersion(0);
        struct.setThreshold(deltasThreshold);
        if (!client.updateCompactionMetricsData(struct)) {
            LOG.warn("Compaction metric data cannot be updated because of version mismatch.");
        }
    }

    private static void removeDeltaMetrics(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type, IMetaStoreClient client) throws TException {
        CompactionMetricsDataRequest request = new CompactionMetricsDataRequest(dbName, tblName, type);
        request.setPartitionName(partitionName);
        client.removeCompactionMetricsData(request);
    }
}

