/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.end2end.CDCBaseIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
@Category(value={NeedsOwnMiniClusterTest.class})
public class CDCQueryIT
extends CDCBaseIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCQueryIT.class);
    private static final int MAX_LOOKBACK_AGE = 10;
    private final boolean forView;
    private final PTable.QualifierEncodingScheme encodingScheme;
    private final boolean multitenant;
    private final Integer tableSaltBuckets;
    private final boolean withSchemaName;
    private final boolean caseSensitiveNames;

    public CDCQueryIT(Boolean forView, PTable.QualifierEncodingScheme encodingScheme, boolean multitenant, Integer tableSaltBuckets, boolean withSchemaName, boolean caseSensitiveNames) {
        this.forView = forView;
        this.encodingScheme = encodingScheme;
        this.multitenant = multitenant;
        this.tableSaltBuckets = tableSaltBuckets;
        this.withSchemaName = withSchemaName;
        this.caseSensitiveNames = caseSensitiveNames;
    }

    @Parameterized.Parameters(name="forView={0}, encodingScheme={1}, multitenant={2}, tableSaltBuckets={3}, withSchemaName={4}, caseSensitiveNames={5}")
    public static synchronized Collection<Object[]> data() {
        return Arrays.asList({Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE, Boolean.FALSE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.TRUE, Boolean.FALSE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE, Boolean.TRUE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.TRUE, Boolean.TRUE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, Boolean.FALSE, Boolean.FALSE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.TRUE, 2, Boolean.TRUE, Boolean.FALSE}, {Boolean.FALSE, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE, Boolean.FALSE}, {Boolean.TRUE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE, Boolean.FALSE}, {Boolean.TRUE, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE, Boolean.TRUE});
    }

    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        HashMap props = Maps.newHashMapWithExpectedSize((int)1);
        props.put("phoenix.max.lookback.age.seconds", Integer.toString(10));
        CDCQueryIT.setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
    }

    @Before
    public void beforeTest() {
        EnvironmentEdgeManager.reset();
        this.injectEdge = new ManualEnvironmentEdge();
        this.injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
    }

    private void cdcIndexShouldNotBeUsedForDataTableQueries(Connection conn, String dataTableName, String cdcName) throws Exception {
        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + dataTableName + " WHERE PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME()");
        String explainPlan = QueryUtil.getExplainPlan((ResultSet)rs);
        Assert.assertFalse((boolean)explainPlan.contains(cdcName));
    }

    private boolean isDistinctPrefixFilterIncludedInFilterList(FilterList filterList) {
        for (Filter filter : filterList.getFilters()) {
            if (filter instanceof DistinctPrefixFilter) {
                return true;
            }
            if (!(filter instanceof FilterList)) continue;
            return this.isDistinctPrefixFilterIncludedInFilterList((FilterList)filter);
        }
        return false;
    }

    private boolean isDistinctPrefixFilterIncluded(Scan scan) {
        Filter filter = scan.getFilter();
        if (filter != null && filter instanceof DistinctPrefixFilter) {
            return true;
        }
        if (filter instanceof FilterList) {
            return this.isDistinctPrefixFilterIncludedInFilterList((FilterList)filter);
        }
        return false;
    }

    private void checkIndexPartitionIdCount(Connection conn, String tableName, String cdcName) throws Exception {
        int saltBuckets = this.getNonEmptySaltBucketCount(conn, tableName);
        ResultSet rs = conn.createStatement().executeQuery("SELECT PARTITION_ID() FROM " + cdcName + " ORDER BY PARTITION_ID()");
        String[] partitionId = new String[saltBuckets];
        int[] countPerPartition = new int[saltBuckets];
        int partitionIndex = 0;
        Assert.assertTrue((boolean)rs.next());
        partitionId[partitionIndex] = rs.getString(1);
        int n = partitionIndex;
        countPerPartition[n] = countPerPartition[n] + 1;
        LOGGER.info("PARTITION_ID[" + partitionIndex + "] = " + partitionId[partitionIndex]);
        while (rs.next()) {
            if (!partitionId[partitionIndex].equals(rs.getString(1))) {
                partitionId[++partitionIndex] = rs.getString(1);
                LOGGER.info("PARTITION_ID[" + partitionIndex + "] = " + partitionId[partitionIndex]);
            }
            int n2 = partitionIndex;
            countPerPartition[n2] = countPerPartition[n2] + 1;
        }
        Assert.assertEquals((long)saltBuckets, (long)(partitionIndex + 1));
        rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + cdcName);
        Assert.assertTrue((boolean)rs.next());
        partitionIndex = 0;
        partitionId[partitionIndex] = rs.getString(1);
        int rowCount = 1;
        while (rs.next()) {
            if (!partitionId[partitionIndex].equals(rs.getString(1))) {
                partitionId[++partitionIndex] = rs.getString(1);
                LOGGER.info("PARTITION_ID[" + partitionIndex + "] = " + partitionId[partitionIndex]);
            }
            ++rowCount;
        }
        Assert.assertEquals((long)saltBuckets, (long)(partitionIndex + 1));
        Assert.assertEquals((long)saltBuckets, (long)rowCount);
        Assert.assertTrue((boolean)this.isDistinctPrefixFilterIncluded(((PhoenixResultSet)rs).getContext().getScan()));
        PreparedStatement statement = conn.prepareStatement(CDCQueryIT.getCDCQuery(cdcName, partitionId));
        statement.setTimestamp(1, new Timestamp(1000L));
        statement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        rs = statement.executeQuery();
        rowCount = 0;
        while (rs.next()) {
            ++rowCount;
            String id = rs.getString(1);
            int count = rs.getInt(2);
            boolean found = false;
            for (int i = 0; i < saltBuckets; ++i) {
                if (!partitionId[i].equals(id) || count != countPerPartition[i]) continue;
                found = true;
                break;
            }
            Assert.assertTrue((boolean)found);
        }
        ResultIterator resultIterator = ((PhoenixResultSet)rs).getUnderlyingIterator();
        Assert.assertTrue((boolean)(resultIterator instanceof RowKeyOrderedAggregateResultIterator));
        Assert.assertEquals((long)saltBuckets, (long)rowCount);
    }

    private static String getCDCQuery(String cdcName, String[] partitionId) {
        StringBuilder query = new StringBuilder("SELECT PARTITION_ID(), Count(*) from ");
        query.append(cdcName);
        query.append(" WHERE PARTITION_ID() IN (");
        for (int i = 0; i < partitionId.length - 1; ++i) {
            query.append("'");
            query.append(partitionId[i]);
            query.append("',");
        }
        query.append("'");
        query.append(partitionId[partitionId.length - 1]);
        query.append("')");
        query.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
        query.append(" Group By PARTITION_ID()");
        return query.toString();
    }

    private static String addPartitionInList(Connection conn, String cdcName, String query) throws SQLException {
        ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + cdcName);
        ArrayList<String> partitionIds = new ArrayList<String>();
        while (rs.next()) {
            partitionIds.add(rs.getString(1));
        }
        StringBuilder builder = new StringBuilder(query);
        builder.append(" WHERE PARTITION_ID() IN (");
        boolean initialized = false;
        for (String partitionId : partitionIds) {
            if (!initialized) {
                builder.append("'");
                initialized = true;
            } else {
                builder.append(",'");
            }
            builder.append(partitionId);
            builder.append("'");
        }
        builder.append(")");
        return builder.toString();
    }

    private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String cdcName, String query, long minTimestamp, long maxTimestamp) throws SQLException {
        StringBuilder builder = new StringBuilder(CDCQueryIT.addPartitionInList(conn, cdcName, query));
        builder.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
        PreparedStatement statement = conn.prepareStatement(builder.toString());
        statement.setTimestamp(1, new Timestamp(minTimestamp));
        statement.setTimestamp(2, new Timestamp(maxTimestamp));
        return statement;
    }

    @Test
    public void testSelectCDC() throws Exception {
        String cdcName;
        String tableName;
        String schemaName = this.getSchemaName();
        String datatableName = tableName = this.getTableOrViewName(schemaName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = this.getTableOrViewName(schemaName);
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        List<CDCBaseIT.ChangeRow> changes = this.generateChanges(startTS, tenantids, tableName, null, this.COMMIT_SUCCESS);
        long currentTime = System.currentTimeMillis();
        long endTS = changes.get(changes.size() - 1).getTimestamp() + 1L;
        if (endTS > currentTime) {
            Thread.sleep(endTS - currentTime);
        }
        final String cdcFullName = SchemaUtil.getTableName((String)schemaName, (String)cdcName);
        try (Connection conn = this.newConnection(tenantId);){
            this.dumpCDCResults(conn, cdcName, (Map<String, String>)new TreeMap<String, String>(){
                {
                    this.put("K", "INTEGER");
                }
            }, CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + cdcFullName));
            String uncovered_sql = "SELECT  /*+ INDEX(" + tableName + " " + CDCUtil.getCDCIndexName((String)cdcName) + ") */ k, v1 FROM " + tableName;
            try (ResultSet rs = conn.createStatement().executeQuery(uncovered_sql);){
                Assert.assertTrue((boolean)rs.next());
                Assert.assertEquals((long)2L, (long)rs.getInt(1));
                Assert.assertEquals((long)201L, (long)rs.getInt(2));
                Assert.assertTrue((boolean)rs.next());
                Assert.assertEquals((long)3L, (long)rs.getInt(1));
                Assert.assertEquals((long)300L, (long)rs.getInt(2));
                Assert.assertFalse((boolean)rs.next());
            }
            uncovered_sql = "SELECT   k, v1 FROM " + tableName;
            rs = conn.createStatement().executeQuery(uncovered_sql);
            try {
                Assert.assertTrue((boolean)rs.next());
                Assert.assertEquals((long)2L, (long)rs.getInt(1));
                Assert.assertEquals((long)201L, (long)rs.getInt(2));
                Assert.assertTrue((boolean)rs.next());
                Assert.assertEquals((long)3L, (long)rs.getInt(1));
                Assert.assertEquals((long)300L, (long)rs.getInt(2));
                Assert.assertFalse((boolean)rs.next());
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
            TreeMap<String, String> dataColumns = new TreeMap<String, String>(){
                {
                    this.put("V1", "INTEGER");
                    this.put("V2", "INTEGER");
                    this.put("B.VB", "INTEGER");
                }
            };
            this.verifyChangesViaSCN(tenantId, CDCQueryIT.getCDCQueryPreparedStatement(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.verifyChangesViaSCN(tenantId, CDCQueryIT.getCDCQueryPreparedStatement(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.verifyChangesViaSCN(tenantId, CDCQueryIT.getCDCQueryPreparedStatement(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, (Map<String, String>)dataColumns, changes, PRE_POST_IMG);
            this.verifyChangesViaSCN(tenantId, CDCQueryIT.getCDCQueryPreparedStatement(conn, cdcFullName, "SELECT * FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, (Map<String, String>)dataColumns, changes, new HashSet<PTable.CDCChangeScope>());
            HashMap<String, int[]> testQueries = new HashMap<String, int[]>(){
                {
                    this.put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC, K ASC", new int[]{1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
                    this.put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName + " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3});
                    this.put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName + " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1});
                }
            };
            HashMap dummyChange = new HashMap(){
                {
                    this.put("event_type", "dummy");
                }
            };
            for (Map.Entry testQuery : testQueries.entrySet()) {
                ResultSet rs = conn.createStatement().executeQuery((String)testQuery.getKey());
                try {
                    for (int i = 0; i < ((int[])testQuery.getValue()).length; ++i) {
                        int k = ((int[])testQuery.getValue())[i];
                        Assert.assertEquals((Object)true, (Object)rs.next());
                        Assert.assertEquals((String)("Index: " + i + " for query: " + (String)testQuery.getKey()), (long)k, (long)rs.getInt(2));
                        Map change = (Map)mapper.reader(HashMap.class).readValue(rs.getString(3));
                        change.put("event_type", "dummy");
                        Assert.assertEquals((Object)dummyChange, (Object)change);
                    }
                    Assert.assertEquals((Object)false, (Object)rs.next());
                }
                finally {
                    if (rs == null) continue;
                    rs.close();
                }
            }
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
        }
    }

    @Test
    public void testSelectGeneric() throws Exception {
        String cdcName;
        String tableName;
        String schemaName = this.getSchemaName();
        String datatableName = tableName = this.getTableOrViewName(schemaName);
        TreeMap<String, String> pkColumns = new TreeMap<String, String>(){
            {
                this.put("K1", "INTEGER");
                this.put("K2", "INTEGER");
            }
        };
        TreeMap<String, String> dataColumns = new TreeMap<String, String>(){
            {
                this.put("V1", "INTEGER");
                this.put("V2", "VARCHAR");
                this.put("V3", "CHAR");
                this.put("V4", "DOUBLE");
                this.put("V5", "DATE");
                this.put("V6", "TIME");
                this.put("V7", "TIMESTAMP");
                this.put("V8", "VARBINARY");
                this.put("V9", "BINARY");
                this.put("V10", "VARCHAR ARRAY");
                this.put("V11", "JSON");
            }
        };
        try (Connection conn = this.newConnection();){
            this.createTable(conn, tableName, (Map<String, String>)pkColumns, (Map<String, String>)dataColumns, this.multitenant, this.encodingScheme, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = this.caseSensitiveNames ? SchemaUtil.getTableName((String)schemaName, (String)SchemaUtil.getEscapedArgument((String)CDCQueryIT.generateUniqueName().toLowerCase())) : SchemaUtil.getTableName((String)schemaName, (String)CDCQueryIT.generateUniqueName());
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (change)";
            this.createCDC(conn, cdc_sql, this.encodingScheme);
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        HashMap<String, List<Set<CDCBaseIT.ChangeRow>>> allBatches = new HashMap<String, List<Set<CDCBaseIT.ChangeRow>>>(tenantids.length);
        for (String tid : tenantids) {
            allBatches.put(tid, this.generateMutations(tenantId, startTS, (Map<String, String>)pkColumns, (Map<String, String>)dataColumns, 20, 5));
            this.applyMutations(this.COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, (List)allBatches.get(tid), cdcName);
        }
        String cdcFullName = SchemaUtil.getTableName((String)schemaName, (String)cdcName);
        try (Connection conn = this.newConnection(tenantId);){
            this.dumpCDCResults(conn, cdcName, (Map<String, String>)pkColumns, CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName));
            ArrayList<CDCBaseIT.ChangeRow> changes = new ArrayList<CDCBaseIT.ChangeRow>();
            for (Set batch : (List)allBatches.get(tenantId)) {
                changes.addAll(batch);
            }
            long currentTime = System.currentTimeMillis();
            long nextTime = ((CDCBaseIT.ChangeRow)changes.get(changes.size() - 1)).getTimestamp() + 1L;
            if (nextTime > currentTime) {
                Thread.sleep(nextTime - currentTime);
            }
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT * FROM " + cdcFullName)), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName)), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName)), datatableName, (Map<String, String>)dataColumns, changes, PRE_POST_IMG);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(CDCQueryIT.addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName)), datatableName, (Map<String, String>)dataColumns, changes, ALL_IMG);
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
            this.checkIndexPartitionIdCount(conn, tableName, cdcFullName);
        }
    }

    private int getNonEmptySaltBucketCount(Connection conn, String tableName) throws SQLException {
        if (this.tableSaltBuckets == null) {
            return 1;
        }
        HashSet nonEmptySaltBuckets = Sets.newHashSet();
        String query = "SELECT /*+ NO_INDEX */ ROWKEY_BYTES_STRING() FROM " + tableName;
        try (ResultSet rs = conn.createStatement().executeQuery(query);){
            while (rs.next()) {
                String rowKey = rs.getString(1);
                String bucketID = rowKey.substring(0, 4);
                nonEmptySaltBuckets.add(bucketID);
            }
        }
        return nonEmptySaltBuckets.size();
    }

    private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception {
        String cdcName;
        String tableName;
        String schemaName = this.getSchemaName();
        String datatableName = tableName = this.getTableOrViewName(schemaName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, true, immutableStorageScheme);
            if (this.forView) {
                String viewName = this.getTableOrViewName(schemaName);
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        List<CDCBaseIT.ChangeRow> changes = this.generateChangesImmutableTable(startTS, tenantids, schemaName, tableName, datatableName, this.COMMIT_SUCCESS, cdcName);
        String cdcFullName = SchemaUtil.getTableName((String)schemaName, (String)cdcName);
        TreeMap<String, String> dataColumns = new TreeMap<String, String>(){
            {
                this.put("V1", "INTEGER");
                this.put("V2", "INTEGER");
            }
        };
        try (Connection conn = this.newConnection(tenantId);){
            this.dumpCDCResults(conn, cdcName, (Map<String, String>)new TreeMap<String, String>(){
                {
                    this.put("K", "INTEGER");
                }
            }, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,\"CDC JSON\" FROM " + cdcFullName);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), datatableName, (Map<String, String>)dataColumns, changes, PRE_POST_IMG);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
        }
    }

    @Test
    public void testSelectCDCImmutableOneCellPerColumn() throws Exception {
        this._testSelectCDCImmutable(PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
    }

    @Test
    public void testSelectCDCImmutableSingleCell() throws Exception {
        this._testSelectCDCImmutable(PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
    }

    @Test
    public void testSelectWithTimeRange() throws Exception {
        String cdcName;
        String tableName;
        String schemaName = this.getSchemaName();
        String datatableName = tableName = this.getTableOrViewName(schemaName);
        TreeMap<String, String> pkColumns = new TreeMap<String, String>(){
            {
                this.put("K1", "INTEGER");
            }
        };
        TreeMap<String, String> dataColumns = new TreeMap<String, String>(){
            {
                this.put("V1", "INTEGER");
            }
        };
        try (Connection conn = this.newConnection();){
            this.createTable(conn, tableName, (Map<String, String>)pkColumns, (Map<String, String>)dataColumns, this.multitenant, this.encodingScheme, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = this.getTableOrViewName(schemaName);
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (change)";
            this.createCDC(conn, cdc_sql, this.encodingScheme);
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        HashMap<String, List<Set<CDCBaseIT.ChangeRow>>> allBatches = new HashMap<String, List<Set<CDCBaseIT.ChangeRow>>>(tenantids.length);
        for (String tid : tenantids) {
            allBatches.put(tid, this.generateMutations(tenantId, startTS, (Map<String, String>)pkColumns, (Map<String, String>)dataColumns, 20, 5));
            this.applyMutations(this.COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, (List)allBatches.get(tid), cdcName);
        }
        String cdcFullName = SchemaUtil.getTableName((String)schemaName, (String)cdcName);
        try (Connection conn = this.newConnection(tenantId);){
            this.dumpCDCResults(conn, cdcName, (Map<String, String>)pkColumns, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName);
            ArrayList<CDCBaseIT.ChangeRow> changes = new ArrayList<CDCBaseIT.ChangeRow>();
            for (Set batch : (List)allBatches.get(tenantId)) {
                changes.addAll(batch);
            }
            ArrayList<Long> uniqueTimestamps = new ArrayList<Long>();
            Integer lastDeletionTSpos = null;
            for (CDCBaseIT.ChangeRow change : changes) {
                if (uniqueTimestamps.size() == 0 || (Long)uniqueTimestamps.get(uniqueTimestamps.size() - 1) != change.changeTS) {
                    uniqueTimestamps.add(change.changeTS);
                }
                if (change.change != null) continue;
                lastDeletionTSpos = uniqueTimestamps.size() - 1;
            }
            Random rand = new Random();
            int randMinTSpos = rand.nextInt(lastDeletionTSpos - 1);
            int randMaxTSpos = randMinTSpos + 1 + rand.nextInt(uniqueTimestamps.size() - (randMinTSpos + 1));
            this.verifyChangesViaSCN(tenantId, conn, cdcFullName, (Map<String, String>)pkColumns, datatableName, (Map<String, String>)dataColumns, changes, 0L, System.currentTimeMillis());
            this.verifyChangesViaSCN(tenantId, conn, cdcFullName, (Map<String, String>)pkColumns, datatableName, (Map<String, String>)dataColumns, changes, randMinTSpos, randMaxTSpos);
            this.verifyChangesViaSCN(tenantId, conn, cdcFullName, (Map<String, String>)pkColumns, datatableName, (Map<String, String>)dataColumns, changes, randMinTSpos, lastDeletionTSpos.intValue());
            this.verifyChangesViaSCN(tenantId, conn, cdcFullName, (Map<String, String>)pkColumns, datatableName, (Map<String, String>)dataColumns, changes, lastDeletionTSpos.intValue(), randMaxTSpos);
        }
    }

    @Test
    public void testSelectCDCWithDDL() throws Exception {
        String cdcName;
        String tableName;
        String schemaName = this.getSchemaName();
        String datatableName = tableName = this.getTableOrViewName(schemaName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v0 INTEGER, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = this.getTableOrViewName(schemaName);
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
            conn.createStatement().execute("ALTER TABLE " + datatableName + " DROP COLUMN v0");
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        List<CDCBaseIT.ChangeRow> changes = this.generateChanges(startTS, tenantids, tableName, datatableName, this.COMMIT_SUCCESS);
        TreeMap<String, String> dataColumns = new TreeMap<String, String>(){
            {
                this.put("V0", "INTEGER");
                this.put("V1", "INTEGER");
                this.put("V1V2", "INTEGER");
                this.put("V2", "INTEGER");
                this.put("B.VB", "INTEGER");
                this.put("V3", "INTEGER");
            }
        };
        try (Connection conn = this.newConnection(tenantId);){
            this.verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + SchemaUtil.getTableName((String)schemaName, (String)cdcName) + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"), datatableName, (Map<String, String>)dataColumns, changes, CHANGE_IMG);
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
        }
    }

    @Test
    public void testSelectCDCFailDataTableUpdate() throws Exception {
        String cdcName;
        String schemaName = this.getSchemaName();
        String tableName = this.getTableOrViewName(schemaName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = this.getTableOrViewName(schemaName);
                this.createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, this.encodingScheme);
                tableName = viewName;
            }
            cdcName = this.getCDCName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
            this.cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
        }
        String tenantId = this.multitenant ? "1000" : null;
        String[] tenantids = new String[]{tenantId};
        if (this.multitenant) {
            tenantids = new String[]{tenantId, "2000"};
        }
        long startTS = System.currentTimeMillis();
        this.generateChanges(startTS, tenantids, tableName, null, this.COMMIT_FAILURE_EXPECTED);
        try (Connection conn = this.newConnection(tenantId);){
            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + SchemaUtil.getTableName((String)schemaName, (String)cdcName));
            Assert.assertEquals((Object)false, (Object)rs.next());
        }
    }

    @Test
    public void testCDCIndexBuildAndVerification() throws Exception {
        String schemaName = this.withSchemaName ? CDCQueryIT.generateUniqueName() : null;
        String tableName = CDCQueryIT.generateUniqueName();
        String tableFullName = SchemaUtil.getTableName((String)schemaName, (String)tableName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableFullName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = CDCQueryIT.generateUniqueName();
                String viewFullName = SchemaUtil.getTableName((String)schemaName, (String)viewName);
                this.createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + tableFullName, this.encodingScheme);
                tableName = viewName;
                tableFullName = viewFullName;
            }
            String tenantId = this.multitenant ? "1000" : null;
            String[] tenantids = new String[]{tenantId};
            if (this.multitenant) {
                tenantids = new String[]{tenantId, "2000"};
            }
            long startTS = System.currentTimeMillis();
            List<CDCBaseIT.ChangeRow> changes = this.generateChanges(startTS, tenantids, tableFullName, tableFullName, this.COMMIT_SUCCESS, null, 0);
            long currentTime = System.currentTimeMillis();
            long nextTime = changes.get(changes.size() - 1).getTimestamp() + 1L;
            if (nextTime > currentTime) {
                Thread.sleep(nextTime - currentTime);
            }
            String cdcName = CDCQueryIT.generateUniqueName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
            String indexTableFullName = SchemaUtil.getTableName((String)schemaName, (String)CDCUtil.getCDCIndexName((String)cdcName));
            PTable indexTable = ((PhoenixConnection)conn).getTableNoCache(indexTableFullName);
            Assert.assertEquals((Object)indexTable.getIndexState(), (Object)PIndexState.ACTIVE);
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTable.getPhysicalName().getString()), 0);
            IndexToolIT.runIndexTool(false, schemaName, tableName, CDCUtil.getCDCIndexName((String)cdcName));
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTable.getPhysicalName().getString()), 0);
            startTS = System.currentTimeMillis();
            changes = this.generateChanges(startTS, tenantids, tableFullName, tableFullName, this.COMMIT_SUCCESS, null, 1);
            currentTime = System.currentTimeMillis();
            nextTime = changes.get(changes.size() - 1).getTimestamp() + 1L;
            if (nextTime > currentTime) {
                Thread.sleep(nextTime - currentTime);
            }
            IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, CDCUtil.getCDCIndexName((String)cdcName), null, 0, IndexTool.IndexVerifyType.ONLY, new String[0]);
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
            Assert.assertEquals((long)0L, (long)indexTool.getJob().getCounters().findCounter((Enum)PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
        }
    }

    @Test
    public void testCDCIndexTTLEqualsToMaxLookbackAge() throws Exception {
        if (this.forView) {
            return;
        }
        String schemaName = this.withSchemaName ? CDCQueryIT.generateUniqueName() : null;
        String tableName = CDCQueryIT.generateUniqueName();
        String tableFullName = SchemaUtil.getTableName((String)schemaName, (String)tableName);
        try (Connection conn = this.newConnection();){
            this.createTable(conn, "CREATE TABLE  " + tableFullName + " (" + (this.multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (this.multitenant ? "(TENANT_ID, k) " : "(k)") + ")", this.encodingScheme, this.multitenant, this.tableSaltBuckets, false, null);
            if (this.forView) {
                String viewName = CDCQueryIT.generateUniqueName();
                String viewFullName = SchemaUtil.getTableName((String)schemaName, (String)viewName);
                this.createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + tableFullName, this.encodingScheme);
                tableName = viewName;
                tableFullName = viewFullName;
            }
            String tenantId = this.multitenant ? "1000" : null;
            String[] tenantids = new String[]{tenantId};
            if (this.multitenant) {
                tenantids = new String[]{tenantId, "2000"};
            }
            String cdcName = CDCQueryIT.generateUniqueName();
            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
            this.createCDC(conn, cdc_sql, this.encodingScheme);
            long startTS = System.currentTimeMillis();
            List<CDCBaseIT.ChangeRow> changes = this.generateChanges(startTS, tenantids, tableFullName, tableFullName, this.COMMIT_SUCCESS, null, 0);
            String indexTableFullName = SchemaUtil.getTableName((String)schemaName, (String)CDCUtil.getCDCIndexName((String)cdcName));
            PTable indexTable = ((PhoenixConnection)conn).getTableNoCache(indexTableFullName);
            String indexTablePhysicalName = indexTable.getPhysicalName().toString();
            int expectedRawRowCount = TestUtil.getRawRowCount(conn, TableName.valueOf((String)indexTablePhysicalName));
            long currentTime = System.currentTimeMillis();
            long nextTime = changes.get(changes.size() - 1).getTimestamp() + 10000L + 1L;
            if (nextTime > currentTime) {
                Thread.sleep(nextTime - currentTime);
            }
            TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTablePhysicalName), 0);
            IndexToolIT.runIndexTool(false, schemaName, tableName, CDCUtil.getCDCIndexName((String)cdcName));
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTablePhysicalName), 0);
            startTS = System.currentTimeMillis();
            changes = this.generateChanges(startTS, tenantids, tableFullName, tableFullName, this.COMMIT_SUCCESS, null, 0);
            startTS = changes.get(changes.size() - 1).getTimestamp() + 10000L + 1L;
            changes = this.generateChanges(startTS, tenantids, tableFullName, tableFullName, this.COMMIT_SUCCESS, null, 10);
            nextTime = changes.get(changes.size() - 1).getTimestamp() + 1L;
            currentTime = System.currentTimeMillis();
            if (nextTime > currentTime) {
                Thread.sleep(nextTime - currentTime);
            }
            TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTablePhysicalName), expectedRawRowCount);
            IndexToolIT.runIndexTool(false, schemaName, tableName, CDCUtil.getCDCIndexName((String)cdcName));
            TestUtil.assertRawRowCount(conn, TableName.valueOf((String)indexTablePhysicalName), expectedRawRowCount);
        }
    }

    private String getSchemaName() {
        return this.withSchemaName ? (this.caseSensitiveNames ? SchemaUtil.getEscapedArgument((String)CDCQueryIT.generateUniqueName().toLowerCase()) : CDCQueryIT.generateUniqueName()) : null;
    }

    private String getTableOrViewName(String schemaName) {
        return this.caseSensitiveNames ? SchemaUtil.getTableName((String)schemaName, (String)SchemaUtil.getEscapedArgument((String)CDCQueryIT.generateUniqueName().toLowerCase())) : SchemaUtil.getTableName((String)schemaName, (String)CDCQueryIT.generateUniqueName());
    }

    private String getCDCName() {
        return this.caseSensitiveNames ? SchemaUtil.getEscapedArgument((String)CDCQueryIT.generateUniqueName().toLowerCase()) : CDCQueryIT.generateUniqueName();
    }
}

