/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ambari.metrics.core.timeline.aggregators;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils;
import org.apache.ambari.metrics.core.timeline.aggregators.CustomDownSampler;
import org.apache.ambari.metrics.core.timeline.aggregators.DownSamplerUtils;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTimelineAggregator
implements TimelineMetricAggregator {
    protected final PhoenixHBaseAccessor hBaseAccessor;
    protected final Logger LOG;
    protected final long checkpointDelayMillis;
    protected final Integer resultsetFetchSize;
    protected Configuration metricsConf;
    private String checkpointLocation;
    private Long sleepIntervalMillis;
    private Integer checkpointCutOffMultiplier;
    private String aggregatorDisableParam;
    protected String tableName;
    protected String outputTableName;
    protected Long nativeTimeRangeDelay;
    protected AggregationTaskRunner taskRunner;
    protected List<String> downsampleMetricPatterns;
    protected List<CustomDownSampler> configuredDownSamplers;
    private final AggregationTaskRunner.AGGREGATOR_NAME aggregatorName;

    AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
        this.aggregatorName = aggregatorName;
        this.hBaseAccessor = hBaseAccessor;
        this.metricsConf = metricsConf;
        this.checkpointDelayMillis = TimeUnit.SECONDS.toMillis(metricsConf.getInt("timeline.metrics.service.checkpointDelay", 120));
        this.resultsetFetchSize = metricsConf.getInt("timeline.metrics.service.resultset.fetchSize", 2000);
        this.LOG = LoggerFactory.getLogger((String)AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get((Object)aggregatorName));
        this.configuredDownSamplers = DownSamplerUtils.getDownSamplers(metricsConf);
        this.downsampleMetricPatterns = DownSamplerUtils.getDownsampleMetricPatterns(metricsConf);
    }

    public AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, Integer checkpointCutOffMultiplier, String aggregatorDisableParam, String tableName, String outputTableName, Long nativeTimeRangeDelay, MetricCollectorHAController haController) {
        this(aggregatorName, hBaseAccessor, metricsConf);
        this.checkpointLocation = checkpointLocation;
        this.sleepIntervalMillis = sleepIntervalMillis;
        this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
        this.aggregatorDisableParam = aggregatorDisableParam;
        this.tableName = tableName;
        this.outputTableName = outputTableName;
        this.nativeTimeRangeDelay = nativeTimeRangeDelay;
        this.taskRunner = haController != null && haController.isInitialized() ? haController.getAggregationTaskRunner() : null;
    }

    @Override
    public void run() {
        this.LOG.info("Started Timeline aggregator thread @ " + new Date());
        Long SLEEP_INTERVAL = this.getSleepIntervalMillis();
        this.runOnce(SLEEP_INTERVAL);
    }

    public void runOnce(Long SLEEP_INTERVAL) {
        boolean performAggregationFunction = true;
        if (this.taskRunner != null) {
            switch (this.getAggregatorType()) {
                case HOST: {
                    performAggregationFunction = this.taskRunner.performsHostAggregation();
                    break;
                }
                case CLUSTER: {
                    performAggregationFunction = this.taskRunner.performsClusterAggregation();
                }
            }
        }
        if (performAggregationFunction) {
            long currentTime = System.currentTimeMillis();
            long lastCheckPointTime = this.readLastCheckpointSavingOnFirstRun(currentTime);
            if (lastCheckPointTime != -1L) {
                this.LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + (currentTime - lastCheckPointTime) / 1000L + " seconds.");
                boolean success = this.doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
                if (success) {
                    try {
                        this.saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
                    }
                    catch (IOException io) {
                        this.LOG.warn("Error saving checkpoint, restarting aggregation at previous checkpoint.");
                    }
                }
            }
        } else {
            this.LOG.info("Skipping aggregation function not owned by this instance.");
        }
    }

    private long readLastCheckpointSavingOnFirstRun(long currentTime) {
        long lastCheckPointTime = -1L;
        try {
            lastCheckPointTime = this.readCheckPoint();
            if (lastCheckPointTime != -1L) {
                this.LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
                if (this.isLastCheckPointTooOld(currentTime, lastCheckPointTime)) {
                    this.LOG.warn("Last Checkpoint is too old, discarding last checkpoint. lastCheckPointTime = " + new Date(lastCheckPointTime));
                    lastCheckPointTime = AggregatorUtils.getRoundedAggregateTimeMillis(this.getSleepIntervalMillis()) - this.getSleepIntervalMillis();
                    this.LOG.info("Saving checkpoint time. " + new Date(lastCheckPointTime));
                    this.saveCheckPoint(lastCheckPointTime);
                } else {
                    if (lastCheckPointTime > 0L) {
                        lastCheckPointTime = AggregatorUtils.getRoundedCheckPointTimeMillis(lastCheckPointTime, this.getSleepIntervalMillis());
                        this.LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
                    }
                    if (this.isLastCheckPointTooYoung(lastCheckPointTime)) {
                        this.LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
                        return -1L;
                    }
                }
            } else {
                this.LOG.info("No checkpoint found");
                long firstCheckPoint = AggregatorUtils.getRoundedAggregateTimeMillis(this.getSleepIntervalMillis());
                this.LOG.info("Saving checkpoint time. " + new Date(firstCheckPoint));
                this.saveCheckPoint(firstCheckPoint);
            }
        }
        catch (IOException io) {
            this.LOG.warn("Unable to write last checkpoint time. Resuming sleep.", (Throwable)io);
        }
        return lastCheckPointTime;
    }

    private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) {
        return checkpoint != -1L && currentTime - checkpoint > this.getCheckpointCutOffIntervalMillis();
    }

    private boolean isLastCheckPointTooYoung(long checkpoint) {
        return checkpoint != -1L && AggregatorUtils.getRoundedAggregateTimeMillis(this.getSleepIntervalMillis()) <= checkpoint;
    }

    protected long readCheckPoint() {
        if (this.taskRunner != null) {
            return this.taskRunner.getCheckpointManager().readCheckpoint(this.aggregatorName);
        }
        try {
            String contents;
            File checkpoint = new File(this.getCheckpointLocation());
            if (checkpoint.exists() && (contents = FileUtils.readFileToString((File)checkpoint)) != null && !contents.isEmpty()) {
                return Long.parseLong(contents);
            }
        }
        catch (IOException io) {
            this.LOG.debug("", (Throwable)io);
        }
        return -1L;
    }

    protected void saveCheckPoint(long checkpointTime) throws IOException {
        if (this.taskRunner != null) {
            boolean success = this.taskRunner.getCheckpointManager().writeCheckpoint(this.aggregatorName, checkpointTime);
            if (!success) {
                this.LOG.error("Error saving checkpoint with AggregationTaskRunner, aggregator = " + this.aggregatorName + "value = " + checkpointTime);
            }
        } else {
            boolean done;
            File checkpoint = new File(this.getCheckpointLocation());
            if (!checkpoint.exists() && !(done = checkpoint.createNewFile())) {
                throw new IOException("Could not create checkpoint at location, " + this.getCheckpointLocation());
            }
            FileUtils.writeStringToFile((File)checkpoint, (String)String.valueOf(checkpointTime));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean doWork(long startTime, long endTime) {
        this.LOG.info("Start aggregation cycle @ " + new Date() + ", startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
        boolean success = true;
        Condition condition = this.prepareMetricQueryCondition(startTime, endTime);
        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            conn = this.hBaseAccessor.getConnection();
            stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
            this.LOG.debug("Query issued @: " + new Date());
            if (condition.doUpdate()) {
                conn.setAutoCommit(true);
                int rows = stmt.executeUpdate();
                conn.commit();
                conn.setAutoCommit(false);
                this.LOG.info(rows + " row(s) updated in aggregation.");
            } else {
                rs = stmt.executeQuery();
            }
            this.LOG.debug("Query returned @: " + new Date());
            this.aggregate(rs, startTime, endTime);
        }
        catch (Exception e) {
            this.LOG.error("Exception during aggregating metrics.", (Throwable)e);
            success = false;
        }
        finally {
            if (rs != null) {
                try {
                    rs.close();
                }
                catch (SQLException sQLException) {}
            }
            if (stmt != null) {
                try {
                    stmt.close();
                }
                catch (SQLException sQLException) {}
            }
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException sQLException) {}
            }
        }
        this.LOG.info("End aggregation cycle @ " + new Date());
        return success;
    }

    protected abstract Condition prepareMetricQueryCondition(long var1, long var3);

    protected abstract void aggregate(ResultSet var1, long var2, long var4) throws IOException, SQLException;

    protected void downsample(Connection conn, Long startTime, Long endTime) {
        this.LOG.debug("Checking for downsampling requests.");
        if (CollectionUtils.isEmpty(this.configuredDownSamplers)) {
            this.LOG.debug("No downsamplers configured");
            return;
        }
        String queryPrefix = "UPSERT INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
        if (this.outputTableName.contains("RECORD")) {
            queryPrefix = "UPSERT INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
        }
        queryPrefix = String.format(queryPrefix, this.outputTableName);
        Iterator<CustomDownSampler> iterator = this.configuredDownSamplers.iterator();
        while (iterator.hasNext()) {
            CustomDownSampler downSampler = iterator.next();
            if (downSampler.validateConfigs()) {
                EmptyCondition downSamplingCondition = new EmptyCondition();
                downSamplingCondition.setDoUpdate(true);
                List<String> stmts = downSampler.prepareDownSamplingStatement(startTime, endTime, this.tableName);
                for (String stmt : stmts) {
                    downSamplingCondition.setStatement(queryPrefix + stmt);
                    this.runDownSamplerQuery(conn, downSamplingCondition);
                }
                continue;
            }
            this.LOG.warn("The following downsampler failed config validation : " + downSampler.getClass().getName() + ".Removing it from downsamplers list.");
            iterator.remove();
        }
    }

    @Override
    public Long getSleepIntervalMillis() {
        return this.sleepIntervalMillis;
    }

    public void setSleepIntervalMillis(Long sleepIntervalMillis) {
        this.sleepIntervalMillis = sleepIntervalMillis;
    }

    protected Integer getCheckpointCutOffMultiplier() {
        return this.checkpointCutOffMultiplier;
    }

    protected Long getCheckpointCutOffIntervalMillis() {
        return (long)this.getCheckpointCutOffMultiplier().intValue() * this.getSleepIntervalMillis();
    }

    @Override
    public boolean isDisabled() {
        return this.metricsConf.getBoolean(this.aggregatorDisableParam, false);
    }

    protected String getQueryHint(Long startTime) {
        StringBuilder sb = new StringBuilder();
        sb.append("/*+ ");
        sb.append("NATIVE_TIME_RANGE(");
        sb.append(startTime - this.nativeTimeRangeDelay);
        sb.append(") ");
        if (this.hBaseAccessor.isSkipBlockCacheForAggregatorsEnabled()) {
            sb.append("NO_CACHE ");
        }
        sb.append("*/");
        return sb.toString();
    }

    protected String getCheckpointLocation() {
        return this.checkpointLocation;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runDownSamplerQuery(Connection conn, Condition condition) {
        PreparedStatement stmt = null;
        ResultSet rs = null;
        this.LOG.debug("Downsampling query : " + condition.getStatement());
        try {
            stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
            this.LOG.info("Start downsampling cycle...");
            if (condition.doUpdate()) {
                int rows = stmt.executeUpdate();
                conn.commit();
                this.LOG.debug(rows + " row(s) updated in downsampling.");
            } else {
                rs = stmt.executeQuery();
            }
            this.LOG.info("End Downsampling cycle.");
        }
        catch (SQLException e) {
            this.LOG.error("Exception during downsampling metrics.", (Throwable)e);
        }
        finally {
            if (rs != null) {
                try {
                    rs.close();
                }
                catch (SQLException sQLException) {}
            }
            if (stmt != null) {
                try {
                    stmt.close();
                }
                catch (SQLException sQLException) {}
            }
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (SQLException sQLException) {}
            }
        }
    }

    protected String getDownsampledMetricSkipClause() {
        return "";
    }

    public TimelineMetricAggregator.AGGREGATOR_TYPE getAggregatorType() {
        if (this.outputTableName.contains("RECORD")) {
            return TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
        }
        if (this.outputTableName.contains("AGGREGATE")) {
            return TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
        }
        return null;
    }

    @Override
    public AggregationTaskRunner.AGGREGATOR_NAME getName() {
        return this.aggregatorName;
    }
}

