/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.core.timeline.availability;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.availability.CheckpointManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;

public class MetricCollectorHAController {
    private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
    @VisibleForTesting
    static final String CLUSTER_NAME = "obdp-metrics-cluster";
    @VisibleForTesting
    static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
    @VisibleForTesting
    static final String DEFAULT_STATE_MODEL = "OnlineOffline";
    private static final String INSTANCE_NAME_DELIMITER = "_";
    private static final int PARTITION_NUMBER = 2;
    private static final int REPLICATION_FACTOR = 1;
    @VisibleForTesting
    final String zkConnectUrl;
    private final String instanceHostname;
    private final InstanceConfig instanceConfig;
    private final AggregationTaskRunner aggregationTaskRunner;
    private final List<String> liveInstanceNames = new ArrayList<String>(2);
    private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker();
    @VisibleForTesting
    HelixAdmin admin;
    private HelixManager manager;
    private volatile boolean isInitialized = false;

    public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
        String instancePort;
        try {
            this.instanceHostname = configuration.getInstanceHostnameFromEnv();
            instancePort = configuration.getInstancePort();
        }
        catch (Exception e) {
            LOG.error((Object)"Error reading configs from classpath, will resort to defaults.", (Throwable)e);
            throw new MetricsSystemInitializationException(e.getMessage());
        }
        try {
            String zkClientPort = configuration.getClusterZKClientPort();
            String zkQuorum = configuration.getClusterZKQuorum();
            if (StringUtils.isEmpty((String)zkClientPort) || StringUtils.isEmpty((String)zkQuorum)) {
                throw new Exception(String.format("Unable to parse zookeeper quorum. clientPort = %s, quorum = %s", zkClientPort, zkQuorum));
            }
            this.zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
        }
        catch (Exception e) {
            LOG.error((Object)"Unable to load hbase-site from classpath.", (Throwable)e);
            throw new MetricsSystemInitializationException(e.getMessage(), e);
        }
        this.instanceConfig = new InstanceConfig(this.instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
        this.instanceConfig.setHostName(this.instanceHostname);
        this.instanceConfig.setPort(instancePort);
        this.instanceConfig.setInstanceEnabled(true);
        this.aggregationTaskRunner = new AggregationTaskRunner(this.instanceConfig.getInstanceName(), this.zkConnectUrl, CLUSTER_NAME);
    }

    public void initializeHAController() throws Exception {
        List resources;
        this.admin = new ZKHelixAdmin(this.zkConnectUrl);
        LOG.info((Object)String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
        boolean clusterAdded = this.admin.addCluster(CLUSTER_NAME, false);
        LOG.info((Object)String.format("Was cluster added successfully? %s", clusterAdded));
        boolean success = false;
        int tries = 5;
        int sleepTimeInSeconds = 5;
        for (int i = 0; i < tries && !success; ++i) {
            try {
                List nodes = this.admin.getInstancesInCluster(CLUSTER_NAME);
                if (!nodes.contains(this.instanceConfig.getInstanceName())) {
                    LOG.info((Object)String.format("Adding participant instance %s", this.instanceConfig));
                    this.admin.addInstance(CLUSTER_NAME, this.instanceConfig);
                }
                success = true;
                continue;
            }
            catch (ZkNoNodeException | HelixException ex) {
                LOG.warn((Object)"Helix Cluster not yet setup fully.");
                if (i < tries - 1) {
                    LOG.info((Object)String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
                    TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
                    continue;
                }
                LOG.error((Object)ex);
            }
        }
        if (!success) {
            LOG.info((Object)String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
            this.admin.addCluster(CLUSTER_NAME, true);
            List nodes = this.admin.getInstancesInCluster(CLUSTER_NAME);
            if (!nodes.contains(this.instanceConfig.getInstanceName())) {
                LOG.info((Object)String.format("Adding participant instance %s", this.instanceConfig));
                this.admin.addInstance(CLUSTER_NAME, this.instanceConfig);
            }
        }
        if (this.admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
            LOG.info((Object)"Adding ONLINE-OFFLINE state model to the cluster");
            this.admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
        }
        if (!(resources = this.admin.getResourcesInCluster(CLUSTER_NAME)).contains(METRIC_AGGREGATORS)) {
            LOG.info((Object)String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, 2, 1));
            this.admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.toString());
        }
        this.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
        this.startAggregators();
        this.startController();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> this.shutdownHAController()));
        this.isInitialized = true;
    }

    public boolean isInitialized() {
        return this.isInitialized;
    }

    private void startAggregators() {
        try {
            this.aggregationTaskRunner.initialize();
        }
        catch (Exception e) {
            LOG.error((Object)"Unable to start aggregators.", (Throwable)e);
            throw new MetricsSystemInitializationException(e.getMessage(), e);
        }
    }

    private void startController() throws Exception {
        this.manager = HelixManagerFactory.getZKHelixManager((String)CLUSTER_NAME, (String)this.instanceHostname, (InstanceType)InstanceType.CONTROLLER, (String)this.zkConnectUrl);
        this.manager.connect();
        this.manager.addLiveInstanceChangeListener((LiveInstanceChangeListener)this.liveInstanceTracker);
    }

    public void shutdownHAController() {
        if (this.isInitialized) {
            LOG.info((Object)"Shooting down Metrics Collector's HAController.");
            PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
            this.manager.removeListener(keyBuilder.liveInstances(), (Object)this.liveInstanceTracker);
            this.liveInstanceTracker.shutdown();
            this.aggregationTaskRunner.stop();
            this.manager.disconnect();
            this.admin.close();
            this.isInitialized = false;
            LOG.info((Object)"Shutdown of Metrics Collector's HAController finished.");
        }
    }

    public AggregationTaskRunner getAggregationTaskRunner() {
        return this.aggregationTaskRunner;
    }

    public List<String> getLiveInstanceHostNames() {
        ArrayList<String> liveInstanceHostNames = new ArrayList<String>(2);
        for (String instance : this.liveInstanceNames) {
            liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
        }
        return liveInstanceHostNames;
    }

    public void printClusterState() {
        StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
        ExternalView resourceExternalView = this.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
        if (resourceExternalView != null) {
            this.getPrintableResourceState(resourceExternalView, sb);
        }
        sb.append("\n##################################################");
        LOG.info((Object)sb.toString());
    }

    private void getPrintableResourceState(ExternalView resourceExternalView, StringBuilder sb) {
        TreeSet sortedSet = new TreeSet(resourceExternalView.getPartitionSet());
        sb.append("\nCLUSTER: ");
        sb.append(CLUSTER_NAME);
        sb.append("\nRESOURCE: ");
        sb.append(METRIC_AGGREGATORS);
        for (String partitionName : sortedSet) {
            sb.append("\nPARTITION: ");
            sb.append(partitionName).append("\t");
            Map states = resourceExternalView.getStateMap(partitionName);
            for (Map.Entry stateEntry : states.entrySet()) {
                sb.append("\t");
                sb.append((String)stateEntry.getKey());
                sb.append("\t");
                sb.append((String)stateEntry.getValue());
            }
        }
    }

    public Map<String, String> getAggregationSummary() {
        LinkedHashMap<String, String> summary = new LinkedHashMap<String, String>();
        CheckpointManager checkpointManager = this.aggregationTaskRunner.getCheckpointManager();
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY)).toString());
        summary.put(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY), new Date(checkpointManager.readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY)).toString());
        return summary;
    }

    public final class LiveInstanceTracker
    implements LiveInstanceChangeListener {
        private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        private final Joiner joiner = Joiner.on((String)", ").skipNulls();

        public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
            MetricCollectorHAController.this.liveInstanceNames.clear();
            for (LiveInstance instance : liveInstances) {
                MetricCollectorHAController.this.liveInstanceNames.add(instance.getInstanceName());
            }
            LOG.info((Object)String.format("Detected change in liveliness of Collector instances. LiveInstances = %s", this.joiner.join(MetricCollectorHAController.this.liveInstanceNames)));
            this.executorService.schedule(() -> MetricCollectorHAController.this.printClusterState(), 30L, TimeUnit.SECONDS);
        }

        public void shutdown() {
            this.executorService.shutdown();
        }
    }
}

