/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.trace;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.htrace.Tracer;
import org.apache.htrace.impl.ProbabilitySampler;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.phoenix.trace.BaseTracingTestIT;
import org.apache.phoenix.trace.TraceReader;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ParallelStatsDisabledTest.class})
@Ignore(value="Will need to revisit for new HDFS/HBase/HTrace, broken on 5.x")
public class PhoenixTracingEndToEndIT
extends BaseTracingTestIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTracingEndToEndIT.class);
    private static final int MAX_RETRIES = 10;
    private String enabledForLoggingTable;
    private String enableForLoggingIndex;

    @Before
    public void setupMetrics() throws Exception {
        this.enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + PhoenixTracingEndToEndIT.generateUniqueName();
        this.enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX_" + PhoenixTracingEndToEndIT.generateUniqueName();
    }

    @Test
    public void testWriteSpans() throws Exception {
        LOGGER.info("testWriteSpans TableName: " + this.tracingTableName);
        this.latch = new CountDownLatch(1);
        this.testTraceWriter.start();
        TraceScope trace = Trace.startSpan((String)"Start write test", (Sampler)Sampler.ALWAYS);
        Span span = trace.getSpan();
        Span child = span.child("child 1");
        child.addTimelineAnnotation("timeline annotation");
        TracingUtils.addAnnotation((Span)child, (String)"test annotation", (int)10);
        child.stop();
        Thread.sleep(100L);
        trace.close();
        Tracer.getInstance().deliver(span);
        Assert.assertTrue((String)"Sink not flushed. commit() not called on the connection", (boolean)this.latch.await(60L, TimeUnit.SECONDS));
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceReader.TraceHolder trace, TraceReader.SpanInfo info) {
                if (info.description.equals("child 1")) {
                    Assert.assertEquals((String)"Not all annotations present", (long)1L, (long)info.annotationCount);
                    Assert.assertEquals((String)"Not all tags present", (long)1L, (long)info.tagCount);
                    boolean found = false;
                    for (String annotation : info.annotations) {
                        if (!annotation.startsWith("test annotation")) continue;
                        found = true;
                    }
                    Assert.assertTrue((String)("Missing the annotations in span: " + info), (boolean)found);
                    found = false;
                    for (String tag : info.tags) {
                        if (!tag.endsWith("timeline annotation")) continue;
                        found = true;
                    }
                    Assert.assertTrue((String)("Missing the tags in span: " + info), (boolean)found);
                    return true;
                }
                return false;
            }
        });
    }

    @Test
    public void testClientServerIndexingTracing() throws Exception {
        LOGGER.info("testClientServerIndexingTracing TableName: " + this.tracingTableName);
        this.latch = new CountDownLatch(2);
        this.testTraceWriter.start();
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.createTestTable(conn, true);
        Connection traceable = PhoenixTracingEndToEndIT.getTracingConnection();
        LOGGER.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + this.enabledForLoggingTable + " VALUES (?, ?)";
        PreparedStatement stmt = traceable.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1L);
        stmt.execute();
        stmt.setString(1, "key2");
        stmt.setLong(2, 2L);
        stmt.execute();
        traceable.commit();
        LOGGER.debug("Waiting for latch to complete!");
        this.latch.await(200L, TimeUnit.SECONDS);
        boolean indexingCompleted = this.checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceReader.TraceHolder trace, TraceReader.SpanInfo span) {
                String traceInfo = trace.toString();
                if (traceInfo.contains(PhoenixTracingEndToEndIT.this.tracingTableName)) {
                    return false;
                }
                return traceInfo.contains("Completing index");
            }
        });
        Assert.assertTrue((String)"Never found indexing updates", (boolean)indexingCompleted);
    }

    private void createTestTable(Connection conn, boolean withIndex) throws SQLException {
        String ddl = "create table if not exists " + this.enabledForLoggingTable + "(k varchar not null, c1 bigint CONSTRAINT pk PRIMARY KEY (k))";
        conn.createStatement().execute(ddl);
        if (!withIndex) {
            return;
        }
        ddl = "CREATE INDEX IF NOT EXISTS " + this.enableForLoggingIndex + " on " + this.enabledForLoggingTable + " (c1)";
        conn.createStatement().execute(ddl);
    }

    @Test
    public void testScanTracing() throws Exception {
        LOGGER.info("testScanTracing TableName: " + this.tracingTableName);
        Connection traceable = PhoenixTracingEndToEndIT.getTracingConnection();
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.latch = new CountDownLatch(2);
        this.testTraceWriter.start();
        this.createTestTable(conn, false);
        LOGGER.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + this.enabledForLoggingTable + " VALUES (?, ?)";
        PreparedStatement stmt = conn.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1L);
        stmt.execute();
        conn.commit();
        conn.rollback();
        stmt.setString(1, "key2");
        stmt.setLong(2, 2L);
        stmt.execute();
        conn.commit();
        conn.rollback();
        String read = "SELECT * FROM " + this.enabledForLoggingTable;
        ResultSet results = traceable.createStatement().executeQuery(read);
        Assert.assertTrue((String)"Didn't get first result", (boolean)results.next());
        Assert.assertTrue((String)"Didn't get second result", (boolean)results.next());
        results.close();
        Assert.assertTrue((String)"Get expected updates to trace table", (boolean)this.latch.await(200L, TimeUnit.SECONDS));
        boolean tracingComplete = this.checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceReader.TraceHolder currentTrace) {
                String traceInfo = currentTrace.toString();
                return traceInfo.contains("Parallel scanner");
            }
        });
        Assert.assertTrue((String)"Didn't find the parallel scanner in the tracing", (boolean)tracingComplete);
    }

    @Test
    public void testScanTracingOnServer() throws Exception {
        LOGGER.info("testScanTracingOnServer TableName: " + this.tracingTableName);
        Connection traceable = PhoenixTracingEndToEndIT.getTracingConnection();
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.latch = new CountDownLatch(5);
        this.testTraceWriter.start();
        this.createTestTable(conn, false);
        LOGGER.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + this.enabledForLoggingTable + " VALUES (?, ?)";
        PreparedStatement stmt = conn.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1L);
        stmt.execute();
        conn.commit();
        stmt.setString(1, "key2");
        stmt.setLong(2, 2L);
        stmt.execute();
        conn.commit();
        String read = "SELECT COUNT(*) FROM " + this.enabledForLoggingTable;
        ResultSet results = traceable.createStatement().executeQuery(read);
        Assert.assertTrue((String)"Didn't get count result", (boolean)results.next());
        Assert.assertEquals((String)"Didn't get the expected number of row", (long)2L, (long)results.getInt(1));
        results.close();
        Assert.assertTrue((String)"Didn't get expected updates to trace table", (boolean)this.latch.await(60L, TimeUnit.SECONDS));
        boolean found = this.checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceReader.TraceHolder trace) {
                String traceInfo = trace.toString();
                return traceInfo.contains("Scanner opened on server");
            }
        });
        Assert.assertTrue((String)"Didn't find the parallel scanner in the tracing", (boolean)found);
    }

    @Test
    public void testCustomAnnotationTracing() throws Exception {
        LOGGER.info("testCustomAnnotationTracing TableName: " + this.tracingTableName);
        String customAnnotationKey = "myannot";
        String customAnnotationValue = "a1";
        String tenantId = "tenant1";
        Connection traceable = PhoenixTracingEndToEndIT.getTracingConnection((Map<String, String>)ImmutableMap.of((Object)"myannot", (Object)"a1"), "tenant1");
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.latch = new CountDownLatch(2);
        this.testTraceWriter.start();
        this.createTestTable(conn, false);
        LOGGER.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + this.enabledForLoggingTable + " VALUES (?, ?)";
        PreparedStatement stmt = conn.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1L);
        stmt.execute();
        conn.commit();
        conn.rollback();
        stmt.setString(1, "key2");
        stmt.setLong(2, 2L);
        stmt.execute();
        conn.commit();
        conn.rollback();
        String read = "SELECT * FROM " + this.enabledForLoggingTable;
        ResultSet results = traceable.createStatement().executeQuery(read);
        Assert.assertTrue((String)"Didn't get first result", (boolean)results.next());
        Assert.assertTrue((String)"Didn't get second result", (boolean)results.next());
        results.close();
        Assert.assertTrue((String)"Get expected updates to trace table", (boolean)this.latch.await(200L, TimeUnit.SECONDS));
        this.assertAnnotationPresent("myannot", "a1", conn);
        this.assertAnnotationPresent("TenantId", "tenant1", conn);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTraceOnOrOff() throws Exception {
        try (Connection conn1 = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();){
            Statement statement = conn1.createStatement();
            ResultSet rs = statement.executeQuery("TRACE ON");
            Assert.assertTrue((boolean)rs.next());
            PhoenixConnection pconn = (PhoenixConnection)conn1;
            long traceId = pconn.getTraceScope().getSpan().getTraceId();
            Assert.assertEquals((long)traceId, (long)rs.getLong(1));
            Assert.assertEquals((long)traceId, (long)rs.getLong("trace_id"));
            Assert.assertFalse((boolean)rs.next());
            Assert.assertEquals((Object)Sampler.ALWAYS, (Object)pconn.getSampler());
            rs = statement.executeQuery("TRACE OFF");
            Assert.assertTrue((boolean)rs.next());
            Assert.assertEquals((long)traceId, (long)rs.getLong(1));
            Assert.assertEquals((long)traceId, (long)rs.getLong("trace_id"));
            Assert.assertFalse((boolean)rs.next());
            Assert.assertEquals((Object)Sampler.NEVER, (Object)pconn.getSampler());
            rs = statement.executeQuery("TRACE OFF");
            Assert.assertFalse((boolean)rs.next());
            rs = statement.executeQuery("TRACE ON  WITH SAMPLING 0.5");
            rs.next();
            Assert.assertTrue((boolean)(((PhoenixConnection)conn1).getSampler() instanceof ProbabilitySampler));
            rs = statement.executeQuery("TRACE ON  WITH SAMPLING 1.0");
            Assert.assertTrue((boolean)rs.next());
            traceId = pconn.getTraceScope().getSpan().getTraceId();
            Assert.assertEquals((long)traceId, (long)rs.getLong(1));
            Assert.assertEquals((long)traceId, (long)rs.getLong("trace_id"));
            Assert.assertFalse((boolean)rs.next());
            Assert.assertEquals((Object)Sampler.ALWAYS, (Object)pconn.getSampler());
            rs = statement.executeQuery("TRACE ON  WITH SAMPLING 0.5");
            rs.next();
            Assert.assertTrue((boolean)(((PhoenixConnection)conn1).getSampler() instanceof ProbabilitySampler));
            rs = statement.executeQuery("TRACE ON WITH SAMPLING 0.0");
            rs.next();
            Assert.assertEquals((Object)Sampler.NEVER, (Object)pconn.getSampler());
            rs = statement.executeQuery("TRACE OFF");
            Assert.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testSingleSpan() throws Exception {
        LOGGER.info("testSingleSpan TableName: " + this.tracingTableName);
        Properties props = new Properties(TestUtil.TEST_PROPERTIES);
        Connection conn = DriverManager.getConnection(PhoenixTracingEndToEndIT.getUrl(), props);
        this.latch = new CountDownLatch(1);
        this.testTraceWriter.start();
        long traceid = 987654L;
        Span span = this.createNewSpan(traceid, 477902L, 10L, "root", 12L, 13L, "Some process", "test annotation for a span");
        Tracer.getInstance().deliver(span);
        Assert.assertTrue((String)"Updates not written in table", (boolean)this.latch.await(60L, TimeUnit.SECONDS));
        this.validateTraces(Collections.singletonList(span), conn, traceid, this.tracingTableName);
    }

    @Test
    public void testMultipleSpans() throws Exception {
        LOGGER.info("testMultipleSpans TableName: " + this.tracingTableName);
        Connection conn = PhoenixTracingEndToEndIT.getConnectionWithoutTracing();
        this.latch = new CountDownLatch(4);
        this.testTraceWriter.start();
        long traceid = 12345L;
        ArrayList<Span> spans = new ArrayList<Span>();
        Span span = this.createNewSpan(traceid, 477902L, 7777L, "root", 10L, 30L, "root process", "root-span tag");
        spans.add(span);
        span = this.createNewSpan(traceid, 7777L, 6666L, "c1", 11L, 15L, "c1 process", "first child");
        spans.add(span);
        span = this.createNewSpan(traceid, 7777L, 5555L, "c2", 11L, 18L, "c2 process", "second child");
        spans.add(span);
        span = this.createNewSpan(traceid, 5555L, 4444L, "c3", 12L, 16L, "c3 process", "third child");
        spans.add(span);
        for (Span span1 : spans) {
            Tracer.getInstance().deliver(span1);
        }
        Assert.assertTrue((String)"Updates not written in table", (boolean)this.latch.await(100L, TimeUnit.SECONDS));
        this.validateTraces(spans, conn, traceid, this.tracingTableName);
    }

    private void validateTraces(List<Span> spans, Connection conn, long traceid, String tableName) throws Exception {
        TraceReader reader = new TraceReader(conn, tableName);
        Collection traces = reader.readAll(1);
        Assert.assertEquals((String)"Got an unexpected number of traces!", (long)1L, (long)traces.size());
        TraceReader.TraceHolder trace = (TraceReader.TraceHolder)traces.iterator().next();
        Assert.assertEquals((String)"Got an unexpected traceid", (long)traceid, (long)trace.traceid);
        Assert.assertEquals((String)"Got an unexpected number of spans", (long)spans.size(), (long)trace.spans.size());
        this.validateTrace(spans, trace);
    }

    private void validateTrace(List<Span> spans, TraceReader.TraceHolder trace) {
        Iterator spanIter = trace.spans.iterator();
        for (Span span : spans) {
            TraceReader.SpanInfo spanInfo = (TraceReader.SpanInfo)spanIter.next();
            LOGGER.info("Checking span:\n" + spanInfo);
            long parentId = span.getParentId();
            if (parentId == 477902L) {
                Assert.assertNull((String)"Got a parent, but it was a root span!", (Object)spanInfo.parent);
            } else {
                Assert.assertEquals((String)"Got an unexpected parent span id", (long)parentId, (long)spanInfo.parent.id);
            }
            Assert.assertEquals((String)"Got an unexpected start time", (long)span.getStartTimeMillis(), (long)spanInfo.start);
            Assert.assertEquals((String)"Got an unexpected end time", (long)span.getStopTimeMillis(), (long)spanInfo.end);
            int annotationCount = 0;
            for (Map.Entry entry : span.getKVAnnotations().entrySet()) {
                int count = annotationCount++;
                Assert.assertEquals((String)"Didn't get expected annotation", (Object)(count + " - " + Bytes.toString((byte[])((byte[])entry.getValue()))), spanInfo.annotations.get(count));
            }
            Assert.assertEquals((String)"Didn't get expected number of annotations", (long)annotationCount, (long)spanInfo.annotationCount);
        }
    }

    private void assertAnnotationPresent(final String annotationKey, final String annotationValue, Connection conn) throws Exception {
        boolean tracingComplete = this.checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceReader.TraceHolder currentTrace) {
                return currentTrace.toString().contains(annotationKey + " - " + annotationValue);
            }
        });
        Assert.assertTrue((String)"Didn't find the custom annotation in the tracing", (boolean)tracingComplete);
    }

    private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception {
        TraceReader reader = new TraceReader(conn, this.tracingTableName);
        boolean found = false;
        block0: for (int retries = 0; retries < 10; ++retries) {
            Collection traces = reader.readAll(100);
            for (TraceReader.TraceHolder trace : traces) {
                LOGGER.info("Got trace: " + trace);
                found = checker.foundTrace(trace);
                if (found) break block0;
                for (TraceReader.SpanInfo span : trace.spans) {
                    found = checker.foundTrace(trace, span);
                    if (!found) continue;
                    break block0;
                }
            }
            LOGGER.info("======  Waiting for tracing updates to be propagated ========");
            Thread.sleep(1000L);
        }
        return found;
    }

    private abstract class TraceChecker {
        private TraceChecker() {
        }

        public boolean foundTrace(TraceReader.TraceHolder currentTrace) {
            return false;
        }

        public boolean foundTrace(TraceReader.TraceHolder currentTrace, TraceReader.SpanInfo currentSpan) {
            return false;
        }
    }
}

