/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.core.timeline;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
import org.apache.ambari.metrics.core.timeline.TimelineMetricServiceSummary;
import org.apache.ambari.metrics.core.timeline.TimelineMetricStore;
import org.apache.ambari.metrics.core.timeline.TimelineMetricStoreWatcher;
import org.apache.ambari.metrics.core.timeline.TimelineMetricsFilter;
import org.apache.ambari.metrics.core.timeline.TimelineMetricsIgniteCache;
import org.apache.ambari.metrics.core.timeline.aggregators.Function;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregatorFactory;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.function.SeriesAggregateFunction;
import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunction;
import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.ConditionBuilder;
import org.apache.ambari.metrics.core.timeline.query.TopNCondition;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;

public class HBaseTimelineMetricsService
extends AbstractService
implements TimelineMetricStore {
    static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
    private final TimelineMetricConfiguration configuration;
    private TimelineMetricDistributedCache cache;
    private PhoenixHBaseAccessor hBaseAccessor;
    private static volatile boolean isInitialized = false;
    private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final Map<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService>();
    private TimelineMetricMetadataManager metricMetadataManager;
    private MetricCollectorHAController haController;
    private boolean containerMetricsDisabled = false;
    private String defaultInstanceId = "";

    public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
        super(HBaseTimelineMetricsService.class.getName());
        this.configuration = configuration;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.initializeSubsystem();
    }

    private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
        return new TimelineMetricsIgniteCache(this.metricMetadataManager);
    }

    private synchronized void initializeSubsystem() {
        if (!isInitialized) {
            this.hBaseAccessor = new PhoenixHBaseAccessor(null);
            try {
                this.metricMetadataManager = new TimelineMetricMetadataManager(this.hBaseAccessor);
            }
            catch (MalformedURLException | URISyntaxException e) {
                throw new ExceptionInInitializerError("Unable to initialize metadata manager");
            }
            this.metricMetadataManager.initializeMetadata();
            this.hBaseAccessor.initMetricSchema();
            this.hBaseAccessor.initPoliciesAndTTL();
            if (!this.configuration.isDistributedCollectorModeDisabled()) {
                this.haController = new MetricCollectorHAController(this.configuration);
                try {
                    this.haController.initializeHAController();
                }
                catch (Exception e) {
                    LOG.error((Object)e);
                    throw new MetricsSystemInitializationException("Unable to initialize HA controller", e);
                }
            } else {
                LOG.info((Object)"Distributed collector mode disabled");
            }
            TimelineMetricsFilter.initializeMetricFilter(this.configuration);
            Configuration metricsConf = null;
            try {
                metricsConf = this.configuration.getMetricsConf();
            }
            catch (Exception e) {
                throw new ExceptionInInitializerError("Cannot initialize configuration.");
            }
            if (this.configuration.isCollectorInMemoryAggregationEnabled()) {
                try {
                    this.cache = this.startCacheNode();
                }
                catch (Exception e) {
                    throw new MetricsSystemInitializationException("Unable to start cache node", e);
                }
            }
            if (Boolean.parseBoolean(metricsConf.get("timeline.metrics.service.use.groupBy.aggregators", "true"))) {
                LOG.info((Object)"Using group by aggregators for aggregating host and cluster metrics.");
            }
            TimelineMetricAggregator secondClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController, this.cache);
            this.scheduleAggregatorThread(secondClusterAggregator);
            TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
            this.scheduleAggregatorThread(minuteClusterAggregator);
            TimelineMetricAggregator hourlyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
            this.scheduleAggregatorThread(hourlyClusterAggregator);
            TimelineMetricAggregator dailyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
            this.scheduleAggregatorThread(dailyClusterAggregator);
            if (!this.configuration.isHostInMemoryAggregationEnabled()) {
                TimelineMetricAggregator minuteHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
                this.scheduleAggregatorThread(minuteHostAggregator);
            }
            TimelineMetricAggregator hourlyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
            this.scheduleAggregatorThread(hourlyHostAggregator);
            TimelineMetricAggregator dailyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(this.hBaseAccessor, metricsConf, this.metricMetadataManager, this.haController);
            this.scheduleAggregatorThread(dailyHostAggregator);
            if (!this.configuration.isTimelineMetricsServiceWatcherDisabled()) {
                int initDelay = this.configuration.getTimelineMetricsServiceWatcherInitDelay();
                int delay = this.configuration.getTimelineMetricsServiceWatcherDelay();
                this.watchdogExecutorService.scheduleWithFixedDelay(new TimelineMetricStoreWatcher(this, this.configuration), initDelay, delay, TimeUnit.SECONDS);
                LOG.info((Object)("Started watchdog for timeline metrics store with initial delay = " + initDelay + ", delay = " + delay));
            }
            this.containerMetricsDisabled = this.configuration.isContainerMetricsDisabled();
            this.defaultInstanceId = this.configuration.getDefaultInstanceId();
            isInitialized = true;
        }
    }

    protected void serviceStop() throws Exception {
        super.serviceStop();
    }

    @Override
    public TimelineMetrics getTimelineMetrics(List<String> metricNames, List<String> hostnames, String applicationId, String instanceId, Long startTime, Long endTime, Precision precision, Integer limit, boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
        List<byte[]> uuids;
        if (metricNames == null || metricNames.isEmpty()) {
            throw new IllegalArgumentException("No metric name filter specified.");
        }
        if (startTime == null && endTime != null || startTime != null && endTime == null) {
            throw new IllegalArgumentException("Open ended query not supported ");
        }
        if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
            throw new IllegalArgumentException("Limit too big");
        }
        TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
        if (!StringUtils.isEmpty((String)seriesAggregateFunction)) {
            SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction);
            seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func);
        }
        Multimap<String, List<Function>> metricFunctions = HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
        TimelineMetrics metrics = new TimelineMetrics();
        ArrayList<String> transientMetricNames = new ArrayList<String>();
        if (this.configuration.getTimelineMetricsMultipleClusterSupport() && StringUtils.isEmpty((String)instanceId)) {
            instanceId = this.defaultInstanceId;
        }
        if ((uuids = this.metricMetadataManager.getUuidsForGetMetricQuery(metricFunctions.keySet(), hostnames, applicationId, instanceId, transientMetricNames)).isEmpty() && transientMetricNames.isEmpty()) {
            LOG.trace((Object)("No metrics satisfy the query: " + Arrays.asList(metricNames).toString()));
            return metrics;
        }
        ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())).hostnames(hostnames).appId(applicationId).instanceId(instanceId).startTime(startTime).endTime(endTime).precision(precision).limit(limit).grouped(groupedByHosts).uuid(uuids).transientMetricNames(transientMetricNames);
        this.applyTopNCondition(conditionBuilder, topNConfig, metricNames, hostnames);
        Condition condition = conditionBuilder.build();
        metrics = CollectionUtils.isEmpty(hostnames) ? this.hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions) : this.hBaseAccessor.getMetricRecords(condition, metricFunctions);
        metrics = this.postProcessMetrics(metrics);
        if (metrics.getMetrics().size() == 0) {
            return metrics;
        }
        return this.seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
    }

    private void applyTopNCondition(ConditionBuilder conditionBuilder, TopNConfig topNConfig, List<String> metricNames, List<String> hostnames) {
        if (topNConfig != null) {
            if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
                conditionBuilder.topN(topNConfig.getTopN());
                conditionBuilder.isBottomN(topNConfig.getIsBottomN());
                Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
                Function function = new Function(readFunction, null);
                conditionBuilder.topNFunction(function);
            } else {
                LOG.debug((Object)"Invalid Input for TopN query. Ignoring TopN Request.");
            }
        }
    }

    private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
        List metricsList = metrics.getMetrics();
        for (TimelineMetric metric : metricsList) {
            String name = metric.getMetricName();
            if (name.contains("._rate")) {
                HBaseTimelineMetricsService.updateValuesAsRate(metric.getMetricValues(), false);
                continue;
            }
            if (!name.contains("._diff")) continue;
            HBaseTimelineMetricsService.updateValuesAsRate(metric.getMetricValues(), true);
        }
        return metrics;
    }

    private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance, TimelineMetrics metrics) {
        if (seriesAggrFuncInstance != null) {
            TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics);
            metrics.setMetrics(Collections.singletonList(appliedMetric));
        }
        return metrics;
    }

    static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
        Long prevTime = null;
        Double prevVal = null;
        Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Double> timeValueEntry = it.next();
            Long currTime = timeValueEntry.getKey();
            Double currVal = timeValueEntry.getValue();
            if (prevTime != null) {
                long step = currTime - prevTime;
                Double diff = currVal - prevVal;
                if (diff < 0.0) {
                    it.remove();
                } else {
                    Double rate = isDiff ? diff : diff / (double)TimeUnit.MILLISECONDS.toSeconds(step);
                    timeValueEntry.setValue(rate);
                }
            } else {
                it.remove();
            }
            prevTime = currTime;
            prevVal = currVal;
        }
        return metricValues;
    }

    static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
        ArrayListMultimap metricsFunctions = ArrayListMultimap.create();
        for (String metricName : metricNames) {
            Function function = Function.DEFAULT_VALUE_FUNCTION;
            String cleanMetricName = metricName;
            try {
                function = Function.fromMetricName(metricName);
                int functionStartIndex = metricName.indexOf("._");
                if (functionStartIndex > 0) {
                    cleanMetricName = metricName.substring(0, functionStartIndex);
                }
            }
            catch (Function.FunctionFormatException functionStartIndex) {
                // empty catch block
            }
            ArrayList<Function> functionsList = new ArrayList<Function>();
            functionsList.add(function);
            metricsFunctions.put((Object)cleanMetricName, functionsList);
        }
        return metricsFunctions;
    }

    @Override
    public TimelinePutResponse putMetricsSkipCache(TimelineMetrics metrics) throws SQLException, IOException {
        TimelinePutResponse response = new TimelinePutResponse();
        this.hBaseAccessor.insertMetricRecordsWithMetadata(this.metricMetadataManager, metrics, true);
        return response;
    }

    @Override
    public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
        TimelinePutResponse response = new TimelinePutResponse();
        this.hBaseAccessor.insertMetricRecordsWithMetadata(this.metricMetadataManager, metrics, false);
        if (this.configuration.isCollectorInMemoryAggregationEnabled()) {
            this.cache.putMetrics(metrics.getMetrics());
        }
        return response;
    }

    @Override
    public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException {
        if (this.containerMetricsDisabled) {
            LOG.debug((Object)"Ignoring submitted container metrics according to configuration. Values will not be stored.");
            return new TimelinePutResponse();
        }
        this.hBaseAccessor.insertContainerMetrics(metrics);
        return new TimelinePutResponse();
    }

    @Override
    public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern, boolean includeBlacklistedMetrics) throws SQLException, IOException {
        return this.metricMetadataManager.getTimelineMetricMetadataByAppId(appId, metricPattern, includeBlacklistedMetrics);
    }

    @Override
    public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
        return this.metricMetadataManager.getUuid(metricName, appId, instanceId, hostname, false);
    }

    @Override
    public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
        Map<String, TimelineMetricHostMetadata> hostsMetadata = this.metricMetadataManager.getHostedAppsCache();
        HashMap<String, Set<String>> hostAppMap = new HashMap<String, Set<String>>();
        for (String hostname : hostsMetadata.keySet()) {
            hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet());
        }
        return hostAppMap;
    }

    @Override
    public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
        HashMap<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<TimelineMetric, MetricHostAggregate>();
        String hostname = null;
        for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
            aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
            hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
        }
        long timestamp = aggregationResult.getTimeInMilis();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)String.format("Adding host %s to aggregated by in-memory aggregator. Timestamp : %s", hostname, timestamp));
        }
        this.hBaseAccessor.saveHostAggregateRecords(aggregateMap, "METRIC_RECORD_MINUTE_UUID");
        return new TimelinePutResponse();
    }

    @Override
    public Map<String, Map<String, Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
        Map<String, TimelineMetricHostMetadata> hostedApps = this.metricMetadataManager.getHostedAppsCache();
        Map<Object, Object> instanceHosts = new HashMap();
        if (this.configuration.getTimelineMetricsMultipleClusterSupport()) {
            instanceHosts = this.metricMetadataManager.getHostedInstanceCache();
        }
        HashMap<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<String, Map<String, Set<String>>>();
        if (MapUtils.isEmpty(instanceHosts)) {
            HashMap appHostMap = new HashMap();
            for (String host : hostedApps.keySet()) {
                for (String app : hostedApps.get(host).getHostedApps().keySet()) {
                    if (!appHostMap.containsKey(app)) {
                        appHostMap.put(app, new HashSet());
                    }
                    ((Set)appHostMap.get(app)).add(host);
                }
            }
            instanceAppHosts.put("", appHostMap);
        } else {
            for (String string : instanceHosts.keySet()) {
                if (StringUtils.isNotEmpty((String)instanceId) && !string.equals(instanceId)) continue;
                HashMap appHostMap = new HashMap();
                instanceAppHosts.put(string, appHostMap);
                Set hostsWithInstance = (Set)instanceHosts.get(string);
                for (String host : hostsWithInstance) {
                    for (String app : hostedApps.get(host).getHostedApps().keySet()) {
                        if (StringUtils.isNotEmpty((String)appId) && !app.equals(appId)) continue;
                        if (!appHostMap.containsKey(app)) {
                            appHostMap.put(app, new HashSet());
                        }
                        ((Set)appHostMap.get(app)).add(host);
                    }
                }
            }
        }
        return instanceAppHosts;
    }

    @Override
    public List<String> getLiveInstances() {
        List<String> instances = null;
        try {
            if (this.haController == null) {
                return Collections.singletonList(this.configuration.getInstanceHostnameFromEnv());
            }
            instances = this.haController.getLiveInstanceHostNames();
            if (instances == null || instances.isEmpty()) {
                instances = Collections.singletonList(this.configuration.getInstanceHostnameFromEnv());
            }
        }
        catch (UnknownHostException e) {
            LOG.debug((Object)"Exception on getting hostname from env.", (Throwable)e);
        }
        return instances;
    }

    @Override
    public TimelineMetricServiceSummary getTimelineMetricServiceSummary() {
        return new TimelineMetricServiceSummary(this.metricMetadataManager, this.haController);
    }

    private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
        if (!aggregator.isDisabled()) {
            ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)aggregator.getName()));
                }
            });
            this.scheduledExecutors.put(aggregator.getName(), executorService);
            executorService.scheduleAtFixedRate(aggregator, 0L, aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS);
            LOG.info((Object)("Scheduled aggregator thread " + aggregator.getName() + " every " + aggregator.getSleepIntervalMillis() + " milliseconds."));
        } else {
            LOG.info((Object)("Skipped scheduling " + aggregator.getName() + " since it is disabled."));
        }
    }
}

