/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.core.timeline.upgrade.core;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.upgrade.core.AbstractPhoenixMetricsCopier;
import org.apache.ambari.metrics.core.timeline.upgrade.core.PhoenixClusterMetricsCopier;
import org.apache.ambari.metrics.core.timeline.upgrade.core.PhoenixHostMetricsCopier;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

/**
 * Command-line launcher that copies metric data from the V1 aggregate tables
 * to their {@code *_UUID} counterparts using a pool of copier threads.
 *
 * <p>The launcher splits the metric names to migrate into batches, creates one
 * copier per (batch, table) pair, skips metrics already listed in the
 * processed-metrics state file, and runs the copiers on a fixed-size thread pool.
 *
 * <p>Positional command-line arguments (all optional, see {@link #main(String[])}):
 * <ol>
 *   <li>processed-metrics state file path</li>
 *   <li>whitelist file path, or {@code --allmetrics}</li>
 *   <li>number of days of data to migrate</li>
 *   <li>number of copier threads</li>
 *   <li>batch size (metric names per copier)</li>
 *   <li>timeout in minutes</li>
 * </ol>
 */
public class MetricsDataMigrationLauncher {
    private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class);
    private static final Long DEFAULT_TIMEOUT_MINUTES = 1440L;
    private static final String PATTERN_PREFIX = "._p_";
    private static final int DEFAULT_BATCH_SIZE = 5;
    private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
    /** Source table name to destination table name, for the cluster aggregate tables. */
    public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<String, String>();
    /** Source table name to destination table name, for the host (record) aggregate tables. */
    public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<String, String>();
    public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/obdp-metrics-collector/obdp-metrics-migration-state.txt";
    public static final int DEFAULT_NUMBER_OF_THREADS = 3;
    public static final int DEFAULT_START_DAYS = 30;
    private final Set<Set<String>> metricNamesBatches;
    private final String processedMetricsFilePath;
    private final long startTimeEpoch;
    private final int numberOfThreads;
    private TimelineMetricConfiguration timelineMetricConfiguration;
    private PhoenixHBaseAccessor hBaseAccessor;
    private TimelineMetricMetadataManager timelineMetricMetadataManager;
    private Map<String, Set<String>> processedMetrics;

    /**
     * Initializes the HBase accessor, loads the processed-metrics state and prepares the metric name batches.
     *
     * @param whitelistedFilePath      whitelist file path, {@code --allmetrics}, or {@code null}/empty to
     *                                 fall back to the configured whitelist or to all metrics
     * @param processedMetricsFilePath state file path, or {@code null} for
     *                                 {@link #DEFAULT_PROCESSED_METRICS_FILE_LOCATION}
     * @param startDay                 number of days of data to migrate, or {@code null} for
     *                                 {@link #DEFAULT_START_DAYS}
     * @param numberOfThreads          copier thread count, or {@code null} for
     *                                 {@link #DEFAULT_NUMBER_OF_THREADS}; must be positive
     * @param batchSize                metric names per batch, or {@code null} for the default of 5; must be positive
     * @throws IllegalArgumentException if {@code numberOfThreads} or {@code batchSize} is not positive
     * @throws Exception                if the accessor initialization or the metric name lookup fails
     */
    public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer batchSize) throws Exception {
        int effectiveThreads = numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
        int effectiveBatchSize = batchSize == null ? DEFAULT_BATCH_SIZE : batchSize;
        // Fail fast on values that would otherwise only blow up after the (expensive) HBase setup
        // or later inside runMigration().
        if (effectiveThreads <= 0) {
            throw new IllegalArgumentException("Number of threads must be positive, got " + effectiveThreads);
        }
        if (effectiveBatchSize <= 0) {
            throw new IllegalArgumentException("Batch size must be positive, got " + effectiveBatchSize);
        }

        this.startTimeEpoch = this.calculateStartEpochTime(startDay);
        this.numberOfThreads = effectiveThreads;
        this.processedMetricsFilePath = processedMetricsFilePath == null ? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
        this.initializeHbaseAccessor();
        this.readProcessedMetricsMap();
        Set<String> metricNames = this.getMetricNames(whitelistedFilePath);

        LOG.info("Setting up batches...");
        Set<Set<String>> batches = new HashSet<Set<String>>();
        Iterables.partition(metricNames, effectiveBatchSize).forEach(batch -> batches.add(new HashSet<String>(batch)));
        this.metricNamesBatches = batches;
        LOG.info(String.format("Split metric names into %s batches with size of %s", this.metricNamesBatches.size(), effectiveBatchSize));
    }

    /**
     * Computes the lower time bound of the data to migrate.
     *
     * @param startDay number of days to go back from now, or {@code null} for {@link #DEFAULT_START_DAYS}
     * @return the start instant as epoch seconds (UTC)
     */
    private long calculateStartEpochTime(Long startDay) {
        final long days;
        if (startDay == null) {
            LOG.info(String.format("No starting day have been provided, using default: %d", DEFAULT_START_DAYS));
            days = DEFAULT_START_DAYS;
        } else {
            LOG.info(String.format("%d days have been provided as migration starting day.", startDay));
            days = startDay;
        }
        LOG.info(String.format("The last %d days' data will be migrated.", days));
        // Take "now" in UTC so it matches the UTC offset used for the epoch conversion; using the
        // system default zone here would shift the result by the host's UTC offset.
        // NOTE(review): the value is in epoch SECONDS - confirm the copiers expect seconds, not millis.
        return LocalDateTime.now(ZoneOffset.UTC).minusDays(days).toEpochSecond(ZoneOffset.UTC);
    }

    /**
     * Resolves the set of metric names (or name patterns) to migrate.
     *
     * <p>Resolution order: {@code --allmetrics} argument, explicitly given whitelist file, whitelist file
     * from the metrics configuration (when whitelisting is enabled), and finally all metric names found
     * in the V1 metadata.
     *
     * @param whitelistedFilePath whitelist file path, {@code --allmetrics}, or {@code null}/empty
     * @return the metric names to migrate
     */
    private Set<String> getMetricNames(String whitelistedFilePath) throws MalformedURLException, URISyntaxException, SQLException {
        if (StringUtils.isNotEmpty(whitelistedFilePath)) {
            if (whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
                LOG.info("Migration of all metrics has been requested by the --allmetrics argument.");
                return this.readAllMetricNamesFromDatabase();
            }
            LOG.info(String.format("Whitelist file %s has been provided.", whitelistedFilePath));
            LOG.info("Looking for whitelisted metric names based on the file content...");
            return MetricsDataMigrationLauncher.readMetricWhitelistFromFile(whitelistedFilePath);
        }

        Configuration conf = this.timelineMetricConfiguration.getMetricsConf();
        if (Boolean.parseBoolean(conf.get("timeline.metrics.whitelisting.enabled"))) {
            String configuredWhitelist = conf.get("timeline.metrics.whitelist.file", "/etc/obdp-metrics-collector/conf/metrics_whitelist");
            LOG.info(String.format("No whitelist file has been provided but Ambari Metrics Whitelisting is enabled. Using %s as whitelist file.", configuredWhitelist));
            LOG.info("Looking for whitelisted metric names based on the file content...");
            return MetricsDataMigrationLauncher.readMetricWhitelistFromFile(configuredWhitelist);
        }

        LOG.info("No whitelist file has been provided and Ambari Metrics Whitelisting is disabled.");
        return this.readAllMetricNamesFromDatabase();
    }

    /**
     * Collects the distinct metric names of all V1 metadata entries known to the HBase accessor.
     */
    private Set<String> readAllMetricNamesFromDatabase() throws MalformedURLException, URISyntaxException, SQLException {
        LOG.info("Looking for all the metric names in the Metrics Database...");
        return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
                .map(TimelineMetricMetadataKey::getMetricName)
                .collect(Collectors.toSet());
    }

    /**
     * Loads the processed-metrics state file into {@link #processedMetrics}.
     *
     * <p>Each line is expected to look like {@code <table name>:<metric name>}. A missing file is treated
     * as "nothing processed yet". Lines that do not match the expected shape are skipped with a warning
     * instead of aborting the whole setup.
     */
    private void readProcessedMetricsMap() {
        Map<String, Set<String>> result = new HashMap<String, Set<String>>();
        Path path = Paths.get(this.processedMetricsFilePath);
        if (Files.notExists(path)) {
            LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", this.processedMetricsFilePath));
        } else {
            LOG.info(String.format("Reading the list of already copied metrics from %s", this.processedMetricsFilePath));
            try (Stream<String> lines = Files.lines(path)) {
                lines.forEach(line -> {
                    // Split on the FIRST colon only: the table names used as keys contain no colon,
                    // while the remainder of the line is the metric name and must be kept intact.
                    String[] parts = line.split(":", 2);
                    if (parts.length < 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                        if (!line.trim().isEmpty()) {
                            LOG.warn(String.format("Skipping malformed line in processed metrics file %s: '%s'", this.processedMetricsFilePath, line));
                        }
                        return;
                    }
                    result.computeIfAbsent(parts[0], table -> new HashSet<String>()).add(parts[1]);
                });
            }
            catch (IOException e) {
                // Best effort: without the state file the affected metrics are simply copied again.
                LOG.error(String.format("Failed to read the processed metrics file %s", this.processedMetricsFilePath), e);
            }
        }
        this.processedMetrics = result;
    }

    /**
     * Creates the copiers for every (batch, table) pair that still has unprocessed metrics and runs
     * them on a fixed-size thread pool, waiting up to the given timeout for completion.
     *
     * <p>If the timeout elapses, or the waiting thread is interrupted, the remaining copiers are
     * cancelled via {@link ExecutorService#shutdownNow()} and an error is logged; the method still
     * returns normally in that case.
     *
     * @param timeoutInMinutes maximum time to wait for the copiers, or {@code null} for the default
     *                         of 1440 minutes
     * @throws IOException if the processed-metrics state file cannot be opened or closed
     */
    public void runMigration(Long timeoutInMinutes) throws IOException {
        long effectiveTimeout = timeoutInMinutes == null ? DEFAULT_TIMEOUT_MINUTES : timeoutInMinutes;
        // The state file is opened in append mode and shared by all copiers.
        try (BufferedWriter processedMetricsFileWriter = new BufferedWriter(new FileWriter(this.processedMetricsFilePath, true))) {
            LOG.info("Setting up copiers...");
            Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<AbstractPhoenixMetricsCopier>();
            for (Set<String> batch : this.metricNamesBatches) {
                for (Map.Entry<String, String> tables : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
                    Set<String> pending = MetricsDataMigrationLauncher.filterProcessedMetrics(batch, this.processedMetrics, tables.getKey());
                    if (!pending.isEmpty()) {
                        copiers.add(new PhoenixClusterMetricsCopier(tables.getKey(), tables.getValue(), this.hBaseAccessor, pending, this.startTimeEpoch, processedMetricsFileWriter));
                    }
                }
                for (Map.Entry<String, String> tables : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
                    Set<String> pending = MetricsDataMigrationLauncher.filterProcessedMetrics(batch, this.processedMetrics, tables.getKey());
                    if (!pending.isEmpty()) {
                        copiers.add(new PhoenixHostMetricsCopier(tables.getKey(), tables.getValue(), this.hBaseAccessor, pending, this.startTimeEpoch, processedMetricsFileWriter));
                    }
                }
            }
            if (copiers.isEmpty()) {
                LOG.info("No copy threads to run, looks like all metrics have been copied.");
                return;
            }

            LOG.info("Running the copy threads...");
            long timerStart = System.currentTimeMillis();
            ExecutorService executorService = Executors.newFixedThreadPool(this.numberOfThreads);
            try {
                for (AbstractPhoenixMetricsCopier copier : copiers) {
                    executorService.submit(copier);
                }
            }
            finally {
                // Always wait for the submitted copiers: the writer they share is closed as soon as
                // this try-with-resources block is left.
                MetricsDataMigrationLauncher.shutdownAndAwait(executorService, effectiveTimeout);
            }
            long timerDelta = System.currentTimeMillis() - timerStart;
            LOG.info(String.format("Copying took %s seconds", (double)timerDelta / 1000.0));
        }
    }

    /**
     * Stops accepting new tasks and waits for the running ones; cancels them on timeout or interrupt.
     */
    private static void shutdownAndAwait(ExecutorService executorService, long timeoutInMinutes) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES)) {
                LOG.error(String.format("Copy threads did not finish within %d minutes, cancelling the remaining ones.", timeoutInMinutes));
                executorService.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for the copy threads, cancelling the remaining ones.", e);
            executorService.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the HBase accessor, initializes the timeline metric configuration and wires a metadata
     * manager into the accessor.
     */
    private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException {
        this.hBaseAccessor = new PhoenixHBaseAccessor(null);
        this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
        this.timelineMetricConfiguration.initialize();
        this.timelineMetricMetadataManager = new TimelineMetricMetadataManager(this.hBaseAccessor);
        this.timelineMetricMetadataManager.initializeMetadata(false);
        this.hBaseAccessor.setMetadataInstance(this.timelineMetricMetadataManager);
    }

    /**
     * Removes the metrics already recorded as processed for the given table.
     *
     * @param metricNames      candidate metric names
     * @param processedMetrics processed metric names keyed by source table name
     * @param tableName        source table to look up
     * @return {@code metricNames} itself when nothing was processed for the table, otherwise a view of
     *         the names not yet processed
     */
    private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) {
        Set<String> alreadyProcessed = processedMetrics.get(tableName);
        if (alreadyProcessed == null) {
            return metricNames;
        }
        return Sets.difference(metricNames, alreadyProcessed);
    }

    /**
     * Reads metric names from a whitelist file, one entry per line.
     *
     * <p>Blank lines are ignored, a leading {@code ._p_} marker is stripped and every {@code *} is
     * replaced by {@code %}. If the file cannot be read the error is logged and the entries read so
     * far (possibly none) are returned.
     *
     * @param whitelistFile path of the whitelist file
     * @return the metric names / patterns found in the file
     */
    private static Set<String> readMetricWhitelistFromFile(String whitelistFile) {
        LOG.info(String.format("Reading metric names from %s", whitelistFile));
        Set<String> whitelistedMetrics = new HashSet<String>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(whitelistFile)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String entry = line.trim();
                if (StringUtils.isEmpty(entry)) {
                    continue;
                }
                if (entry.startsWith(PATTERN_PREFIX)) {
                    // Strip only the leading marker; later occurrences belong to the name itself.
                    entry = entry.substring(PATTERN_PREFIX.length());
                }
                // NOTE(review): '%' is presumably the SQL LIKE wildcard used by the copiers - confirm.
                entry = entry.replace('*', '%');
                whitelistedMetrics.add(entry);
            }
        }
        catch (IOException ioEx) {
            LOG.error(String.format("Failed to read the metric whitelist file %s", whitelistFile), ioEx);
        }
        return whitelistedMetrics;
    }

    /**
     * Refreshes the metadata cache from the V1 tables and forces a metadata sync to the store.
     */
    private void saveMetadata() throws SQLException {
        LOG.info("Saving metadata to store...");
        this.timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
        this.timelineMetricMetadataManager.forceMetricsMetadataSync();
        LOG.info("Metadata was saved.");
    }

    /**
     * Entry point. Parses the optional positional arguments (state file, whitelist file or
     * {@code --allmetrics}, start days, thread count, batch size, timeout minutes), runs the migration,
     * saves the metadata and exits with status 0 on success or 1 on failure.
     *
     * @param args positional command-line arguments, all optional
     */
    public static void main(String[] args) {
        String processedMetricsFilePath = null;
        String whitelistedFilePath = null;
        Long startDay = null;
        Integer numberOfThreads = null;
        Integer batchSize = null;
        Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
        if (args.length > 0) {
            processedMetricsFilePath = args[0];
        }
        if (args.length > 1) {
            whitelistedFilePath = args[1];
        }
        if (args.length > 2) {
            startDay = Long.valueOf(args[2]);
        }
        if (args.length > 3) {
            numberOfThreads = Integer.valueOf(args[3]);
        }
        if (args.length > 4) {
            batchSize = Integer.valueOf(args[4]);
        }
        if (args.length > 5) {
            timeoutInMinutes = Long.valueOf(args[5]);
        }
        MetricsDataMigrationLauncher dataMigrationLauncher = null;
        try {
            LOG.info("Initializing system...");
            dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startDay, numberOfThreads, batchSize);
        }
        catch (Exception e) {
            LOG.error("Exception during system setup, exiting...", e);
            System.exit(1);
        }
        int exitCode = 0;
        try {
            dataMigrationLauncher.runMigration(timeoutInMinutes);
        }
        catch (Throwable e) {
            exitCode = 1;
            LOG.error("Exception during data migration, exiting...", e);
        }
        finally {
            // Metadata is saved even when the migration itself failed.
            try {
                dataMigrationLauncher.saveMetadata();
            }
            catch (SQLException e) {
                exitCode = 1;
                LOG.error("Exception while saving the Metadata, exiting...", e);
            }
        }
        if (exitCode == 0) {
            LOG.info("Data migration finished successfully.");
        }
        System.exit(exitCode);
    }

    static {
        CLUSTER_AGGREGATE_TABLES_MAPPING.put("METRIC_AGGREGATE_MINUTE", "METRIC_AGGREGATE_MINUTE_UUID");
        CLUSTER_AGGREGATE_TABLES_MAPPING.put("METRIC_AGGREGATE_HOURLY", "METRIC_AGGREGATE_HOURLY_UUID");
        CLUSTER_AGGREGATE_TABLES_MAPPING.put("METRIC_AGGREGATE_DAILY", "METRIC_AGGREGATE_DAILY_UUID");
        HOST_AGGREGATE_TABLES_MAPPING.put("METRIC_RECORD_MINUTE", "METRIC_RECORD_MINUTE_UUID");
        HOST_AGGREGATE_TABLES_MAPPING.put("METRIC_RECORD_HOURLY", "METRIC_RECORD_HOURLY_UUID");
        HOST_AGGREGATE_TABLES_MAPPING.put("METRIC_RECORD_DAILY", "METRIC_RECORD_DAILY_UUID");
    }
}

