/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={NeedsOwnMiniClusterTest.class})
@RunWith(value=Parameterized.class)
public class TableTTLIT
extends BaseTest {
    private static final Logger LOG = LoggerFactory.getLogger(TableTTLIT.class);
    // Fixed seed so the pseudo-random sequence drawn by the tests is the same on every run.
    private static final Random RAND = new Random(11L);
    // Highest value-column index; the tests address the value columns as val1..val7.
    private static final int MAX_COLUMN_INDEX = 7;
    // Max lookback age in seconds; matches the value doSetup() passes for
    // "phoenix.max.lookback.age.seconds".
    private static final int MAX_LOOKBACK_AGE = 10;
    // Value emitted as the TTL table option; the tests multiply it by 1000 when advancing
    // the injected clock, i.e. they treat it as seconds.
    private final int ttl;
    // DDL option string assembled in beforeTest() from the test parameters.
    private String tableDDLOptions;
    // Scratch builder used by beforeTest() to assemble tableDDLOptions.
    private StringBuilder optionBuilder;
    // Manually advanced clock; tests install it through EnvironmentEdgeManager.injectEdge().
    ManualEnvironmentEdge injectEdge;
    // Value emitted as the VERSIONS table option.
    private int versions;
    // NOTE(review): presumably selects a multi-column-family table layout; the table
    // creation code is not visible in this part of the file - confirm.
    private final boolean multiCF;
    // true emits COLUMN_ENCODED_BYTES=2, false emits COLUMN_ENCODED_BYTES=0.
    private final boolean columnEncoded;
    // Value emitted as the KEEP_DELETED_CELLS table option.
    private final KeepDeletedCells keepDeletedCells;
    // Table-level max lookback age in seconds, or null to fall back to the default of 10.
    private final Integer tableLevelMaxLookback;

    /**
     * Builds one parameterized instance of the test; the arguments are one row of {@link #data()}.
     *
     * @param multiCF presumably whether the table uses several column families (table creation
     *        is not shown in this part of the file)
     * @param columnEncoded {@code true} to create the table with COLUMN_ENCODED_BYTES=2,
     *        {@code false} for COLUMN_ENCODED_BYTES=0
     * @param keepDeletedCells value used for the KEEP_DELETED_CELLS table option
     * @param versions value used for the VERSIONS table option
     * @param ttl value used for the TTL table option
     * @param tableLevelMaxLookback table-level max lookback age in seconds, or {@code null} to
     *        use the default
     */
    public TableTTLIT(boolean multiCF, boolean columnEncoded, KeepDeletedCells keepDeletedCells, int versions, int ttl, Integer tableLevelMaxLookback) {
        // Retention-related settings.
        this.ttl = ttl;
        this.versions = versions;
        this.keepDeletedCells = keepDeletedCells;
        this.tableLevelMaxLookback = tableLevelMaxLookback;
        // Table layout settings.
        this.columnEncoded = columnEncoded;
        this.multiCF = multiCF;
    }

    /**
     * Starts the test driver once for the whole class with the server-side properties these
     * tests rely on.
     *
     * @throws Exception if the test driver cannot be set up
     */
    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        // Four entries are added below, so size the map for all of them.
        Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
        props.put("phoenix.global.index.row.age.threshold.to.delete.ms", Long.toString(0L));
        props.put("phoenix.max.lookback.age.seconds", Integer.toString(MAX_LOOKBACK_AGE));
        props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
        props.put("phoenix.view.ttl.enabled", Boolean.toString(false));
        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
    }

    /**
     * Resets the clock, rebuilds the DDL option string from the test parameters and prepares a
     * fresh manual clock positioned at the current time.
     */
    @Before
    public void beforeTest() {
        EnvironmentEdgeManager.reset();

        // Anything other than FALSE or TRUE is rendered as TTL.
        final String keepDeletedCellsOption = keepDeletedCells == KeepDeletedCells.FALSE
            ? ", KEEP_DELETED_CELLS=FALSE"
            : keepDeletedCells == KeepDeletedCells.TRUE
                ? ", KEEP_DELETED_CELLS=TRUE"
                : ", KEEP_DELETED_CELLS=TTL";
        final String columnEncodingOption =
            columnEncoded ? ", COLUMN_ENCODED_BYTES=2" : ", COLUMN_ENCODED_BYTES=0";

        optionBuilder = new StringBuilder()
            .append(" TTL=").append(ttl)
            .append(", VERSIONS=").append(versions)
            .append(keepDeletedCellsOption)
            .append(columnEncodingOption);
        tableDDLOptions = optionBuilder.toString();

        // The edge is only created here; each test decides when to inject it.
        final ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
        edge.setValue(EnvironmentEdgeManager.currentTimeMillis());
        injectEdge = edge;
    }

    /**
     * Checks for leaked store reference counts and restores the default clock.
     *
     * @throws Exception if the leak check itself fails
     */
    @After
    public synchronized void afterTest() throws Exception {
        // Sample the leak state before touching the clock, but report it only after the clock
        // has been restored.
        final boolean leaked = isAnyStoreRefCountLeaked();
        EnvironmentEdgeManager.reset();
        if (leaked) {
            Assert.fail("refCount leaked");
        }
    }

    @Parameterized.Parameters(name="TableTTLIT_multiCF={0}, columnEncoded={1}, keepDeletedCells={2}, versions={3}, ttl={4}, tableLevelMaxLookback={5}")
    public static synchronized Collection<Object[]> data() {
        return Arrays.asList({false, false, KeepDeletedCells.FALSE, 1, 100, null}, {false, false, KeepDeletedCells.TRUE, 5, 50, null}, {false, false, KeepDeletedCells.TTL, 1, 25, null}, {true, false, KeepDeletedCells.FALSE, 5, 50, null}, {true, false, KeepDeletedCells.TRUE, 1, 25, null}, {true, false, KeepDeletedCells.TTL, 5, 100, null}, {false, false, KeepDeletedCells.FALSE, 1, 100, 0}, {false, false, KeepDeletedCells.TRUE, 5, 50, 0}, {false, false, KeepDeletedCells.TTL, 1, 25, 15});
    }

    /**
     * Randomized test that applies the same updates and deletes to two tables, of which only the
     * first is flushed and major compacted, and periodically compares them via
     * {@code compareRow}, both at the current time and at earlier SCNs within the lookback
     * window.
     *
     * <p>Flushes, compactions, deletes, "masking" steps (advance the clock past TTL plus max
     * lookback and expect the row to disappear) and verifications are each triggered by their
     * own countdown, re-armed from {@code RAND} after every firing.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMaskingAndMajorCompaction() throws Exception {
        final int maxLookbackAge =
            tableLevelMaxLookback != null ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
        // Random.nextInt(bound) needs a positive bound, so a lookback of 0 is mapped to 1.
        final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
        final int maxCompactionCounter = ttl / 2;
        final int maxFlushCounter = ttl;
        final int maxMaskingCounter = 2 * ttl;
        final int maxVerificationCounter = 2 * ttl;
        final byte[] rowKey = Bytes.toBytes("a");
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String tableName = generateUniqueName();
            createTable(tableName);
            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
            conn.commit();
            // This table receives the same writes but is never flushed or compacted here.
            String noCompactTableName = generateUniqueName();
            createTable(noCompactTableName);
            conn.createStatement().execute("ALTER TABLE " + noCompactTableName + " set \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
            conn.commit();
            final TableName hbaseTableName = TableName.valueOf(tableName);

            // Start the manual clock on a whole-second boundary slightly in the future.
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(startTime);

            // The order of these RAND draws determines the whole run; do not reorder.
            int deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
            int compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
            int flushCounter = RAND.nextInt(maxFlushCounter) + 1;
            int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
            int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
            final int maxIterationCount = multiCF ? 250 : 500;
            for (int i = 0; i < maxIterationCount; ++i) {
                if (flushCounter-- == 0) {
                    injectEdge.incrementValue(1000L);
                    LOG.info("Flush " + i + " current time: " + injectEdge.currentTime());
                    flush(hbaseTableName);
                    flushCounter = RAND.nextInt(maxFlushCounter) + 1;
                }
                if (compactionCounter-- == 0) {
                    injectEdge.incrementValue(1000L);
                    LOG.info("Compaction " + i + " current time: " + injectEdge.currentTime());
                    flush(hbaseTableName);
                    majorCompact(hbaseTableName);
                    compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
                }
                if (maskingCounter-- == 0) {
                    updateRow(conn, tableName, noCompactTableName, "a");
                    // Jump past TTL plus max lookback so the row must no longer be visible.
                    injectEdge.incrementValue((long) ((ttl + maxLookbackAge + 1) * 1000));
                    LOG.info("Masking " + i + " current time: " + injectEdge.currentTime());
                    try (ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName)) {
                        Assert.assertTrue(rs.next());
                        Assert.assertEquals(0L, rs.getLong(1));
                    }
                    try (ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + noCompactTableName)) {
                        Assert.assertTrue(rs.next());
                        Assert.assertEquals(0L, rs.getLong(1));
                    }
                    flush(hbaseTableName);
                    majorCompact(hbaseTableName);
                    // After major compaction no raw cells may remain for the row.
                    TestUtil.assertRawCellCount(conn, hbaseTableName, rowKey, 0);
                    maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
                }
                if (deleteCounter-- == 0) {
                    LOG.info("Delete " + i + " current time: " + injectEdge.currentTime());
                    deleteRow(conn, tableName, "a");
                    deleteRow(conn, noCompactTableName, "a");
                    deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
                    injectEdge.incrementValue(1000L);
                }
                injectEdge.incrementValue(1000L);
                updateRow(conn, tableName, noCompactTableName, "a");
                if (verificationCounter-- > 0) {
                    continue;
                }
                verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
                compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);

                // Repeat the comparison at every whole second inside the lookback window,
                // newest first, never going back before the start of the test.
                final long scnEnd = injectEdge.currentTime();
                final long windowStart =
                    scnEnd - (long) Math.min(maxLookbackAge, MAX_LOOKBACK_AGE) * 1000L;
                final long scnStart = Math.max(windowStart, startTime);
                for (long scn = scnEnd; scn >= scnStart; scn -= 1000L) {
                    Properties props = new Properties();
                    props.setProperty("CurrentSCN", Long.toString(scn));
                    try (Connection scnConn = DriverManager.getConnection(url, props)) {
                        compareRow(scnConn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);
                    }
                }
            }
        }
    }

    /**
     * With table-level max lookback set to 0, writes many versions of one row across several
     * flushes, checks the raw cell count after every flush, then minor compacts and checks
     * that only {@code versions} row versions' worth of cells remain.
     *
     * <p>Runs only for parameter rows whose table-level max lookback is 0; otherwise it returns
     * immediately. On an assertion failure the table is dumped before the error is rethrown.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() throws Exception {
        if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) {
            return;
        }
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String tableName = generateUniqueName();
            createTable(tableName);
            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);
            final TableName hbaseTableName = TableName.valueOf(tableName);
            final int flushCount = 10;
            // The expected counts below use MAX_COLUMN_INDEX + 1 cells per row version.
            final int cellsPerRowVersion = MAX_COLUMN_INDEX + 1;
            final byte[] row = Bytes.toBytes("a");
            int rowUpdateCounter = 0;
            try {
                for (int i = 0; i < flushCount; ++i) {
                    // Always write at least `versions` updates between two flushes.
                    int updateCount = RAND.nextInt(10) + versions;
                    rowUpdateCounter += updateCount;
                    LOG.info(String.format("Iteration:%d uc:%d cntr:%d", i, updateCount, rowUpdateCounter));
                    for (int j = 0; j < updateCount; ++j) {
                        updateRow(conn, tableName, "a");
                        // Real clock is in use here; the pause separates consecutive updates.
                        Thread.sleep(1L);
                    }
                    flush(hbaseTableName);
                    Assert.assertEquals(rowUpdateCounter * cellsPerRowVersion,
                        TestUtil.getRawCellCount(conn, hbaseTableName, row));
                }
                TestUtil.minorCompact(utility, hbaseTableName);
                Assert.assertEquals(cellsPerRowVersion * versions,
                    TestUtil.getRawCellCount(conn, hbaseTableName, Bytes.toBytes("a")));
            }
            catch (AssertionError e) {
                // Dump the table contents to help diagnose the failure, then fail as before.
                TestUtil.dumpTable(conn, hbaseTableName);
                throw e;
            }
        }
    }

    /**
     * Writes the columns of a single row one at a time, advancing the clock by one second less
     * than the TTL between writes, then flushes and major compacts the first table and compares
     * the row against a second table that received the same writes but was not compacted.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testRowSpansMultipleTTLWindows() throws Exception {
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            final String compactedTable = generateUniqueName();
            createTable(compactedTable);
            final String referenceTable = generateUniqueName();
            createTable(referenceTable);

            // Manual clock starts on a whole-second boundary slightly in the future.
            final long clockStart = (System.currentTimeMillis() + 1000L) / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(clockStart);

            final long gapBetweenWrites = (long) (ttl * 1000 - 1000);
            int column = 1;
            while (column <= MAX_COLUMN_INDEX) {
                final String cellValue = Integer.toString(RAND.nextInt(1000));
                updateColumn(connection, compactedTable, "a", column, cellValue);
                updateColumn(connection, referenceTable, "a", column, cellValue);
                connection.commit();
                injectEdge.incrementValue(gapBetweenWrites);
                column++;
            }

            final TableName compactedHBaseTable = TableName.valueOf(compactedTable);
            flush(compactedHBaseTable);
            majorCompact(compactedHBaseTable);
            compareRow(connection, compactedTable, referenceTable, "a", MAX_COLUMN_INDEX);
            injectEdge.incrementValue(1000L);
        }
    }

    /**
     * Same write pattern as the multi-window row test, but with a CDC stream on the compacted
     * table: after the row has expired and the table has been major compacted, the newest CDC
     * event must be a {@code ttl_delete} whose pre-image equals the last post-image observed
     * while the row was being written.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testRowSpansMultipleTTLWindowsWithCdc() throws Exception {
        final int maxLookbackAge =
            tableLevelMaxLookback != null ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String schemaName = generateUniqueName();
            String tableName = schemaName + "." + generateUniqueName();
            String noCompactTableName = generateUniqueName();
            createTable(tableName);
            createTable(noCompactTableName);
            conn.createStatement().execute("ALTER TABLE " + tableName + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
            String cdcName = generateUniqueName();
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (PRE, POST)";
            conn.createStatement().execute(cdcSql);
            conn.commit();
            String cdcFullName = SchemaUtil.getTableName(null, schemaName + "." + cdcName);
            ObjectMapper mapper = new ObjectMapper();

            // Manual clock starts on a whole-second boundary slightly in the future.
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(startTime);

            // The query text does not change between iterations, so build it once.
            final String latestEventQuery = "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1";
            Map<?, ?> lastPostImage = null;
            for (int columnIndex = 1; columnIndex <= MAX_COLUMN_INDEX; ++columnIndex) {
                String value = Integer.toString(RAND.nextInt(1000));
                updateColumn(conn, tableName, "a", columnIndex, value);
                updateColumn(conn, noCompactTableName, "a", columnIndex, value);
                conn.commit();
                // Remember the post-image of the newest CDC event after every write.
                try (ResultSet rs = conn.createStatement().executeQuery(latestEventQuery)) {
                    if (rs.next()) {
                        Map<?, ?> cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
                        if (cdcEvent.containsKey("post_image")) {
                            lastPostImage = (Map<?, ?>) cdcEvent.get("post_image");
                        }
                    }
                }
                injectEdge.incrementValue((long) (ttl * 1000 - 1000));
            }
            Assert.assertNotNull("Last post-image should not be null", lastPostImage);

            // Move past TTL plus max lookback, then compact.
            injectEdge.incrementValue((long) ((ttl + maxLookbackAge + 1) * 1000));
            flush(TableName.valueOf(tableName));
            majorCompact(TableName.valueOf(tableName));

            String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 'a'";
            try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) {
                Assert.assertFalse("Row should be expired from data table", rs.next());
            }
            String cdcQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName + " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1";
            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
                Assert.assertTrue("Should find TTL delete event", rs.next());
                Map<?, ?> ttlDeleteEvent = mapper.readValue(rs.getString(1), HashMap.class);
                LOG.info("TTL delete event: {}", ttlDeleteEvent);
                Assert.assertEquals("Should be ttl_delete event", "ttl_delete", ttlDeleteEvent.get("event_type"));
                Map<?, ?> ttlPreImage = (Map<?, ?>) ttlDeleteEvent.get("pre_image");
                Assert.assertNotNull("TTL pre-image should not be null", ttlPreImage);
                Assert.assertEquals("TTL delete pre-image should match last post-image from normal CDC events", lastPostImage, ttlPreImage);
                Assert.assertFalse("No more event should be found", rs.next());
            }
            compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);
            injectEdge.incrementValue(1000L);
        }
    }

    /**
     * Writes three full rows, advances the clock past the TTL, then updates a single, different
     * column in each row. Each row must then expose only its freshly updated column, and all
     * three rows must still be counted after a flush and major compaction.
     *
     * <p>Runs only for parameter rows whose table-level max lookback is 0; otherwise it returns
     * immediately.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMultipleRowsWithUpdatesMoreThanTTLApart() throws Exception {
        if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) {
            return;
        }
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String tableName = generateUniqueName();
            createTable(tableName);
            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);
            // Manual clock starts on a whole-second boundary slightly in the future.
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(startTime);
            updateRow(conn, tableName, "a1");
            updateRow(conn, tableName, "a2");
            updateRow(conn, tableName, "a3");
            // Let the initial writes age beyond the TTL before touching the rows again.
            injectEdge.incrementValue((long) ((ttl + 1) * 1000));
            updateColumn(conn, tableName, "a1", 2, "col2");
            updateColumn(conn, tableName, "a2", 3, "col3");
            updateColumn(conn, tableName, "a3", 5, "col5");
            conn.commit();

            try (ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + tableName)) {
                while (rs.next()) {
                    String id = rs.getString(1);
                    int updatedColIndex = 0;
                    switch (id) {
                        case "a1":
                            updatedColIndex = 2;
                            break;
                        case "a2":
                            updatedColIndex = 3;
                            break;
                        case "a3":
                            updatedColIndex = 5;
                            break;
                        default:
                            Assert.fail(String.format("Got unexpected row key %s", id));
                    }
                    // Result column 1 is the id, so value column N is result column N + 1.
                    for (int colIndex = 1; colIndex <= MAX_COLUMN_INDEX; ++colIndex) {
                        if (colIndex == updatedColIndex) {
                            Assert.assertNotNull(rs.getString(colIndex + 1));
                        } else {
                            Assert.assertNull(rs.getString(colIndex + 1));
                        }
                    }
                }
            }

            flush(TableName.valueOf(tableName));
            majorCompact(TableName.valueOf(tableName));
            try (ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) from " + tableName)) {
                Assert.assertTrue(rs.next());
                Assert.assertEquals(3L, rs.getInt(1));
            }
        }
    }

    /**
     * Repeatedly updates random columns of one row with clock jumps in between, tracking the
     * values it wrote, and on every fifth iteration checks the row read back against the
     * tracked values before jumping the clock again and clearing the tracked values.
     *
     * <p>Runs only for parameter rows whose table-level max lookback is 0; otherwise it returns
     * immediately.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMaskingGapAnalysis() throws Exception {
        final boolean lookbackDisabled = tableLevelMaxLookback != null && tableLevelMaxLookback == 0;
        if (!lookbackDisabled) {
            return;
        }
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            final String table = generateUniqueName();
            createTable(table);
            connection.createStatement().execute("Alter Table " + table + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);

            // Manual clock starts on a whole-second boundary slightly in the future.
            final long clockStart = (System.currentTimeMillis() + 1000L) / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(clockStart);

            updateRow(connection, table, "a1");
            connection.commit();
            final String rowQuery = String.format("SELECT * from %s where id='%s'", table, "a1");

            // Slot 0 is unused so that slot N tracks column valN.
            String[] expected = new String[MAX_COLUMN_INDEX + 1];
            try (ResultSet initial = connection.createStatement().executeQuery(rowQuery)) {
                Assert.assertTrue(initial.next());
                for (int c = 1; c <= MAX_COLUMN_INDEX; ++c) {
                    expected[c] = initial.getString("val" + c);
                }
            }

            for (int round = 1; round <= 20; ++round) {
                // First write of the round, one millisecond after the previous activity.
                injectEdge.incrementValue(1L);
                int firstColumn = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
                String firstValue = Integer.toString(RAND.nextInt(1000));
                updateColumn(connection, table, "a1", firstColumn, firstValue);
                connection.commit();
                expected[firstColumn] = firstValue;

                // Second write of the round, one TTL later.
                injectEdge.incrementValue((long) (ttl * 1000));
                int secondColumn = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
                String secondValue = Integer.toString(RAND.nextInt(1000));
                updateColumn(connection, table, "a1", secondColumn, secondValue);
                connection.commit();
                expected[secondColumn] = secondValue;

                if (round % 5 == 0) {
                    try (ResultSet current = connection.createStatement().executeQuery(rowQuery)) {
                        Assert.assertTrue(current.next());
                        for (int c = 1; c <= MAX_COLUMN_INDEX; ++c) {
                            Assert.assertEquals(expected[c], current.getString("val" + c));
                        }
                    }
                    // Jump between one and two TTLs ahead and start tracking from scratch.
                    injectEdge.incrementValue((long) (ttl * 1000 + RAND.nextInt(ttl * 1000)));
                    expected = new String[MAX_COLUMN_INDEX + 1];
                }
            }
        }
    }

    /**
     * Writes one full row and then updates the same column fifteen times, advancing the clock by
     * a tenth of the TTL after each update, and finally logs the row as read back.
     *
     * <p>Runs only for parameter rows whose table-level max lookback is 0; otherwise it returns
     * immediately. The test makes no assertions on the values; it only verifies that the
     * sequence completes without an exception.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMultipleUpdatesToSingleColumn() throws Exception {
        if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) {
            return;
        }
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String tableName = generateUniqueName();
            createTable(tableName);
            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);
            // Manual clock starts on a whole-second boundary slightly in the future.
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            injectEdge.setValue(startTime);
            updateRow(conn, tableName, "a1");
            injectEdge.incrementValue(1L);
            for (int i = 0; i < 15; ++i) {
                updateColumn(conn, tableName, "a1", 2, "col2_" + i);
                conn.commit();
                // Integer division first: the step is (ttl / 10) whole seconds.
                injectEdge.incrementValue((long) (ttl / 10 * 1000));
            }
            conn.commit();
            String dql = "select * from " + tableName + " where id='a1'";
            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
                while (rs.next()) {
                    // Result column 1 is the id, followed by the MAX_COLUMN_INDEX value columns.
                    for (int col = 1; col <= MAX_COLUMN_INDEX + 1; ++col) {
                        // Route the output through the class logger instead of stdout.
                        LOG.info("column {} = {}", col, rs.getString(col));
                    }
                }
            }
        }
    }

    /**
     * Checks that an index keeps serving the original row after data-table updates have failed.
     *
     * <p>Creates a table with an index on val1 that includes val2 and val3, writes row "a1",
     * and reads val1/val2 through the data table. Two updates to column 2 are then attempted
     * while {@code IndexRegionObserver} is told to fail data-table updates; both must throw.
     * A query by val1 must afterwards be planned against the index and return the original
     * val2, both before and after the index table is flushed and major compacted.
     *
     * <p>Runs only for parameter rows whose table-level max lookback is 0 and whose multiCF
     * flag is false; otherwise it returns immediately.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testDeleteFamilyVersion() throws Exception {
        if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) {
            return;
        }
        if (multiCF) {
            return;
        }
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String expectedValue;
            String indexColumnValue;
            String tableName = "T_" + generateUniqueName();
            createTable(tableName);
            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);
            String indexName = "I_" + generateUniqueName();
            String indexDDL = String.format("create index %s on %s (val1) include (val2, val3) ", indexName, tableName);
            conn.createStatement().execute(indexDDL);
            updateRow(conn, tableName, "a1");

            // A lookup by primary key must not go through the index.
            String dql = "select val1, val2 from " + tableName + " where id = 'a1'";
            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
                PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
                String explainPlan = QueryUtil.getExplainPlan((ResultIterator) prs.getUnderlyingIterator());
                Assert.assertFalse(explainPlan.contains(indexName));
                Assert.assertTrue(rs.next());
                indexColumnValue = rs.getString(1);
                expectedValue = rs.getString(2);
                Assert.assertFalse(rs.next());
            }

            expectFailedDataTableUpdate(conn, tableName, "col2_xyz");
            expectFailedDataTableUpdate(conn, tableName, "col2_abc");

            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
            dql = "select val2 from " + tableName + " where val1 = '" + indexColumnValue + "'";
            assertServedByIndex(conn, dql, indexName, expectedValue);

            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
            flush(TableName.valueOf(indexName));
            majorCompact(TableName.valueOf(indexName));
            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
            assertServedByIndex(conn, dql, indexName, expectedValue);
        }
    }

    /**
     * Attempts to update column 2 of row "a1" while data-table updates are forced to fail and
     * requires that the attempt throws. The failure switch is always turned off again.
     */
    private void expectFailedDataTableUpdate(Connection conn, String tableName, String value) {
        IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
        try {
            updateColumn(conn, tableName, "a1", 2, value);
            conn.commit();
            // AssertionError is not an Exception, so it is not swallowed by the catch below.
            Assert.fail("An exception should have been thrown");
        }
        catch (Exception expected) {
            // Expected: the data-table update was made to fail on purpose.
        }
        finally {
            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
        }
    }

    /**
     * Runs {@code dql}, requires its explain plan to mention {@code indexName}, and requires it
     * to return exactly one row whose first column equals {@code expectedValue}.
     */
    private void assertServedByIndex(Connection conn, String dql, String indexName, String expectedValue) throws Exception {
        try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
            PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
            String explainPlan = QueryUtil.getExplainPlan((ResultIterator) prs.getUnderlyingIterator());
            Assert.assertTrue(explainPlan.contains(indexName));
            Assert.assertTrue(rs.next());
            Assert.assertEquals(expectedValue, rs.getString(1));
            Assert.assertFalse(rs.next());
        }
    }

    @Test
    public void testCDCTTLExpiredRows() throws Exception {
        int maxLookbackAge = this.tableLevelMaxLookback != null ? this.tableLevelMaxLookback : 10;
        try (Connection conn = DriverManager.getConnection(TableTTLIT.getUrl());){
            Map preImage;
            Map postImage;
            Map cdcEvent;
            String schemaName = TableTTLIT.generateUniqueName();
            String tableName = schemaName + "." + TableTTLIT.generateUniqueName();
            String cdcName = TableTTLIT.generateUniqueName();
            ObjectMapper mapper = new ObjectMapper();
            this.createTable(tableName);
            conn.createStatement().execute("ALTER TABLE " + tableName + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (PRE, POST)";
            conn.createStatement().execute(cdcSql);
            conn.commit();
            String cdcIndexName = schemaName + "." + CDCUtil.getCDCIndexName((String)(schemaName + "." + cdcName));
            String cdcFullName = SchemaUtil.getTableName(null, (String)(schemaName + "." + cdcName));
            PTable cdcIndex = ((PhoenixConnection)conn).getTableNoCache(cdcIndexName);
            Assert.assertNotNull((String)"CDC index should be created", (Object)cdcIndex);
            Assert.assertTrue((String)"CDC index should be CDC type", (boolean)CDCUtil.isCDCIndex((PTable)cdcIndex));
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge((EnvironmentEdge)this.injectEdge);
            this.injectEdge.setValue(startTime);
            this.updateRow(conn, tableName, "row1");
            long insertTime = this.injectEdge.currentTime();
            this.injectEdge.incrementValue(1000L);
            this.updateColumn(conn, tableName, "row1", 1, "updated_val1");
            this.updateColumn(conn, tableName, "row1", 2, "updated_val2");
            conn.commit();
            long updateTime = this.injectEdge.currentTime();
            this.injectEdge.incrementValue(1000L);
            String cdcQuery = "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " + cdcFullName;
            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery);){
                Assert.assertTrue((String)"Should have insert CDC event", (boolean)rs.next());
                long eventTimestamp = rs.getTimestamp(1).getTime();
                Assert.assertTrue((String)"Insert event timestamp should be close to insert time", (Math.abs(eventTimestamp - insertTime) < 2000L ? 1 : 0) != 0);
                cdcEvent = (Map)mapper.readValue(rs.getString(2), HashMap.class);
                Assert.assertEquals((String)"Should be upsert event", (Object)"upsert", cdcEvent.get("event_type"));
                Assert.assertTrue((String)"Should have post-image", (boolean)cdcEvent.containsKey("post_image"));
                postImage = (Map)cdcEvent.get("post_image");
                Assert.assertFalse((String)"post image must contain something", (boolean)postImage.isEmpty());
                Assert.assertTrue((String)"Should have update CDC event", (boolean)rs.next());
                eventTimestamp = rs.getTimestamp(1).getTime();
                Assert.assertTrue((String)"Update event timestamp should be close to update time", (Math.abs(eventTimestamp - updateTime) < 2000L ? 1 : 0) != 0);
                cdcEvent = (Map)mapper.readValue(rs.getString(2), HashMap.class);
                Assert.assertEquals((String)"Should be upsert event", (Object)"upsert", cdcEvent.get("event_type"));
                Assert.assertTrue((String)"Should have pre-image", (boolean)cdcEvent.containsKey("pre_image"));
                Assert.assertTrue((String)"Should have post-image", (boolean)cdcEvent.containsKey("post_image"));
                preImage = (Map)cdcEvent.get("pre_image");
                Assert.assertEquals((String)"Comparison of last post-image with new pre-image", (Object)postImage, (Object)preImage);
                postImage = (Map)cdcEvent.get("post_image");
                LOG.info("Post-image {}", (Object)postImage);
            }
            this.injectEdge.incrementValue((long)((this.ttl + maxLookbackAge + 1) * 1000));
            TestUtil.dumpTable(conn, TableName.valueOf((String)tableName));
            TestUtil.dumpTable(conn, TableName.valueOf((String)cdcIndexName));
            this.flush(TableName.valueOf((String)tableName));
            this.majorCompact(TableName.valueOf((String)tableName));
            TestUtil.dumpTable(conn, TableName.valueOf((String)tableName));
            TestUtil.dumpTable(conn, TableName.valueOf((String)cdcIndexName));
            String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 'row1'";
            try (ResultSet rs = conn.createStatement().executeQuery(dataQuery);){
                Assert.assertFalse((String)"Row should be expired from data table", (boolean)rs.next());
            }
            rs = conn.createStatement().executeQuery(cdcQuery);
            try {
                int eventCount = 0;
                Map ttlDeleteEvent = null;
                while (rs.next()) {
                    ++eventCount;
                    Map cdcEvent2 = (Map)mapper.readValue(rs.getString(2), HashMap.class);
                    String eventType = (String)cdcEvent2.get("event_type");
                    Assert.assertEquals((String)("Event type must be ttl_delete but found " + eventType), (Object)"ttl_delete", (Object)eventType);
                    if (!"ttl_delete".equals(eventType)) continue;
                    ttlDeleteEvent = cdcEvent2;
                }
                Assert.assertEquals((String)"Should have only 1 event for TTL_DELETE because other events are expired due to major compaction", (long)1L, (long)eventCount);
                Assert.assertNotNull((String)"Should have TTL delete event", ttlDeleteEvent);
                Assert.assertEquals((String)"Should be ttl_delete event", (Object)"ttl_delete", ttlDeleteEvent.get("event_type"));
                Assert.assertTrue((String)"TTL delete should have pre-image", (boolean)ttlDeleteEvent.containsKey("pre_image"));
                preImage = (Map)ttlDeleteEvent.get("pre_image");
                Assert.assertEquals((String)"Comparison of last post-image with new pre-image", (Object)postImage, (Object)preImage);
                LOG.info("TTL delete event verified: {}", (Object)ttlDeleteEvent);
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
            String cdcScanQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName + " WHERE \"CDC JSON\" LIKE '%ttl_delete%'";
            try (ResultSet rs = conn.createStatement().executeQuery(cdcScanQuery);){
                Assert.assertTrue((String)"Should find TTL delete event via scan", (boolean)rs.next());
                cdcEvent = (Map)mapper.readValue(rs.getString(1), HashMap.class);
                Assert.assertEquals((String)"Should be ttl_delete event", (Object)"ttl_delete", cdcEvent.get("event_type"));
            }
            LOG.info("CDC TTL test completed successfully for table: {}", (Object)tableName);
        }
    }

    /**
     * Flushes the given table through the {@link Admin} handle exposed by
     * {@code getUtility()}.
     *
     * @param table the table to flush
     * @throws IOException if the flush request fails
     */
    private void flush(TableName table) throws IOException {
        getUtility().getAdmin().flush(table);
    }

    /**
     * Runs a major compaction on the given table by delegating to
     * {@code TestUtil.majorCompact} with the utility returned by {@code getUtility()}.
     *
     * @param table the table to compact
     * @throws Exception whatever the underlying helper throws
     */
    private void majorCompact(TableName table) throws Exception {
        TestUtil.majorCompact(getUtility(), table);
    }

    /**
     * Upserts a batch of rows, lets them age past TTL plus max lookback on an injected
     * clock, major-compacts the table, and then checks the CDC stream:
     * <ul>
     *   <li>before expiry, every CDC event is an {@code upsert}; its post-image is
     *       remembered per row id (column 2 of the CDC result);</li>
     *   <li>after compaction, the data table is empty;</li>
     *   <li>every CDC event at or after the compaction timestamp is a {@code ttl_delete}
     *       with a non-empty pre-image equal to the remembered post-image of that row.</li>
     * </ul>
     */
    @Test
    public void testCDCBatchMutationsForTTLExpiredRows() throws Exception {
        final int maxLookbackAge = this.tableLevelMaxLookback != null ? this.tableLevelMaxLookback : 10;
        final int numRows = 182;
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            String tableName = generateUniqueName();
            String cdcName = generateUniqueName();
            ObjectMapper mapper = new ObjectMapper();

            this.createTable(tableName);
            conn.createStatement().execute("ALTER TABLE " + tableName
                    + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (PRE, POST)";
            conn.createStatement().execute(cdcSql);
            conn.commit();
            String cdcFullName = SchemaUtil.getTableName(null, cdcName);

            // Start the injected clock one second ahead of wall time, truncated to a whole second.
            long startTime = System.currentTimeMillis() + 1000L;
            startTime = startTime / 1000L * 1000L;
            EnvironmentEdgeManager.injectEdge(this.injectEdge);
            this.injectEdge.setValue(startTime);

            // Write the rows, advancing the injected clock by 100 ms between rows.
            Map<String, Map<?, ?>> lastPostImages = new HashMap<>();
            for (int i = 1; i <= numRows; ++i) {
                this.updateRow(conn, tableName, "row" + i);
                this.injectEdge.incrementValue(100L);
            }

            // Record the post-image of each row's upsert event, keyed by row id.
            String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName;
            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
                while (rs.next()) {
                    Map<?, ?> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class);
                    Assert.assertEquals("Should be upsert event", "upsert", cdcEvent.get("event_type"));
                    Map<?, ?> postImage = (Map<?, ?>) cdcEvent.get("post_image");
                    lastPostImages.put(rs.getString(2), postImage);
                }
            }
            Assert.assertEquals("Should have captured post-images for all " + numRows + " rows",
                    numRows, lastPostImages.size());

            // Move the injected clock past TTL + max lookback; multiply in long to avoid int overflow.
            this.injectEdge.incrementValue((this.ttl + maxLookbackAge + 1) * 1000L);

            // The flush is issued with the injected edge removed; the edge is restored before the
            // compaction. NOTE(review): the reason for flushing on the real clock is not visible here.
            TableName hbaseTableName = TableName.valueOf(tableName);
            EnvironmentEdgeManager.reset();
            this.flush(hbaseTableName);
            EnvironmentEdgeManager.injectEdge(this.injectEdge);
            Timestamp ts = new Timestamp(this.injectEdge.currentTime());
            this.majorCompact(hbaseTableName);

            String dataQuery = "SELECT COUNT(*) FROM " + tableName;
            try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) {
                Assert.assertTrue("Should have count result", rs.next());
                Assert.assertEquals("All rows should be expired from data table", 0, rs.getInt(1));
            }

            // Only look at CDC events stamped at or after the compaction timestamp.
            String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName
                    + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
            Map<String, Map<?, ?>> ttlDeletePreImages = new HashMap<>();
            int ttlDeleteEventCount = 0;
            try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
                pst.setTimestamp(1, ts);
                // Use the no-arg executeQuery(): the executeQuery(String) overload is not
                // permitted on a PreparedStatement by the JDBC contract.
                try (ResultSet rs = pst.executeQuery()) {
                    while (rs.next()) {
                        ++ttlDeleteEventCount;
                        Map<?, ?> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class);
                        Assert.assertEquals("Should be ttl_delete event", "ttl_delete", cdcEvent.get("event_type"));
                        Assert.assertTrue("TTL delete should have pre-image", cdcEvent.containsKey("pre_image"));
                        Map<?, ?> preImage = (Map<?, ?>) cdcEvent.get("pre_image");
                        Assert.assertNotNull("Pre-image should not be null", preImage);
                        Assert.assertFalse("Pre-image should not be empty", preImage.isEmpty());
                        ttlDeletePreImages.put(rs.getString(2), preImage);
                    }
                }
            }
            Assert.assertEquals("Should have exactly " + numRows + " TTL_DELETE events",
                    numRows, ttlDeleteEventCount);
            Assert.assertEquals("Should have pre-images for all " + numRows + " rows",
                    numRows, ttlDeletePreImages.size());

            // Each TTL_DELETE pre-image must equal the last post-image seen for the same row.
            for (Map.Entry<String, Map<?, ?>> entry : lastPostImages.entrySet()) {
                String rowId = entry.getKey();
                Assert.assertTrue("Should have TTL_DELETE pre-image for row " + rowId,
                        ttlDeletePreImages.containsKey(rowId));
                Assert.assertEquals(
                        "Pre-image in TTL_DELETE should match last post-image from UPSERT for row " + rowId,
                        entry.getValue(), ttlDeletePreImages.get(rowId));
            }
        }
    }

    /**
     * Deletes the row with the given primary key from {@code tableName} and commits.
     *
     * @param conn      connection used to issue the delete and the commit
     * @param tableName table to delete from
     * @param id        value of the {@code id} primary-key column to delete
     * @throws SQLException if the delete or the commit fails
     */
    private void deleteRow(Connection conn, String tableName, String id) throws SQLException {
        String dml = "DELETE from " + tableName + " WHERE id = ?";
        // Bind the id instead of splicing it into the SQL, and close the statement when done.
        try (PreparedStatement stmt = conn.prepareStatement(dml)) {
            stmt.setString(1, id);
            stmt.executeUpdate();
        }
        conn.commit();
    }

    /**
     * Upserts a single column ({@code val<columnIndex>}) of the row identified by {@code id}.
     * A {@code null} value is written as a literal SQL {@code null}. This method does not
     * commit; the caller is expected to do so.
     *
     * @param conn          connection used to issue the upsert
     * @param dataTableName table to upsert into
     * @param id            value of the {@code id} primary-key column
     * @param columnIndex   numeric suffix of the {@code val} column to write
     * @param value         new column value, or {@code null} to write a null
     * @throws SQLException if the upsert fails
     */
    private void updateColumn(Connection conn, String dataTableName, String id, int columnIndex, String value) throws SQLException {
        boolean writeNull = value == null;
        // Identifiers cannot be bound, so only the table and column names are formatted in;
        // the id (and the value, when present) are bound parameters.
        String upsertSql = String.format("UPSERT INTO %s (id, %s) VALUES (?, %s)",
                dataTableName, "val" + columnIndex, writeNull ? "null" : "?");
        try (PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
            stmt.setString(1, id);
            if (!writeNull) {
                stmt.setString(2, value);
            }
            stmt.execute();
        }
    }

    /**
     * Applies the same random sequence of single-column upserts to the row {@code id} in both
     * tables, then commits. Between one and seven updates are issued; each picks a column
     * {@code val1..val7} at random and writes either a random number below 1000 or, when the
     * extra draw from {@code RAND} comes up zero, a null.
     *
     * @throws SQLException if an upsert or the commit fails
     */
    private void updateRow(Connection conn, String tableName1, String tableName2, String id) throws SQLException {
        // The order of draws from RAND is significant: count, then per update column, null-check, value.
        int remaining = RAND.nextInt(7) + 1;
        while (remaining-- > 0) {
            int column = RAND.nextInt(7) + 1;
            String newValue = RAND.nextInt(7) > 0 ? Integer.toString(RAND.nextInt(1000)) : null;
            updateColumn(conn, tableName1, id, column, newValue);
            updateColumn(conn, tableName2, id, column, newValue);
        }
        conn.commit();
    }

    /**
     * Writes a fresh random value (a number below 1000, as a string) into each of the columns
     * {@code val1..val7} of the row {@code id}, then commits.
     *
     * @throws SQLException if an upsert or the commit fails
     */
    private void updateRow(Connection conn, String tableName, String id) throws SQLException {
        int column = 1;
        while (column <= 7) {
            updateColumn(conn, tableName, id, column, String.valueOf(RAND.nextInt(1000)));
            column++;
        }
        conn.commit();
    }

    /**
     * Asserts that the row {@code id} looks the same in both tables: either it is absent from
     * both, or it is present in both with equal values in columns {@code val1..val<maxColumnIndex>}.
     * Differing column values are logged at debug level before the assertion fires.
     *
     * @param conn           connection used to query both tables
     * @param tableName1     first table to read
     * @param tableName2     second table to read
     * @param id             value of the {@code id} primary-key column
     * @param maxColumnIndex highest {@code val} column suffix to compare
     * @throws SQLException if either query fails
     * @throws IOException  declared for callers; nothing in this method throws it directly
     */
    private void compareRow(Connection conn, String tableName1, String tableName2, String id, int maxColumnIndex) throws SQLException, IOException {
        StringBuilder selectList = new StringBuilder("SELECT ");
        for (int i = 1; i < maxColumnIndex; ++i) {
            selectList.append("val").append(i).append(", ");
        }
        selectList.append("val").append(maxColumnIndex);
        // The id is bound rather than embedded, so it can never interfere with the query text.
        String query1 = selectList + " FROM " + tableName1 + " where id=?";
        String query2 = selectList + " FROM " + tableName2 + " where id=?";

        try (PreparedStatement stmt1 = conn.prepareStatement(query1);
             PreparedStatement stmt2 = conn.prepareStatement(query2)) {
            stmt1.setString(1, id);
            stmt2.setString(1, id);
            try (ResultSet rs1 = stmt1.executeQuery();
                 ResultSet rs2 = stmt2.executeQuery()) {
                boolean hasRow1 = rs1.next();
                boolean hasRow2 = rs2.next();
                Assert.assertEquals(hasRow1, hasRow2);
                if (!hasRow1) {
                    return;
                }
                for (int i = 1; i <= maxColumnIndex; ++i) {
                    String value1 = rs1.getString(i);
                    String value2 = rs2.getString(i);
                    boolean same = value1 == null ? value2 == null : value1.equals(value2);
                    if (!same) {
                        LOG.debug("VAL{} {} : {}", i, value2, value1);
                    }
                    Assert.assertEquals("VAL" + i, value2, value1);
                }
            }
        }
    }

    /**
     * Creates the test table {@code tableName} on its own connection and commits. The table has
     * a varchar primary key {@code id} and seven varchar columns {@code val1..val7}; when
     * {@code multiCF} is set, {@code val2..val7} are placed in column families {@code a} and
     * {@code b}. {@code tableDDLOptions} is appended to the statement.
     *
     * @param tableName name of the table to create
     * @throws SQLException if the connection, the DDL, or the commit fails
     */
    private void createTable(String tableName) throws SQLException {
        String columnClause;
        if (this.multiCF) {
            columnClause = " (id varchar not null primary key, val1 varchar, a.val2 varchar, a.val3 varchar, a.val4 varchar, b.val5 varchar, a.val6 varchar, b.val7 varchar) ";
        } else {
            columnClause = " (id varchar not null primary key, val1 varchar, val2 varchar, val3 varchar, val4 varchar, val5 varchar, val6 varchar, val7 varchar) ";
        }
        String ddl = "create table " + tableName + columnClause + this.tableDDLOptions;
        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
            LOG.debug(String.format("Creating table %s, %s", tableName, ddl));
            ddlConn.createStatement().execute(ddl);
            ddlConn.commit();
        }
    }
}

