/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.trace;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.TimelineAnnotation;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.phoenix.trace.TraceSpanReceiver;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drains spans from a {@link TraceSpanReceiver} and upserts them into a Phoenix table.
 *
 * <p>{@link #start()} creates a scheduled pool of {@code numThreads} daemon threads and
 * schedules one {@link FlushMetrics} task per thread at a fixed 10 second rate. Each task
 * owns its own connection, turns every span into an {@code UPSERT} statement and commits
 * once {@code batchSize} spans have been added to the connection's mutation state.
 */
public class TraceWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TraceWriter.class);

    /** Bind marker placed in the generated UPSERT for every string-valued column. */
    private static final String VARIABLE_VALUE = "?";
    private static final Joiner COLUMN_JOIN = Joiner.on(".");
    static final String TAG_FAMILY = "tags";
    static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");
    static final String ANNOTATION_FAMILY = "annotations";
    static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");
    private static final Joiner COMMAS = Joiner.on(',');

    private final String tableName;
    private final int batchSize;
    private final int numThreads;
    private TraceSpanReceiver traceSpanReceiver;
    protected ScheduledExecutorService executor;

    /**
     * @param tableName  name of the Phoenix table the spans are written to
     * @param numThreads number of writer threads (and of scheduled flush tasks)
     * @param batchSize  number of spans a flush task accumulates before committing
     */
    public TraceWriter(String tableName, int numThreads, int batchSize) {
        this.batchSize = batchSize;
        this.numThreads = numThreads;
        this.tableName = tableName;
    }

    /**
     * Looks up the span receiver and, if one is available, schedules the flush tasks.
     * When no receiver is available only a warning is logged and nothing is scheduled.
     */
    public void start() {
        this.traceSpanReceiver = getTraceSpanReceiver();
        if (this.traceSpanReceiver == null) {
            LOGGER.warn("No receiver has been initialized for TraceWriter. Traces will not be written.");
            LOGGER.warn("Restart Phoenix to try again.");
            return;
        }
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
        builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER");
        this.executor = Executors.newScheduledThreadPool(this.numThreads, builder.build());
        for (int i = 0; i < this.numThreads; ++i) {
            // Each task opens its own connection in its constructor.
            this.executor.scheduleAtFixedRate(new FlushMetrics(), 0L, 10L, TimeUnit.SECONDS);
        }
        LOGGER.info("Writing tracing metrics to phoenix table");
    }

    /**
     * @return the receiver spans are read from; overridable so tests can supply their own
     */
    @VisibleForTesting
    protected TraceSpanReceiver getTraceSpanReceiver() {
        return Tracing.getTraceSpanReceiver();
    }

    /**
     * Builds the name of a dynamic column.
     *
     * @param family column family
     * @param column column name within the family
     * @param count  index appended directly after the column name
     * @return {@code family.column} immediately followed by {@code count}
     */
    public static String getDynamicColumnName(String family, String column, int count) {
        return COLUMN_JOIN.join(family, column) + count;
    }

    /**
     * Appends one dynamic VARCHAR column to the statement being assembled: the column
     * declaration goes to {@code keys}, a bind marker to {@code values}, and the bound
     * text ({@code desc + " - " + value}) to {@code variableValues}.
     */
    private void addDynamicEntry(List<String> keys, List<Object> values, List<String> variableValues, String family, String desc, String value, MetricInfo metric, int count) {
        keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");
        values.add(VARIABLE_VALUE);
        variableValues.add(desc + " - " + value);
    }

    /**
     * Opens a server-side connection with tracing frequency set to NEVER and creates the
     * trace table if it does not exist yet.
     *
     * @param tableName name of the trace table
     * @return the connection, or {@code null} if any step failed (the failure is logged)
     */
    protected Connection getConnection(String tableName) {
        Connection conn = null;
        try {
            Properties props = new Properties();
            props.setProperty("phoenix.trace.frequency", Tracing.Frequency.NEVER.getKey());
            Configuration conf = HBaseConfiguration.create();
            conn = QueryUtil.getConnectionOnServer(props, conf);
            if (!traceTableExists(conn, tableName)) {
                createTable(conn, tableName);
            }
            LOGGER.info("Created new connection for tracing {} Table: {}", conn, tableName);
            return conn;
        } catch (Exception e) {
            LOGGER.error("Tracing will NOT be pursued. New connection failed for tracing Table: {}", tableName, e);
            LOGGER.error("Restart Phoenix to retry.");
            // The caller only ever sees null on failure, so a connection that was opened
            // before the failure must be released here or it is leaked.
            closeQuietly(conn);
            return null;
        }
    }

    /** Closes {@code conn} if it is non-null, logging instead of propagating a close failure. */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException closeFailure) {
            LOGGER.warn("Unable to close tracing connection after a failed setup", closeFailure);
        }
    }

    /**
     * @return {@code true} if the Phoenix connection can resolve {@code traceTableName},
     *         {@code false} if the lookup raises {@link TableNotFoundException}
     * @throws SQLException for any other failure of the lookup
     */
    protected boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
        try {
            conn.unwrap(PhoenixConnection.class).getTable(traceTableName);
            return true;
        } catch (TableNotFoundException e) {
            return false;
        }
    }

    /**
     * Executes the {@code create table if not exists} DDL for the trace table.
     *
     * @param conn  connection used to run the DDL
     * @param table name of the table to create
     * @throws SQLException if preparing or executing the DDL fails
     */
    protected void createTable(Connection conn, String table) throws SQLException {
        String ddl = "create table if not exists " + table + "( "
                + MetricInfo.TRACE.columnName + " bigint not null, "
                + MetricInfo.PARENT.columnName + " bigint not null, "
                + MetricInfo.SPAN.columnName + " bigint not null, "
                + MetricInfo.DESCRIPTION.columnName + " varchar, "
                + MetricInfo.START.columnName + " bigint, "
                + MetricInfo.END.columnName + " bigint, "
                + MetricInfo.HOSTNAME.columnName + " varchar, "
                + TAG_COUNT + " smallint, "
                + ANNOTATION_COUNT + " smallint  CONSTRAINT pk PRIMARY KEY ("
                + MetricInfo.TRACE.columnName + ", "
                + MetricInfo.PARENT.columnName + ", "
                + MetricInfo.SPAN.columnName + "))\n"
                + "TRANSACTIONAL" + "=" + Boolean.FALSE;
        // try-with-resources so the statement is released even when execute() throws.
        try (PreparedStatement stmt = conn.prepareStatement(ddl)) {
            stmt.execute();
        }
    }

    /**
     * Commits the pending mutations on {@code conn}. A failure is logged, not rethrown.
     */
    protected void commitBatch(Connection conn) {
        try {
            conn.commit();
        } catch (SQLException e) {
            LOGGER.error("Unable to commit traces on conn: {} to table: {}", conn, this.tableName, e);
        }
    }

    /**
     * Periodic task that pulls spans from the receiver, adds each one to its connection's
     * mutation state and commits whenever {@code batchSize} spans have accumulated.
     */
    public class FlushMetrics
    implements Runnable {
        /** Connection owned by this task; {@code null} when it could not be opened. */
        private Connection conn;
        /** Spans added since the last commit; carried over between runs. */
        private int counter = 0;

        public FlushMetrics() {
            this.conn = getConnection(tableName);
        }

        /**
         * Drains the receiver. Does nothing when the connection could not be opened.
         * Spans that do not fill a whole batch stay uncommitted until a later run
         * brings the counter up to {@code batchSize}.
         */
        @Override
        public void run() {
            if (this.conn == null) {
                return;
            }
            // NOTE(review): the loop runs while isSpanAvailable() returns false. That reads
            // inverted unless the receiver's method actually reports "queue is empty" --
            // confirm against TraceSpanReceiver before changing the condition.
            while (!traceSpanReceiver.isSpanAvailable()) {
                Span span = traceSpanReceiver.getSpan();
                if (span == null) {
                    break;
                }
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Span received: " + span.toJson());
                }
                addToBatch(span);
                ++this.counter;
                if (this.counter >= batchSize) {
                    commitBatch(this.conn);
                    this.counter = 0;
                }
            }
        }

        /**
         * Builds an UPSERT for {@code span} and joins the resulting mutations into the
         * connection's mutation state. Numeric values are written inline; the description
         * and every tag/annotation are bound as statement parameters. An
         * {@link SQLException} is logged and the span is dropped.
         */
        private void addToBatch(Span span) {
            List<String> keys = new ArrayList<String>();
            List<Object> values = new ArrayList<Object>();
            // Strings bound to the "?" markers, in the order the markers appear in values.
            List<String> variableValues = new ArrayList<String>();

            keys.add(MetricInfo.TRACE.columnName);
            values.add(span.getTraceId());
            keys.add(MetricInfo.DESCRIPTION.columnName);
            values.add(VARIABLE_VALUE);
            variableValues.add(span.getDescription());
            keys.add(MetricInfo.SPAN.traceName);
            values.add(span.getSpanId());
            keys.add(MetricInfo.PARENT.traceName);
            values.add(span.getParentId());
            keys.add(MetricInfo.START.traceName);
            values.add(span.getStartTimeMillis());
            keys.add(MetricInfo.END.traceName);
            values.add(span.getStopTimeMillis());

            // Timeline annotations are stored as dynamic columns in the tag family.
            int tagCount = 0;
            for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
                addDynamicEntry(keys, values, variableValues, TAG_FAMILY, Long.toString(ta.getTime()), ta.getMessage(), MetricInfo.TAG, tagCount);
                ++tagCount;
            }

            // Key/value annotations are stored as dynamic columns in the annotation family.
            int annotationCount = 0;
            Map<byte[], byte[]> annotations = span.getKVAnnotations();
            for (Map.Entry<byte[], byte[]> entry : annotations.entrySet()) {
                Pair<String, String> val = TracingUtils.readAnnotation(entry.getKey(), entry.getValue());
                addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(), val.getSecond(), MetricInfo.ANNOTATION, annotationCount);
                ++annotationCount;
            }

            keys.add(TAG_COUNT);
            values.add(tagCount);
            keys.add(ANNOTATION_COUNT);
            values.add(annotationCount);

            String stmt = "UPSERT INTO " + tableName + " (" + COMMAS.join(keys)
                    + ") VALUES (" + COMMAS.join(values) + ")";
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Logging metrics to phoenix table via: " + stmt);
                LOGGER.trace("With tags: " + variableValues);
            }

            try (PreparedStatement ps = this.conn.prepareStatement(stmt)) {
                int index = 1;
                for (String tag : variableValues) {
                    ps.setString(index++, tag);
                }
                // Compile and execute the mutation directly, then merge it into the
                // connection's state so it is sent with the next commit.
                MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
                MutationState state = this.conn.unwrap(PhoenixConnection.class).getMutationState();
                MutationState newState = plan.execute();
                state.join(newState);
            } catch (SQLException e) {
                LOGGER.error("Could not write metric: \n{} to prepared statement:\n{}", span, stmt, e);
            }
        }
    }
}

