/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.LiteralTTLExpression;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.types.PBinaryBase;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CDCBaseIT
extends ParallelStatsDisabledIT {
    /** Logger used by the helpers in this class for debug dumps. */
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCBaseIT.class);
    /** Shared Jackson mapper; used below to parse the JSON column returned by CDC queries. */
    protected static final ObjectMapper mapper = new ObjectMapper();
    // Change-scope sets passed to the verification helpers. They are assigned in an initializer
    // that is not part of these declarations; presumably each holds the scopes its name suggests
    // -- confirm against the static initializer.
    static final HashSet<PTable.CDCChangeScope> CHANGE_IMG;
    static final HashSet<PTable.CDCChangeScope> PRE_POST_IMG;
    static final HashSet<PTable.CDCChangeScope> ALL_IMG;
    /** Manual clock that the helpers inject into EnvironmentEdgeManager and advance per change. */
    protected ManualEnvironmentEdge injectEdge;
    /** Calendar that addChange() moves to the timestamp of the change being applied. */
    protected Calendar cal = Calendar.getInstance();
    // NOTE(review): not referenced by the helpers visible here; presumably set up by test fixtures.
    protected static RegionCoprocessorEnvironment taskRegionEnvironment;
    /** Commit strategy for the normal path: commits and lets any {@link SQLException} propagate. */
    protected final CommitAdapter COMMIT_SUCCESS = new CommitAdapter() {
        @Override
        public void commit(Connection connection) throws SQLException {
            connection.commit();
        }
    };
    /**
     * Commit strategy for failure-injection tests: {@code init()} switches on data-table update
     * failures in {@link IndexRegionObserver}, {@code commit()} requires the commit to throw, and
     * {@code reset()} switches the failure injection off again.
     */
    protected final CommitAdapter COMMIT_FAILURE_EXPECTED = new CommitAdapter() {
        @Override
        public void commit(Connection connection) throws SQLException {
            try {
                connection.commit();
                // Reaching this line means the injected failure did not happen.
                Assert.fail("Commit expected to fail");
            } catch (SQLException expected) {
                // The failure is the expected outcome, so there is nothing to do.
            }
        }

        @Override
        void init() {
            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
        }

        @Override
        public void reset() {
            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
        }
    };

    /**
     * Executes {@code table_sql} with no encoding scheme, salt buckets or storage scheme, and with
     * the multi-tenant and immutable flags off.
     *
     * @param conn connection used to run the DDL
     * @param table_sql the CREATE statement, without trailing table options
     */
    protected void createTable(Connection conn, String table_sql) throws Exception {
        createTable(conn, table_sql, null, false, null, false, null);
    }

    /**
     * Executes {@code table_sql} with the given column encoding scheme; every other table option
     * is left off.
     *
     * @param conn connection used to run the DDL
     * @param table_sql the CREATE statement, without trailing table options
     * @param encodingScheme qualifier encoding scheme, or {@code null} for none
     */
    protected void createTable(Connection conn, String table_sql, PTable.QualifierEncodingScheme encodingScheme) throws Exception {
        createTable(conn, table_sql, encodingScheme, false, null, false, null);
    }

    /**
     * Executes {@code table_sql} with the given encoding scheme and multi-tenant flag; salting,
     * immutability and the storage scheme are left off.
     *
     * @param conn connection used to run the DDL
     * @param table_sql the CREATE statement, without trailing table options
     * @param encodingScheme qualifier encoding scheme, or {@code null} for none
     * @param multitenant whether the MULTI_TENANT option is requested
     */
    protected void createTable(Connection conn, String table_sql, PTable.QualifierEncodingScheme encodingScheme, boolean multitenant) throws Exception {
        createTable(conn, table_sql, encodingScheme, multitenant, null, false, null);
    }

    /**
     * Packs the individual table options into a property map keyed by the {@link TableProperty}
     * names and delegates to the map-based {@code createTable} overload.
     *
     * @param conn connection used to run the DDL
     * @param table_sql the CREATE statement, without trailing table options
     * @param encodingScheme qualifier encoding scheme, or {@code null}
     * @param multitenant value stored under MULTI_TENANT
     * @param nSaltBuckets value stored under SALT_BUCKETS, may be {@code null}
     * @param immutable value stored under IMMUTABLE_ROWS
     * @param immutableStorageScheme storage scheme whose name is stored, or {@code null}
     */
    protected void createTable(Connection conn, String table_sql, final PTable.QualifierEncodingScheme encodingScheme, final boolean multitenant, final Integer nSaltBuckets, final boolean immutable, final PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception {
        Byte encodedBytes = null;
        if (encodingScheme != null) {
            encodedBytes = new Byte(encodingScheme.getSerializedMetadataValue());
        }
        String storageSchemeName = null;
        if (immutableStorageScheme != null) {
            storageSchemeName = immutableStorageScheme.name();
        }

        Map<String, Object> tableProps = new HashMap<String, Object>();
        tableProps.put(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName(), encodedBytes);
        tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), multitenant);
        tableProps.put(TableProperty.SALT_BUCKETS.getPropertyName(), nSaltBuckets);
        tableProps.put(TableProperty.IMMUTABLE_ROWS.getPropertyName(), immutable);
        tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(), storageSchemeName);
        createTable(conn, table_sql, tableProps);
    }

    /**
     * Appends the table options found in {@code tableProps} to {@code table_sql} as a
     * comma-separated {@code NAME=value} list and executes the resulting statement.
     *
     * <p>An option is emitted only when it is set: the encoding scheme when it differs from
     * {@link QueryServicesOptions#DEFAULT_COLUMN_ENCODED_BYTES}, MULTI_TENANT and IMMUTABLE_ROWS
     * when true, SALT_BUCKETS and IMMUTABLE_STORAGE_SCHEME when non-null.
     *
     * @param conn connection used to run the DDL
     * @param table_sql the CREATE statement, without trailing table options
     * @param tableProps option values as read through {@link TableProperty#getValue}
     * @throws Exception if the statement fails
     */
    protected void createTable(Connection conn, String table_sql, Map<String, Object> tableProps) throws Exception {
        List<String> props = new ArrayList<String>();

        Byte encodingScheme = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
        if (encodingScheme != null
                && encodingScheme.byteValue() != QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES) {
            props.add(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName() + "=" + encodingScheme);
        }

        Boolean multitenant = (Boolean) TableProperty.MULTI_TENANT.getValue(tableProps);
        if (multitenant != null && multitenant.booleanValue()) {
            props.add(TableProperty.MULTI_TENANT.getPropertyName() + "=" + multitenant);
        }

        Integer nSaltBuckets = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
        if (nSaltBuckets != null) {
            props.add(TableProperty.SALT_BUCKETS.getPropertyName() + "=" + nSaltBuckets);
        }

        // A missing IMMUTABLE_ROWS entry is treated as "not immutable" rather than causing an NPE.
        Boolean immutableTable = (Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
        if (immutableTable != null && immutableTable.booleanValue()) {
            props.add(TableProperty.IMMUTABLE_ROWS.getPropertyName() + "=true");
        }

        PTable.ImmutableStorageScheme immutableStorageScheme =
                (PTable.ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
        if (immutableStorageScheme != null) {
            props.add(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName() + "=" + immutableStorageScheme.name());
        }

        String sql = table_sql + " " + String.join(", ", props);
        LOGGER.debug("Creating table with SQL: " + sql);
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    /**
     * Executes the CDC creation statement without an encoding scheme.
     *
     * @param conn connection used to run the DDL
     * @param cdc_sql the CREATE CDC statement
     */
    protected void createCDC(Connection conn, String cdc_sql) throws Exception {
        createCDC(conn, cdc_sql, null);
    }

    /**
     * Executes the CDC creation statement through the same path as table creation, passing only
     * the encoding scheme as a table option.
     *
     * @param conn connection used to run the DDL
     * @param cdc_sql the CREATE CDC statement
     * @param encodingScheme qualifier encoding scheme, or {@code null} for none
     */
    protected void createCDC(Connection conn, String cdc_sql, PTable.QualifierEncodingScheme encodingScheme) throws Exception {
        createTable(conn, cdc_sql, encodingScheme, false, null, false, null);
    }

    /**
     * Drops the CDC object {@code cdcName} defined on {@code tableName}.
     *
     * @param conn connection used to run the DDL
     * @param cdcName name of the CDC object
     * @param tableName table the CDC object was created on
     * @throws SQLException if the DROP statement fails
     */
    protected void dropCDC(Connection conn, String cdcName, String tableName) throws SQLException {
        // try-with-resources so the Statement is released even when the DDL fails.
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("DROP CDC " + cdcName + " ON " + tableName);
        }
    }

    /**
     * Checks the catalog rows written for a CDC object: the {@code cdc_include} value of the CDC
     * table header row and the {@code index_type} of the header row of its CDC index.
     *
     * @param conn connection used to query {@code system.catalog}
     * @param cdcName name of the CDC object
     * @param expInclude expected {@code cdc_include} value
     * @param idxType expected {@code index_type} value of the CDC index
     * @throws SQLException if either catalog query fails
     */
    protected void assertCDCState(Connection conn, String cdcName, String expInclude, int idxType) throws SQLException {
        String includeQuery = "SELECT cdc_include FROM system.catalog WHERE table_name = '" + cdcName
                + "' AND column_name IS NULL and column_family IS NULL";
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(includeQuery)) {
            Assert.assertTrue(rs.next());
            Assert.assertEquals(expInclude, rs.getString(1));
        }

        String indexTypeQuery = "SELECT index_type FROM system.catalog WHERE table_name = '"
                + CDCUtil.getCDCIndexName(cdcName)
                + "' AND column_name IS NULL and column_family IS NULL";
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(indexTypeQuery)) {
            Assert.assertTrue(rs.next());
            Assert.assertEquals(idxType, rs.getInt(1));
        }
    }

    /**
     * Loads the CDC table and its CDC index through a fresh connection and checks their metadata:
     * include scopes, absent index state and type, parent name, physical name and index TTL.
     *
     * <p>The expected physical name is the CDC index name when {@code tableName} and
     * {@code datatableName} are equal, otherwise the view-index physical name of
     * {@code datatableName}.
     *
     * @param cdcName CDC object name, without schema
     * @param expIncludeScopes expected include scopes
     * @param tableName full name of the table the CDC object is defined on
     * @param datatableName full name of the underlying data table
     * @throws SQLException if the connection or a metadata lookup fails
     */
    protected void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes, String tableName, String datatableName) throws SQLException {
        Properties props = new Properties();
        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
        String indexFullName = SchemaUtil.getTableName(schemaName, CDCUtil.getCDCIndexName(cdcName));
        // Compare by content: reference equality only held when callers passed the same instance.
        boolean sameTable = tableName == null ? datatableName == null : tableName.equals(datatableName);

        // The connection is opened only for these lookups, so close it when done.
        try (Connection conn = DriverManager.getConnection(CDCBaseIT.getUrl(), props)) {
            PTable cdcTable = PhoenixRuntime.getTable(conn, cdcFullName);
            Assert.assertEquals(expIncludeScopes, (Object) cdcTable.getCDCIncludeScopes());
            Assert.assertEquals(expIncludeScopes, (Object) TableProperty.INCLUDE.getPTableValue(cdcTable));
            Assert.assertNull((Object) cdcTable.getIndexState());
            Assert.assertNull((Object) cdcTable.getIndexType());
            Assert.assertEquals((Object) tableName, (Object) cdcTable.getParentName().getString());
            Assert.assertEquals((Object) cdcTable.getPhysicalName().getString(),
                    (Object) (sameTable ? indexFullName : MetaDataUtil.getViewIndexPhysicalName(datatableName)));

            PTable cdcIndexTable = PhoenixRuntime.getTable(conn, indexFullName);
            Assert.assertEquals((Object) cdcIndexTable.getTTLExpression(),
                    (Object) LiteralTTLExpression.TTL_EXPRESSION_FOREVER);
        }
    }

    /**
     * Looks up {@code tableName} through {@code conn} and checks its salt bucket count.
     *
     * @param conn connection used for the table lookup
     * @param tableName name of the table to inspect
     * @param nbuckets expected bucket count; {@code null} or 0 means no salting expected
     * @throws SQLException if the table lookup fails
     */
    protected void assertSaltBuckets(Connection conn, String tableName, Integer nbuckets) throws SQLException {
        assertSaltBuckets(PhoenixRuntime.getTable(conn, tableName), nbuckets);
    }

    /**
     * Checks the salt bucket count reported by {@code table}.
     *
     * @param table table metadata to inspect
     * @param nbuckets expected bucket count; {@code null} or 0 requires the table to report
     *        {@code null}
     */
    protected void assertSaltBuckets(PTable table, Integer nbuckets) {
        boolean saltingExpected = nbuckets != null && nbuckets != 0;
        if (saltingExpected) {
            Assert.assertEquals((Object) nbuckets, (Object) table.getBucketNum());
            return;
        }
        Assert.assertNull((Object) table.getBucketNum());
    }

    /**
     * Asserts that {@code select *} on the CDC object returns no rows.
     *
     * @param conn connection used to run the query
     * @param cdcName name of the CDC object to query
     * @throws SQLException if the query fails
     */
    protected void assertNoResults(Connection conn, String cdcName) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            ResultSet rows = statement.executeQuery("select * from " + cdcName);
            boolean hasRow = rows.next();
            Assert.assertFalse(hasRow);
        }
    }

    /**
     * Opens a connection without a tenant id.
     *
     * @return a new connection to the test cluster URL
     * @throws SQLException if the connection cannot be opened
     */
    protected Connection newConnection() throws SQLException {
        return newConnection(null);
    }

    /**
     * Opens a connection for {@code tenantId} starting from an empty property set.
     *
     * @param tenantId tenant to connect as, or {@code null} for none
     * @return a new connection to the test cluster URL
     * @throws SQLException if the connection cannot be opened
     */
    protected Connection newConnection(String tenantId) throws SQLException {
        Properties freshProps = new Properties();
        return newConnection(tenantId, freshProps);
    }

    /**
     * Opens a connection with the given properties. When {@code tenantId} is non-null it is
     * written into {@code props} under {@code "TenantId"} first, so the caller's object is
     * modified.
     *
     * @param tenantId tenant to connect as, or {@code null} for none
     * @param props connection properties; updated in place when a tenant id is given
     * @return a new connection to the test cluster URL
     * @throws SQLException if the connection cannot be opened
     */
    protected Connection newConnection(String tenantId, Properties props) throws SQLException {
        boolean tenantSpecific = tenantId != null;
        if (tenantSpecific) {
            props.setProperty("TenantId", tenantId);
        }
        String url = CDCBaseIT.getUrl();
        return DriverManager.getConnection(url, props);
    }

    /**
     * Applies one change to {@code tableName} and returns the same {@code changeRow}.
     *
     * <p>A change whose type is {@code "delete"} becomes a {@code DELETE} keyed on the primary
     * key columns; any other change becomes an {@code UPSERT} of the primary key columns followed
     * by the changed columns. Before executing, the calendar and the injected clock are moved to
     * the change timestamp. Nothing is executed when {@code conn} is {@code null}. The statement
     * is executed but not committed here.
     *
     * @param conn connection to run the statement on, or {@code null} to skip execution
     * @param tableName table to modify
     * @param changeRow the change to apply
     * @return {@code changeRow}, unchanged
     * @throws SQLException if preparing or executing the statement fails
     */
    private ChangeRow addChange(Connection conn, String tableName, ChangeRow changeRow) throws SQLException {
        if (conn == null) {
            return changeRow;
        }
        Map<String, Object> pks = changeRow.pks;
        Map<String, Object> values = changeRow.change;
        long changeTS = changeRow.changeTS;
        // Compare by content; reference equality only works while both sides are the same
        // interned literal.
        boolean isDelete = "delete".equals(changeRow.getChangeType());

        String sql;
        if (isDelete) {
            String predicates = pks.keySet().stream()
                    .map(col -> col + " = ?")
                    .collect(Collectors.joining(" AND "));
            sql = "DELETE FROM " + tableName + " WHERE " + predicates;
        } else {
            String columnList = Stream.concat(pks.keySet().stream(), values.keySet().stream())
                    .collect(Collectors.joining(", "));
            String bindSql = Stream.generate(() -> "?")
                    .limit(pks.size() + values.size())
                    .collect(Collectors.joining(", "));
            sql = "UPSERT INTO " + tableName + " (" + columnList + ") VALUES (" + bindSql + ")";
        }

        this.cal.setTimeInMillis(changeTS);
        this.injectEdge.setValue(changeTS);
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            int bindCnt = 1;
            for (Object val : pks.values()) {
                stmt.setObject(bindCnt++, val);
            }
            if (!isDelete) {
                for (Object val : values.values()) {
                    stmt.setObject(bindCnt++, val);
                }
            }
            stmt.executeUpdate();
        }
        return changeRow;
    }

    /**
     * Builds {@code nBatches} batches of random mutations over {@code nRows} distinct random
     * primary keys.
     *
     * <p>For each batch every row is considered once and skipped when a random draw is odd. A
     * selected row becomes a delete (a change with a {@code null} change map) when a batch past
     * the midpoint is reached with no delete generated so far, or, for a previously mutated row,
     * with a one-in-five chance; otherwise it becomes an upsert of random data column values.
     * All changes of a batch share one timestamp, starting at {@code startTS} and advancing by
     * 100 per batch.
     *
     * @param tenantId tenant recorded on every generated change
     * @param startTS timestamp of the first batch
     * @param pkColumns primary key column names mapped to SQL type names
     * @param dataColumns data column names mapped to SQL type names
     * @param nRows number of distinct rows to generate keys for
     * @param nBatches number of batches to produce
     * @return the batches, in timestamp order
     */
    protected List<Set<ChangeRow>> generateMutations(String tenantId, long startTS, Map<String, String> pkColumns, Map<String, String> dataColumns, int nRows, int nBatches) {
        Random rand = new Random();

        // Keep drawing sample keys until nRows distinct ones have been collected.
        List<Map<String, Object>> rows = new ArrayList<>(nRows);
        Set<Map<String, Object>> rowSet = new HashSet<>(nRows);
        while (rows.size() < nRows) {
            Map<String, Object> row = this.generateSampleData(rand, pkColumns, false);
            if (rowSet.add(row)) {
                rows.add(row);
            }
        }

        List<Set<ChangeRow>> batches = new ArrayList<>(nBatches);
        Set<Map<String, Object>> mutatedRows = new HashSet<>(nRows);
        long batchTS = startTS;
        boolean gotDelete = false;
        for (int i = 0; i < nBatches; ++i) {
            Set<ChangeRow> batch = new TreeSet<>();
            for (int j = 0; j < nRows; ++j) {
                if (rand.nextInt(nRows) % 2 != 0) {
                    continue;
                }
                Map<String, Object> pks = rows.get(j);
                boolean isDelete;
                if (i > nBatches / 2 && !gotDelete) {
                    // Force a delete if none has been generated by the second half.
                    isDelete = true;
                } else {
                    isDelete = mutatedRows.contains(pks) && rand.nextInt(5) == 0;
                }
                ChangeRow changeRow;
                if (isDelete) {
                    changeRow = new ChangeRow(tenantId, batchTS, pks, null);
                    gotDelete = true;
                } else {
                    changeRow = new ChangeRow(tenantId, batchTS, pks, this.generateSampleData(rand, dataColumns, true));
                }
                batch.add(changeRow);
                mutatedRows.add(pks);
            }
            batches.add(batch);
            batchTS += 100L;
        }

        LOGGER.debug("----- DUMP Mutations -----");
        int bnr = 1;
        int mnr = 0;
        for (Set<ChangeRow> batch : batches) {
            for (ChangeRow change : batch) {
                LOGGER.debug("Mutation: " + ++mnr + " in batch: " + bnr + "  tenantId:" + change.tenantId + " changeTS: " + change.changeTS + " pks: " + change.pks + " change: " + change.change);
            }
            ++bnr;
        }
        LOGGER.debug("----------");
        return batches;
    }

    /**
     * Produces one sample value per column, using each column's SQL type name to pick the data
     * type.
     *
     * <p>CHAR, VARCHAR and binary types request a sample of length 5; CHAR samples are trimmed and
     * an empty string sample is replaced by {@code "a"}. When {@code nullOK} is set, a column is
     * given {@code null} with a one-in-five chance instead.
     *
     * @param rand random source for the null decision
     * @param columns column names mapped to SQL type names
     * @param nullOK whether {@code null} values may be generated
     * @return a new map from column name to sample value
     */
    private Map<String, Object> generateSampleData(Random rand, Map<String, String> columns, boolean nullOK) {
        Map<String, Object> sample = new HashMap<String, Object>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            String name = column.getKey();
            if (nullOK && rand.nextInt(5) == 0) {
                sample.put(name, null);
                continue;
            }
            PDataType type = PDataType.fromSqlTypeName(column.getValue());
            boolean fixedChar = type instanceof PChar;
            Object value;
            if (fixedChar || type instanceof PVarchar) {
                value = type.getSampleValue(Integer.valueOf(5));
                if (fixedChar) {
                    value = ((String) value).trim();
                }
                if (((String) value).length() == 0) {
                    value = "a";
                }
            } else if (type instanceof PBinaryBase) {
                value = type.getSampleValue(Integer.valueOf(5));
            } else {
                value = type.getSampleValue();
            }
            sample.put(name, value);
        }
        return sample;
    }

    /**
     * Applies the batches to {@code tableName} on one connection obtained from {@code committer},
     * committing after each batch, then resets the committer and dumps the data and index tables.
     * The manual clock is injected before any change is applied.
     *
     * @param committer supplies the connection and performs the commits
     * @param schemaName schema used when dumping the index table
     * @param tableName table the changes are applied to
     * @param datatableName data table name used for the dump
     * @param tid tenant id passed to the committer when opening the connection
     * @param batches changes to apply, one commit per batch
     * @param cdcName CDC object name used to derive the index name for the dump
     * @throws Exception if applying, committing or dumping fails
     */
    protected void applyMutations(CommitAdapter committer, String schemaName, String tableName, String datatableName, String tid, List<Set<ChangeRow>> batches, String cdcName) throws Exception {
        EnvironmentEdgeManager.injectEdge((EnvironmentEdge) this.injectEdge);
        try (Connection tenantConn = committer.getConnection(tid)) {
            for (Set<ChangeRow> changesInBatch : batches) {
                for (ChangeRow pending : changesInBatch) {
                    addChange(tenantConn, tableName, pending);
                }
                committer.commit(tenantConn);
            }
        }
        committer.reset();
        dumpCells(schemaName, tableName, datatableName, cdcName);
    }

    /**
     * Dumps the data table and then the table backing the CDC index, framed by debug log lines.
     *
     * <p>The index table is the CDC index when {@code tableName} and {@code datatableName} are
     * equal, otherwise the view-index physical table of {@code datatableName}. A missing index
     * table is logged and otherwise ignored.
     *
     * @param schemaName schema used to qualify the index table name
     * @param tableName table the CDC object is defined on
     * @param datatableName underlying data table
     * @param cdcName CDC object name used to derive the index name
     * @throws Exception if dumping fails for a reason other than a missing index table
     */
    protected void dumpCells(String schemaName, String tableName, String datatableName, String cdcName) throws Exception {
        LOGGER.debug("----- DUMP data table: " + datatableName + " -----");
        SingleCellIndexIT.dumpTable(datatableName);
        String indexName = CDCUtil.getCDCIndexName(cdcName);
        // Compare by content: reference equality only held when callers passed the same instance.
        boolean sameTable = tableName == null ? datatableName == null : tableName.equals(datatableName);
        String indexTableName = SchemaUtil.getTableName(schemaName,
                sameTable ? indexName : MetaDataUtil.getViewIndexPhysicalName(datatableName));
        LOGGER.debug("----- DUMP index table: " + indexTableName + " -----");
        try {
            SingleCellIndexIT.dumpTable(indexTableName);
        } catch (TableNotFoundException e) {
            // Best effort: the dump is diagnostic only, so record the miss and carry on.
            LOGGER.debug("Index table " + indexTableName + " not found, skipping dump", e);
        }
        LOGGER.debug("----------");
    }

    /**
     * Runs {@code cdcQuery} and writes each returned row to the debug log: a 1-based row number,
     * the timestamp from column 1, the primary key columns, and the JSON string read from column
     * {@code pkColumns.size() + 2}.
     *
     * @param conn connection used to run the query
     * @param cdcName name printed in the dump header
     * @param pkColumns primary key columns to print, keyed by column name
     * @param cdcQuery query to execute
     * @throws Exception if the query or reading a row fails
     */
    protected void dumpCDCResults(Connection conn, String cdcName, Map<String, String> pkColumns, String cdcQuery) throws Exception {
        try (Statement statement = conn.createStatement();
             ResultSet cdcRows = statement.executeQuery(cdcQuery)) {
            LOGGER.debug("----- DUMP CDC: " + cdcName + " -----");
            for (int rowNumber = 1; cdcRows.next(); rowNumber++) {
                long timestamp = cdcRows.getDate(1).getTime();
                String keyColumns = CDCBaseIT.collectColumns(pkColumns, cdcRows);
                String cdcJson = cdcRows.getString(pkColumns.size() + 2);
                LOGGER.debug("CDC row: " + rowNumber + " timestamp=" + timestamp + " " + keyColumns + ", CDC JSON=" + cdcJson);
            }
            LOGGER.debug("----------");
        }
    }

    /**
     * Formats the current row's primary key columns as {@code name=value} pairs joined by
     * {@code ", "}.
     *
     * @param pkColumns primary key columns; only the keys (column names) are used
     * @param rs result set positioned on the row to read
     * @return the joined {@code name=value} pairs
     * @throws RuntimeException wrapping any {@link SQLException} raised while reading a column
     */
    private static String collectColumns(Map<String, String> pkColumns, ResultSet rs) {
        List<String> pairs = new ArrayList<String>();
        for (String columnName : pkColumns.keySet()) {
            Object columnValue;
            try {
                columnValue = rs.getObject(columnName);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
            pairs.add(columnName + "=" + columnValue);
        }
        return String.join(", ", pairs);
    }

    /**
     * Builds a {@code CREATE TABLE} statement from column maps and executes it with the given
     * table options.
     *
     * <p>Primary key columns are declared {@code NOT NULL}. Data columns of type {@code CHAR} or
     * {@code BINARY} get a length of 5. For a multi-tenant table a leading
     * {@code TENANT_ID CHAR(5) NOT NULL} column is added and placed first in the primary key.
     *
     * @param conn connection used to run the DDL
     * @param tableName name of the table to create
     * @param pkColumns primary key column names mapped to SQL type names
     * @param dataColumns data column names mapped to SQL type names
     * @param multitenant whether to create a multi-tenant table
     * @param encodingScheme qualifier encoding scheme, or {@code null}
     * @param tableSaltBuckets salt bucket count, or {@code null}
     * @param immutable whether the table is immutable
     * @param immutableStorageScheme immutable storage scheme, or {@code null}
     * @throws Exception if the statement fails
     */
    protected void createTable(Connection conn, String tableName, Map<String, String> pkColumns, Map<String, String> dataColumns, boolean multitenant, PTable.QualifierEncodingScheme encodingScheme, Integer tableSaltBuckets, boolean immutable, PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception {
        List<String> keyNames = new ArrayList<String>();
        if (multitenant) {
            keyNames.add("TENANT_ID");
        }
        keyNames.addAll(pkColumns.keySet());
        String pkConstraintSql = "CONSTRAINT PK PRIMARY KEY (" + String.join(", ", keyNames) + ")";

        List<String> keyDefinitions = new ArrayList<String>();
        for (Map.Entry<String, String> keyColumn : pkColumns.entrySet()) {
            keyDefinitions.add(keyColumn.getKey() + " " + keyColumn.getValue() + " NOT NULL");
        }
        String pkColSql = String.join(", ", keyDefinitions);

        List<String> dataDefinitions = new ArrayList<String>();
        for (Map.Entry<String, String> dataColumn : dataColumns.entrySet()) {
            String sqlType = dataColumn.getValue();
            boolean needsLength = sqlType.equals("CHAR") || sqlType.equals("BINARY");
            dataDefinitions.add(dataColumn.getKey() + " " + (needsLength ? sqlType + "(5)" : sqlType));
        }
        String dataColSql = String.join(", ", dataDefinitions);

        String tenantColumn = multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "";
        String tableSql = "CREATE TABLE " + tableName + " (" + tenantColumn + pkColSql + ", " + dataColSql + ", " + pkConstraintSql + ")";
        createTable(conn, tableSql, encodingScheme, multitenant, tableSaltBuckets, immutable, immutableStorageScheme);
    }

    /**
     * Generates the fixed change sequence with {@code "v3"} as the column to drop and a start key
     * of 0.
     *
     * @param startTS timestamp of the first change
     * @param tenantids tenants to generate changes for
     * @param tableName table the changes are applied to
     * @param datatableNameForDDL data table for the DROP COLUMN step, or {@code null} to skip it
     * @param committer supplies connections and performs the commits
     * @return the changes in the order they were applied
     */
    protected List<ChangeRow> generateChanges(long startTS, String[] tenantids, String tableName, String datatableNameForDDL, CommitAdapter committer) throws Exception {
        final String defaultColumnToDrop = "v3";
        final int defaultStartKey = 0;
        return generateChanges(startTS, tenantids, tableName, datatableNameForDDL, committer, defaultColumnToDrop, defaultStartKey);
    }

    /**
     * Applies a fixed sequence of upserts and deletes to three rows (keys {@code startKey + 1},
     * {@code + 2} and {@code + 3} in column {@code K}) for every tenant and returns the changes
     * in the order they were applied.
     *
     * <p>The first two changes share {@code startTS}; each later change is stamped 100 after the
     * previous one, and the timestamp keeps advancing from one tenant to the next. After the
     * first tenant's initial changes, {@code columnToDrop} is dropped from
     * {@code datatableNameForDDL} once, provided both are non-null. The committer is initialised
     * before the first change and reset afterwards, including when applying a change fails.
     *
     * @param startTS timestamp of the first change
     * @param tenantids tenants to generate changes for
     * @param tableName table the changes are applied to
     * @param datatableNameForDDL data table for the DROP COLUMN step, or {@code null} to skip it
     * @param committer supplies the first connection per tenant and performs the commits
     * @param columnToDrop column to drop, or {@code null} to skip the DROP COLUMN step
     * @param startKey offset added to the three row keys
     * @return the changes in the order they were applied
     * @throws Exception if applying a change, committing or the DDL fails
     */
    protected List<ChangeRow> generateChanges(long startTS, String[] tenantids, String tableName, String datatableNameForDDL, CommitAdapter committer, String columnToDrop, final int startKey) throws Exception {
        List<ChangeRow> changes = new ArrayList<>();
        EnvironmentEdgeManager.injectEdge((EnvironmentEdge) this.injectEdge);
        this.injectEdge.setValue(startTS);
        boolean dropColumnDone = false;
        committer.init();

        Map<String, Object> rowid1 = new HashMap<>();
        rowid1.put("K", startKey + 1);
        Map<String, Object> rowid2 = new HashMap<>();
        rowid2.put("K", startKey + 2);
        Map<String, Object> rowid3 = new HashMap<>();
        rowid3.put("K", startKey + 3);

        try {
            for (String tid : tenantids) {
                try (Connection conn = committer.getConnection(tid)) {
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1,
                            changeImage("V1", 100L, "V2", 1000L, "B.VB", 10000L))));
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid2,
                            changeImage("V1", 200L, "V2", 2000L))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid3,
                            changeImage("V1", 300L, "V2", null, "B.VB", null))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1,
                            changeImage("V1", 101L))));
                    committer.commit(conn);
                }

                if (datatableNameForDDL != null && !dropColumnDone && columnToDrop != null) {
                    // Drop the column the caller asked for rather than a hard-coded name.
                    try (Connection ddlConn = this.newConnection();
                         Statement ddl = ddlConn.createStatement()) {
                        ddl.execute("ALTER TABLE " + datatableNameForDDL + " DROP COLUMN " + columnToDrop);
                    }
                    dropColumnDone = true;
                }

                try (Connection conn = this.newConnection(tid)) {
                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, null)));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, null)));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1,
                            changeImage("V1", 102L, "V2", 1002L))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, null)));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid2,
                            changeImage("V1", 201L, "V2", null, "B.VB", 20001L))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1,
                            changeImage("V1", 103L, "V2", 1003L))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, null)));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1,
                            changeImage("V1", 104L, "V2", 1004L))));
                    committer.commit(conn);

                    startTS += 100L;
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, null)));
                    committer.commit(conn);
                }
            }
        } finally {
            // Always undo whatever init() switched on, even when a change failed.
            committer.reset();
        }

        for (int i = 0; i < changes.size(); ++i) {
            ChangeRow change = changes.get(i);
            LOGGER.debug("----- generated change: " + i + " tenantId:" + change.tenantId + " changeTS: " + change.changeTS + " pks: " + change.pks + " change: " + change.change);
        }
        return changes;
    }

    /** Builds a key-sorted column-to-value map from alternating name/value arguments. */
    private static Map<String, Object> changeImage(Object... namesAndValues) {
        Map<String, Object> image = new TreeMap<>();
        for (int i = 0; i < namesAndValues.length; i += 2) {
            image.put((String) namesAndValues[i], namesAndValues[i + 1]);
        }
        return image;
    }

    /**
     * Queries the CDC object for the change images whose row timestamp lies in
     * {@code [startTS, endTS]} and verifies the rows against the expected changes in that same
     * window. The query results are also dumped to the debug log first.
     *
     * @param tenantId tenant whose changes are verified, or {@code null} for all
     * @param conn connection used to run the CDC query
     * @param cdcFullName full name of the CDC object
     * @param pkColumns primary key columns (currently not used, see the note in the body)
     * @param dataTableName data table name passed on to the row-level verification
     * @param dataColumns data column names mapped to SQL type names
     * @param changes all expected changes; filtered to the timestamp window here
     * @param startTS inclusive lower bound of the window
     * @param endTS inclusive upper bound of the window
     * @throws Exception if the query or the verification fails
     */
    protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName, Map<String, String> pkColumns, String dataTableName, Map<String, String> dataColumns, List<ChangeRow> changes, long startTS, long endTS) throws Exception {
        List<ChangeRow> filteredChanges = new ArrayList<>();
        for (ChangeRow change : changes) {
            if (change.changeTS >= startTS && change.changeTS <= endTS) {
                filteredChanges.add(change);
            }
        }
        String cdcSql = "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROm " + cdcFullName + " WHERE  PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(" + startTS + " AS BIGINT) AS TIMESTAMP) AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(" + endTS + " AS BIGINT) AS TIMESTAMP)";

        // NOTE(review): the dump uses a fixed single K1 key column instead of pkColumns;
        // kept as-is, confirm whether pkColumns was intended here.
        Map<String, String> dumpPkColumns = new TreeMap<>();
        dumpPkColumns.put("K1", "INTEGER");
        this.dumpCDCResults(conn, cdcFullName, dumpPkColumns, cdcSql);

        // Own the Statement explicitly so it is closed together with its ResultSet.
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(cdcSql)) {
            this.verifyChangesViaSCN(tenantId, rs, dataTableName, dataColumns, filteredChanges, CHANGE_IMG);
        }
    }

    /*
     * WARNING - void declaration
     */
    protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName, Map<String, String> dataColumns, List<ChangeRow> changes, Set<PTable.CDCChangeScope> changeScopes) throws Exception {
        HashMap changeMap = new HashMap();
        HashSet<Map<String, Object>> deletedRows = new HashSet<Map<String, Object>>();
        for (ChangeRow changeRow : changes) {
            void var11_16;
            if (tenantId != null && changeRow.getTenantID() != tenantId) continue;
            if (changeRow.getChangeType() == "delete") {
                if (deletedRows.contains(changeRow.pks)) continue;
                deletedRows.add(changeRow.pks);
            } else {
                deletedRows.remove(changeRow.pks);
            }
            List list = (List)changeMap.get(changeRow.pks);
            if (list == null) {
                ArrayList arrayList = new ArrayList();
                changeMap.put(changeRow.pks, arrayList);
            }
            var11_16.add(changeRow);
        }
        while (rs.next()) {
            HashMap<String, Object> pks = new HashMap<String, Object>();
            for (Map.Entry<String, Object> entry : changes.get((int)0).pks.entrySet()) {
                pks.put(entry.getKey(), rs.getObject(entry.getKey()));
            }
            ChangeRow changeRow = (ChangeRow)((List)changeMap.get(pks)).remove(0);
            String string = "Change: " + changeRow;
            for (Map.Entry<String, Object> pkCol : changeRow.pks.entrySet()) {
                if (pkCol.getValue().equals(rs.getObject(pkCol.getKey()))) continue;
                Assert.assertEquals((String)string, (Object)pkCol.getValue(), (Object)rs.getObject(pkCol.getKey()));
            }
            Map cdcObj = (Map)mapper.reader(HashMap.class).readValue(rs.getString(changeRow.pks.size() + 2));
            if (!changeRow.getChangeType().equals(cdcObj.get("event_type"))) {
                Assert.assertEquals((String)string, (Object)changeRow.getChangeType(), cdcObj.get("event_type"));
            }
            if (cdcObj.containsKey("pre_image") && !((Map)cdcObj.get("pre_image")).isEmpty() && changeScopes.contains(PTable.CDCChangeScope.PRE)) {
                Map<String, Object> preImage = this.getRowImage(string, tenantId, dataTableName, dataColumns, changeRow, changeRow.changeTS);
                Assert.assertEquals((String)string, preImage, this.fillInNulls((Map)cdcObj.get("pre_image"), dataColumns.keySet()));
            }
            if (changeScopes.contains(PTable.CDCChangeScope.CHANGE)) {
                Assert.assertEquals((String)string, this.encodeValues(this.fillInNulls(changeRow.change, dataColumns.keySet()), dataColumns), this.fillInNulls((Map)cdcObj.get("change_image"), dataColumns.keySet()));
            }
            if (changeRow.getChangeType() == "delete" || !changeScopes.contains(PTable.CDCChangeScope.POST)) continue;
            Map<String, Object> postImage = this.getRowImage(string, tenantId, dataTableName, dataColumns, changeRow, changeRow.changeTS + 1L);
            Assert.assertEquals((String)string, postImage, this.fillInNulls((Map)cdcObj.get("post_image"), dataColumns.keySet()));
        }
        for (List list : changeMap.values()) {
            Assert.assertTrue((boolean)list.isEmpty());
        }
    }

    /**
     * Reads the data columns of the row identified by {@code changeRow.pks} from the data table
     * over a connection whose {@code CurrentSCN} property is {@code scnTimestamp}, and returns
     * them JSON-encoded, keyed by the names used in {@code dataColumns}.
     *
     * <p>Column names are projected without their prefix up to the last dot. The row must exist;
     * otherwise the assertion fails with {@code changeDesc} as message.
     *
     * @param changeDesc message used when the row is not found
     * @param tenantId tenant to connect as, or {@code null}
     * @param dataTableName table to read from
     * @param dataColumns data column names mapped to SQL type names
     * @param changeRow change whose primary key identifies the row
     * @param scnTimestamp value for the {@code CurrentSCN} connection property
     * @return the row image, one entry per data column
     * @throws Exception if the connection or the query fails
     */
    protected Map<String, Object> getRowImage(String changeDesc, String tenantId, String dataTableName, Map<String, String> dataColumns, ChangeRow changeRow, long scnTimestamp) throws Exception {
        Map<String, Object> image = new HashMap<>();
        Properties props = new Properties();
        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
        // Map each declared column name to the unqualified name used in the SELECT list.
        Map<String, String> projections = dataColumns.keySet().stream()
                .collect(Collectors.toMap(s -> s, s -> s.replaceFirst(".*\\.", "")));
        String projection = String.join(", ", projections.values());
        String predicates = changeRow.pks.keySet().stream()
                .map(col -> col + " = ?")
                .collect(Collectors.joining(" AND "));
        String sql = "SELECT " + projection + " FROM " + dataTableName + " WHERE " + predicates;

        try (Connection conn = this.newConnection(tenantId, props);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            int bindCnt = 1;
            for (Object val : changeRow.pks.values()) {
                stmt.setObject(bindCnt++, val);
            }
            // The statement and result set are released explicitly rather than left to the
            // connection.
            try (ResultSet rs = stmt.executeQuery()) {
                Assert.assertTrue(changeDesc, rs.next());
                for (Map.Entry<String, String> column : projections.entrySet()) {
                    String colName = column.getKey();
                    PDataType dt = PDataType.fromSqlTypeName(dataColumns.get(colName));
                    image.put(colName, this.getJsonEncodedValue(rs.getObject(column.getValue()), dt));
                }
            }
        }
        return image;
    }

    /**
     * Converts a column value into the form used when comparing against decoded CDC JSON.
     *
     * <p>{@code Byte}, {@code Short} and {@code Integer} are widened to {@code Long}, and
     * {@code Float} to {@code Double}. Every other value (including {@code null}) is delegated
     * to {@code CDCUtil.getColumnEncodedValue}.
     *
     * @param val the raw column value, may be {@code null}
     * @param dt  the column's Phoenix data type, used only for delegated values
     * @return the normalized value
     */
    private Object getJsonEncodedValue(Object val, PDataType dt) {
        if (val instanceof Byte || val instanceof Short || val instanceof Integer) {
            return Long.valueOf(((Number) val).longValue());
        }
        if (val instanceof Float) {
            return Double.valueOf(((Number) val).doubleValue());
        }
        return CDCUtil.getColumnEncodedValue(val, dt);
    }

    /**
     * Returns a copy of {@code image} that has an entry for every name in {@code dataCols};
     * names missing from the image are added with a {@code null} value. The input map is not
     * modified.
     *
     * @param image    the row image, may be {@code null}
     * @param dataCols the column names that must be present as keys
     * @return the padded copy, or {@code null} when {@code image} is {@code null}
     */
    private Map<String, Object> fillInNulls(Map<String, Object> image, Collection<String> dataCols) {
        if (image == null) {
            return null;
        }
        Map<String, Object> padded = new HashMap<>(image);
        for (String column : dataCols) {
            // Leaves existing entries untouched; inserts an explicit null for absent keys.
            padded.putIfAbsent(column, null);
        }
        return padded;
    }

    /**
     * Returns a copy of {@code image} in which each value whose key appears in
     * {@code dataColumns} has been passed through {@code getJsonEncodedValue} with the
     * column's data type. Keys not listed in {@code dataColumns} are copied unchanged. The
     * input map is not modified.
     *
     * @param image       the row image, may be {@code null}
     * @param dataColumns column name to SQL type name
     * @return the encoded copy, or {@code null} when {@code image} is {@code null}
     */
    private Map<String, Object> encodeValues(Map<String, Object> image, Map<String, String> dataColumns) {
        if (image == null) {
            return null;
        }
        Map<String, Object> encoded = new HashMap<>(image);
        dataColumns.forEach((columnName, sqlTypeName) -> {
            if (encoded.containsKey(columnName)) {
                PDataType dataType = PDataType.fromSqlTypeName(sqlTypeName);
                encoded.put(columnName, this.getJsonEncodedValue(encoded.get(columnName), dataType));
            }
        });
        return encoded;
    }

    /**
     * Applies a fixed sequence of upserts and deletes to {@code tableName} for each tenant and
     * returns the changes in the order they were applied.
     *
     * <p>Per tenant the sequence is: upsert row 1, upsert row 2 (V1 only), upsert row 3 (V2
     * explicitly null), then four rounds of delete/upsert on row 1 ending with a delete. Each
     * change is committed through {@code committer} right after it is added. The change
     * timestamp starts at {@code startTS} and advances by 100 before every change except the
     * first one of each tenant, so the counter carries over from one tenant to the next.
     *
     * <p>The injected clock edge is installed for the duration of the writes.
     * {@code committer.reset()} runs in a {@code finally} block, so a failing upsert or commit
     * does not skip it.
     *
     * @param startTS       timestamp of the first change
     * @param tenantids     tenant ids to generate changes for
     * @param schemaName    passed to {@code dumpCells}
     * @param tableName     table (or view) the changes are written through
     * @param datatableName passed to {@code dumpCells}
     * @param committer     commit strategy; its {@code init}/{@code reset} bracket the writes
     * @param cdcName       passed to {@code dumpCells}
     * @return the generated changes, in application order
     * @throws Exception if any write, commit or cell dump fails
     */
    protected List<ChangeRow> generateChangesImmutableTable(long startTS, String[] tenantids, String schemaName, String tableName, String datatableName, CommitAdapter committer, String cdcName) throws Exception {
        List<ChangeRow> changes = new ArrayList<ChangeRow>();
        EnvironmentEdgeManager.injectEdge((EnvironmentEdge)this.injectEdge);
        this.injectEdge.setValue(startTS);
        committer.init();
        try {
            Map<String, Object> rowid1 = new HashMap<String, Object>();
            rowid1.put("K", 1);
            Map<String, Object> rowid2 = new HashMap<String, Object>();
            rowid2.put("K", 2);
            Map<String, Object> rowid3 = new HashMap<String, Object>();
            rowid3.put("K", 3);
            for (String tid : tenantids) {
                try (Connection conn = this.newConnection(tid)) {
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS, rowid1, newChangeImage("V1", 100L, "V2", 1000L))));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid2, newChangeImage("V1", 200L))));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid3, newChangeImage("V1", 300L, "V2", null))));
                    committer.commit(conn);
                    // A null change map marks a delete (see ChangeRow.getChangeType()).
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, null)));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, newChangeImage("V1", 102L, "V2", 1002L))));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, null)));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, newChangeImage("V1", 103L, "V2", 1003L))));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, null)));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, newChangeImage("V1", 104L, "V2", 1004L))));
                    committer.commit(conn);
                    changes.add(this.addChange(conn, tableName, new ChangeRow(tid, startTS += 100L, rowid1, null)));
                    committer.commit(conn);
                }
            }
        } finally {
            // Always undo the clock injection, even when a write or commit failed.
            committer.reset();
        }
        this.dumpCells(schemaName, tableName, datatableName, cdcName);
        for (int i = 0; i < changes.size(); ++i) {
            ChangeRow change = changes.get(i);
            LOGGER.debug("----- generated change: " + i + " tenantId:" + change.tenantId + " changeTS: " + change.changeTS + " pks: " + change.pks + " change: " + change.change);
        }
        return changes;
    }

    /**
     * Builds a sorted change image from alternating column-name/value arguments.
     *
     * @param columnValues alternating {@code String} column name and value; values may be null
     * @return a new {@code TreeMap} holding the given pairs
     */
    private static Map<String, Object> newChangeImage(Object... columnValues) {
        Map<String, Object> image = new TreeMap<String, Object>();
        for (int i = 0; i + 1 < columnValues.length; i += 2) {
            image.put((String)columnValues[i], columnValues[i + 1]);
        }
        return image;
    }

    /**
     * Builds the CDC stream name for a table/CDC pair by formatting
     * {@code CDCUtil.CDC_STREAM_NAME_FORMAT} with the table name, the CDC name, the CDC creation
     * timestamp, and that timestamp rendered by {@code CDCUtil.getCDCCreationUTCDateTime}.
     *
     * @param conn      connection that can be unwrapped to a {@code PhoenixConnection}
     * @param tableName table whose metadata is loaded with {@code getTableNoCache}
     * @param cdcName   name of the CDC object
     * @return the formatted stream name
     * @throws SQLException if unwrapping the connection or loading the table fails
     */
    public String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
        PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
        PTable table = phoenixConnection.getTableNoCache(tableName);
        long creationTS = CDCUtil.getCDCCreationTimestamp(table);
        return String.format(
                CDCUtil.CDC_STREAM_NAME_FORMAT,
                tableName,
                cdcName,
                creationTS,
                CDCUtil.getCDCCreationUTCDateTime(creationTS));
    }

    /**
     * Looks up the {@code STREAM_STATUS} recorded for a table/stream pair in the table named by
     * {@code PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME}.
     *
     * <p>The table and stream names are bound as statement parameters rather than concatenated
     * into the SQL text. The statement and result set are closed before returning. The method
     * asserts that a matching row exists.
     *
     * @param conn       connection used to run the query
     * @param tableName  value matched against {@code TABLE_NAME}
     * @param streamName value matched against {@code STREAM_NAME}
     * @return the first column of the first matching row
     * @throws Exception if the query fails
     */
    public String getStreamStatus(Connection conn, String tableName, String streamName) throws Exception {
        String sql = "SELECT STREAM_STATUS FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ? AND STREAM_NAME = ?";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, tableName);
            stmt.setString(2, streamName);
            try (ResultSet rs = stmt.executeQuery()) {
                Assert.assertTrue(rs.next());
                return rs.getString(1);
            }
        }
    }

    /**
     * Creates a three-column table ({@code k}, {@code v1}, {@code v2}), creates a CDC object
     * with a generated unique name on it, and then either runs a
     * {@code TaskRegionObserver.SelfHealingTask} once or polls the stream status once per
     * second until it reads {@code ENABLED}.
     *
     * @param conn                  connection used for the DDL and the status queries
     * @param tableName             name of the table to create
     * @param useTaskRegionObserver {@code true} to run the self-healing task synchronously;
     *                              {@code false} to poll for the {@code ENABLED} status
     * @throws Exception if the DDL, the task or a status query fails
     */
    public void createTableAndEnableCDC(Connection conn, String tableName, boolean useTaskRegionObserver) throws Exception {
        String cdcName = CDCBaseIT.generateUniqueName();
        String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
        // Close the DDL statement instead of leaving it open on the caller's connection.
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE  " + tableName + " (k VARCHAR PRIMARY KEY, v1 INTEGER, v2 VARCHAR)");
        }
        this.createCDC(conn, cdcSql, null);
        String streamName = this.getStreamName(conn, tableName, cdcName);
        if (useTaskRegionObserver) {
            TaskRegionObserver.SelfHealingTask task = new TaskRegionObserver.SelfHealingTask(taskRegionEnvironment, 1800000L);
            task.run();
        } else {
            // NOTE(review): this loop has no upper bound; it relies on the stream eventually
            // reaching ENABLED (or on an outer test timeout).
            String enabled = CDCUtil.CdcStreamStatus.ENABLED.toString();
            while (!enabled.equals(this.getStreamStatus(conn, tableName, streamName))) {
                Thread.sleep(1000L);
            }
        }
    }

    // One-time setup of the shared JSON mapper and the change-scope sets used by the tests.
    static {
        // Integral JSON numbers are deserialized as Long.
        mapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true);

        // PhoenixArray values are written through the custom serializer declared in this class.
        SimpleModule changeRowModule = new SimpleModule("ChangeRow", new Version(1, 0, 0, null, null, null));
        changeRowModule.addSerializer(PhoenixArray.class, new PhoenixArraySerializer(PhoenixArray.class));
        mapper.registerModule(changeRowModule);

        CHANGE_IMG = new HashSet<>(Arrays.asList(PTable.CDCChangeScope.CHANGE));
        PRE_POST_IMG = new HashSet<>(Arrays.asList(
                PTable.CDCChangeScope.PRE,
                PTable.CDCChangeScope.POST));
        ALL_IMG = new HashSet<>(Arrays.asList(
                PTable.CDCChangeScope.CHANGE,
                PTable.CDCChangeScope.PRE,
                PTable.CDCChangeScope.POST));
    }

    /**
     * Strategy for committing changes during change generation. Subclasses supply
     * {@link #commit(Connection)}; the default {@link #init()} and {@link #reset()} install
     * and remove the enclosing test's injected clock edge.
     */
    protected abstract class CommitAdapter {
        protected CommitAdapter() {
        }

        /** Installs the enclosing test's {@code injectEdge} into {@code EnvironmentEdgeManager}. */
        void init() {
            EnvironmentEdgeManager.injectEdge(injectEdge);
        }

        /**
         * Commits the pending changes on the given connection.
         *
         * @param conn the connection holding the changes
         * @throws SQLException if the commit fails
         */
        abstract void commit(Connection conn) throws SQLException;

        /** Calls {@code EnvironmentEdgeManager.reset()}. */
        public void reset() {
            EnvironmentEdgeManager.reset();
        }

        /**
         * Opens a connection for the given tenant through the enclosing test's
         * {@code newConnection}.
         *
         * @param tid tenant id
         * @return the new connection
         * @throws SQLException if the connection cannot be opened
         */
        public Connection getConnection(String tid) throws SQLException {
            return newConnection(tid);
        }
    }

    /**
     * One expected change: the tenant, the change timestamp, the primary-key values of the
     * affected row and the changed column values. A {@code null} change map denotes a delete.
     *
     * <p>Auto-detection is switched off, so only the four {@code @JsonProperty} fields are
     * serialized by {@link #toString()}.
     */
    @JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.NONE, setterVisibility=JsonAutoDetect.Visibility.NONE, getterVisibility=JsonAutoDetect.Visibility.NONE, isGetterVisibility=JsonAutoDetect.Visibility.NONE, creatorVisibility=JsonAutoDetect.Visibility.NONE)
    protected class ChangeRow
    implements Comparable<ChangeRow> {
        @JsonProperty
        protected final String tenantId;
        @JsonProperty
        protected final long changeTS;
        @JsonProperty
        protected final Map<String, Object> pks;
        @JsonProperty
        protected final Map<String, Object> change;

        ChangeRow(String tenantid, long changeTS, Map<String, Object> pks, Map<String, Object> change) {
            this.tenantId = tenantid;
            this.changeTS = changeTS;
            this.pks = pks;
            this.change = change;
        }

        public String getTenantID() {
            return tenantId;
        }

        public long getTimestamp() {
            return changeTS;
        }

        /** Returns {@code "delete"} when there is no change map, {@code "upsert"} otherwise. */
        public String getChangeType() {
            if (change == null) {
                return "delete";
            }
            return "upsert";
        }

        /** Renders this row as JSON using the shared mapper. */
        @Override
        public String toString() {
            try {
                return mapper.writerFor(ChangeRow.class).writeValueAsString(this);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Orders rows by primary-key values only, column by column in this row's key iteration
         * order. {@code byte[]} values use {@code Bytes.compareTo}; all others are compared as
         * {@code Comparable}.
         *
         * @throws RuntimeException if the two rows do not have the same set of key columns
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public int compareTo(ChangeRow o) {
            boolean sameKeyColumns = pks.size() == o.pks.size() && o.pks.keySet().containsAll(pks.keySet());
            if (!sameKeyColumns) {
                throw new RuntimeException("Incompatible row for comparison: " + this.pks + " vs " + o.pks);
            }
            for (Map.Entry<String, Object> pk : pks.entrySet()) {
                Object mine = pk.getValue();
                Object theirs = o.pks.get(pk.getKey());
                int res;
                if (mine instanceof byte[]) {
                    res = Bytes.compareTo((byte[]) mine, (byte[]) theirs);
                } else {
                    res = ((Comparable) mine).compareTo(theirs);
                }
                if (res != 0) {
                    return res;
                }
            }
            return 0;
        }
    }

    public static class PhoenixArraySerializer
    extends StdSerializer<PhoenixArray> {
        protected PhoenixArraySerializer(Class<PhoenixArray> t) {
            super(t);
        }

        public void serialize(PhoenixArray value, JsonGenerator gen, SerializerProvider provider) throws IOException {
            gen.writeStartObject();
            gen.writeStringField("elements", value.toString());
            gen.writeEndObject();
        }
    }

    /**
     * Plain holder for one partition row, populated from columns 3 through 9 of the current
     * row of a {@code ResultSet}. Columns 1 and 2 are not read.
     *
     * <p>The {@code Long} fields are filled from {@code ResultSet.getLong}, so they are never
     * {@code null}; per the JDBC contract a SQL NULL is read as 0.
     */
    public static class PartitionMetadata {
        private static final int COL_PARTITION_ID = 3;
        private static final int COL_PARENT_PARTITION_ID = 4;
        private static final int COL_START_TIME = 5;
        private static final int COL_END_TIME = 6;
        private static final int COL_START_KEY = 7;
        private static final int COL_END_KEY = 8;
        private static final int COL_PARENT_START_TIME = 9;

        public String partitionId;
        public String parentPartitionId;
        public Long startTime;
        public Long endTime;
        public byte[] startKey;
        public byte[] endKey;
        public Long parentStartTime;

        /**
         * Reads the fields from the row the cursor is currently positioned on.
         *
         * @param rs result set positioned on a row
         * @throws Exception if any column read fails
         */
        public PartitionMetadata(ResultSet rs) throws Exception {
            partitionId = rs.getString(COL_PARTITION_ID);
            parentPartitionId = rs.getString(COL_PARENT_PARTITION_ID);
            startTime = Long.valueOf(rs.getLong(COL_START_TIME));
            endTime = Long.valueOf(rs.getLong(COL_END_TIME));
            startKey = rs.getBytes(COL_START_KEY);
            endKey = rs.getBytes(COL_END_KEY);
            parentStartTime = Long.valueOf(rs.getLong(COL_PARENT_START_TIME));
        }
    }
}

