/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
import org.apache.phoenix.end2end.CDCBaseIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={NeedsOwnMiniClusterTest.class})
public class CDCStreamIT
extends CDCBaseIT {
    /** Shared Jackson mapper used by the verify helpers to parse the JSON change record read from CDC queries. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Phoenix master metrics source obtained from the coprocessor metrics source factory. */
    private static final MetricsPhoenixMasterSource METRICS_SOURCE = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();

    /**
     * Configures and starts the test driver once for this class, then captures the
     * coprocessor environment of {@link TaskRegionObserver} on the first region of the
     * SYSTEM.TASK table so tests can construct tasks against it.
     *
     * <p>The task handling interval and initial delay are both set to {@code Long.MAX_VALUE};
     * presumably this keeps the periodic task from firing on its own so the tests can drive
     * it explicitly via {@code task.run()} - TODO confirm.
     *
     * @throws Exception if the driver setup or the region lookup fails
     */
    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        // Five properties are registered below, so size the (typed) map accordingly.
        Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
        props.put("phoenix.max.lookback.age.seconds", Integer.toString(3600));
        props.put("phoenix.use.stats.parallelization", Boolean.toString(false));
        props.put("phoenix.task.handling.interval.ms", Long.toString(Long.MAX_VALUE));
        props.put("phoenix.task.handling.initial.delay.ms", Long.toString(Long.MAX_VALUE));
        props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
        CDCStreamIT.setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
        taskRegionEnvironment = (RegionCoprocessorEnvironment) ((HRegion) CDCStreamIT.getUtility()
                .getRSForFirstRegionInTable(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
                .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
                .get(0))
                .getCoprocessorHost()
                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
    }

    /**
     * Verifies the CDC change records across two major compactions on a table that has a
     * max lookback age of 97200 seconds (27 hours) and a non-strict conditional TTL that
     * expires a row once the current time exceeds column {@code v2}.
     *
     * <p>Sequence: upsert a row whose {@code v2} is three days in the future and expect a
     * single post-image-only event; move the injected clock just past that expiry, upsert
     * again and expect both events; advance one day and compact, expecting only the second
     * event; advance a further 3 hours and 1 second and compact again, expecting a single
     * {@code ttl_delete} event.
     *
     * @throws Exception on any SQL, flush or compaction failure
     */
    @Test
    public void testCdcWithMaxLookbackAndCompaction() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            String cdcName = generateUniqueName();
            String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
            conn.createStatement().execute("CREATE TABLE  " + tableName
                    + " ( k VARCHAR NOT NULL, v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY KEY(k)) "
                    + "\"phoenix.max.lookback.age.seconds\"=97200,"
                    + "TTL='TO_NUMBER(CURRENT_TIME()) > v2', IS_STRICT_TTL=false");
            this.createCDC(conn, cdcDdl, null);
            // Expiry is expressed in epoch seconds, three days from now.
            long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3L)) / 1000L;
            conn.createStatement().execute(
                    "UPSERT INTO " + tableName + " VALUES('a', 'aa', " + expiry + ")");
            conn.commit();
            String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
            verifyOnlyPostImage(conn, sql, expiry);

            ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
            // Start the injected clock 100 seconds past the expiry, rounded to a whole second.
            long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3L);
            t = (t / 1000L + 100L) * 1000L;
            EnvironmentEdgeManager.injectEdge(injectEdge);
            try {
                injectEdge.setValue(t);
                conn.createStatement().execute(
                        "UPSERT INTO " + tableName + " VALUES('a', 'bb', " + expiry + ")");
                conn.commit();
                verifyPreAndPostImages(conn, sql, expiry);

                injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1L));
                flushWithRealClockThenMajorCompact(injectEdge, tableName, cdcName);
                verifyImagesAfterFirstCompaction(conn, sql, expiry);

                // 1 day + 3 hours + 1 second is just over the 97200 second max lookback.
                injectEdge.incrementValue(TimeUnit.HOURS.toMillis(3L) + TimeUnit.SECONDS.toMillis(1L));
                flushWithRealClockThenMajorCompact(injectEdge, tableName, cdcName);
                verifyTtlExpiredPreImage(conn, sql, expiry);
            } finally {
                EnvironmentEdgeManager.reset();
            }
        }
    }

    /**
     * Flushes the data table and its CDC index with the environment edge reset, then
     * re-injects {@code injectEdge} and major compacts both tables.
     */
    private void flushWithRealClockThenMajorCompact(ManualEnvironmentEdge injectEdge, String tableName,
            String cdcName) throws Exception {
        TableName dataTable = TableName.valueOf(tableName);
        TableName cdcIndexTable = TableName.valueOf(CDCUtil.getCDCIndexName(cdcName));
        EnvironmentEdgeManager.reset();
        TestUtil.flush(CDCStreamIT.getUtility(), dataTable);
        TestUtil.flush(CDCStreamIT.getUtility(), cdcIndexTable);
        EnvironmentEdgeManager.injectEdge(injectEdge);
        TestUtil.majorCompact(CDCStreamIT.getUtility(), dataTable);
        TestUtil.majorCompact(CDCStreamIT.getUtility(), cdcIndexTable);
    }

    /**
     * Verifies that the max lookback age set on a table is also present on its CDC index
     * descriptor, and that altering it on the table changes it on both.
     *
     * <p>The table starts with a max lookback of 97200 seconds, which is asserted on both
     * descriptors. It is then altered to 50 seconds. After a second upsert on an injected
     * clock, the clock is advanced one day and the CDC index is flushed and major compacted;
     * the CDC query must then return no rows and both descriptors must report 50.
     *
     * @throws Exception on any SQL, admin, flush or compaction failure
     */
    @Test
    public void testCdcWithMaxLookbackAlterTable() throws Exception {
        // try-with-resources: connection, admin and result set were previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            String cdcName = generateUniqueName();
            String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);
            String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
            conn.createStatement().execute("CREATE TABLE  " + tableName
                    + " ( k VARCHAR NOT NULL, v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY KEY(k)) "
                    + "\"phoenix.max.lookback.age.seconds\"=97200,"
                    + "TTL='TO_NUMBER(CURRENT_TIME()) > v2', IS_STRICT_TTL=false");
            this.createCDC(conn, cdcDdl, null);
            // Expiry is expressed in epoch seconds, three days from now.
            long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3L)) / 1000L;
            conn.createStatement().execute(
                    "UPSERT INTO " + tableName + " VALUES('a', 'aa', " + expiry + ")");
            conn.commit();

            try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
                assertMaxLookbackAgeOnDescriptor(admin, cdcIndexName, 97200L);
                assertMaxLookbackAgeOnDescriptor(admin, tableName, 97200L);
                String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
                verifyOnlyPostImage(conn, sql, expiry);
                conn.createStatement().execute(
                        "ALTER TABLE " + tableName + " SET \"phoenix.max.lookback.age.seconds\"=50");

                ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
                // Start the injected clock 100 seconds past the expiry, rounded to a whole second.
                long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3L);
                t = (t / 1000L + 100L) * 1000L;
                EnvironmentEdgeManager.injectEdge(injectEdge);
                try {
                    injectEdge.setValue(t);
                    conn.createStatement().execute(
                            "UPSERT INTO " + tableName + " VALUES('a', 'bb', " + expiry + ")");
                    conn.commit();
                    verifyPreAndPostImages(conn, sql, expiry);
                    injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1L));
                    // The flush runs with the edge reset; the compaction runs on the injected clock.
                    EnvironmentEdgeManager.reset();
                    TestUtil.flush(CDCStreamIT.getUtility(), TableName.valueOf(cdcIndexName));
                    EnvironmentEdgeManager.injectEdge(injectEdge);
                    TestUtil.majorCompact(CDCStreamIT.getUtility(), TableName.valueOf(cdcIndexName));
                    try (ResultSet rs = conn.createStatement().executeQuery(sql)) {
                        Assert.assertFalse(rs.next());
                    }
                    assertMaxLookbackAgeOnDescriptor(admin, cdcIndexName, 50L);
                    assertMaxLookbackAgeOnDescriptor(admin, tableName, 50L);
                } finally {
                    EnvironmentEdgeManager.reset();
                }
            }
        }
    }

    /**
     * Asserts that the descriptor of {@code physicalTableName} carries the given
     * {@code phoenix.max.lookback.age.seconds} value.
     */
    private static void assertMaxLookbackAgeOnDescriptor(Admin admin, String physicalTableName,
            long expectedSeconds) throws Exception {
        TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
        String maxLookbackVal = descriptor.getValue("phoenix.max.lookback.age.seconds");
        Assert.assertEquals(expectedSeconds, (long) Integer.parseInt(maxLookbackVal));
    }

    /**
     * Runs {@code sql} and asserts that it returns exactly two upsert change records: the
     * first with an empty pre-image and post-image V1="aa", the second with pre-image
     * V1="aa" and post-image V1="bb". Every image must carry V2 equal to {@code expiry}.
     *
     * @param conn   connection used to run the CDC query
     * @param sql    CDC query; column 3 of each row is parsed as the JSON change record
     * @param expiry expected V2 value, compared after narrowing to {@code int}
     * @throws SQLException            if the query fails
     * @throws JsonProcessingException if a change record is not valid JSON
     */
    private static void verifyPreAndPostImages(Connection conn, String sql, long expiry) throws SQLException, JsonProcessingException {
        // Replaces the deprecated new Long(expiry).intValue(); the narrowing is identical.
        Integer expectedV2 = (int) expiry;
        // Statement and result set were previously leaked.
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            Assert.assertTrue(rs.next());
            Map<?, ?> event = OBJECT_MAPPER.readValue(rs.getString(3), Map.class);
            Assert.assertTrue(((Map<?, ?>) event.get("pre_image")).isEmpty());
            Assert.assertEquals("upsert", event.get("event_type"));
            Map<?, ?> postImage = (Map<?, ?>) event.get("post_image");
            Assert.assertEquals("aa", postImage.get("V1"));
            Assert.assertEquals(expectedV2, postImage.get("V2"));

            Assert.assertTrue(rs.next());
            event = OBJECT_MAPPER.readValue(rs.getString(3), Map.class);
            Assert.assertEquals("upsert", event.get("event_type"));
            Map<?, ?> preImage = (Map<?, ?>) event.get("pre_image");
            Assert.assertEquals("aa", preImage.get("V1"));
            Assert.assertEquals(expectedV2, preImage.get("V2"));
            postImage = (Map<?, ?>) event.get("post_image");
            Assert.assertEquals("bb", postImage.get("V1"));
            Assert.assertEquals(expectedV2, postImage.get("V2"));

            Assert.assertFalse(rs.next());
        }
    }

    /**
     * Runs {@code sql} and asserts that it returns exactly one upsert change record with
     * pre-image V1="aa" and post-image V1="bb", both carrying V2 equal to {@code expiry}.
     *
     * @param conn   connection used to run the CDC query
     * @param sql    CDC query; column 3 of the row is parsed as the JSON change record
     * @param expiry expected V2 value, compared after narrowing to {@code int}
     * @throws SQLException            if the query fails
     * @throws JsonProcessingException if the change record is not valid JSON
     */
    private static void verifyImagesAfterFirstCompaction(Connection conn, String sql, long expiry) throws SQLException, JsonProcessingException {
        // Replaces the deprecated new Long(expiry).intValue(); the narrowing is identical.
        Integer expectedV2 = (int) expiry;
        // Statement and result set were previously leaked.
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            Assert.assertTrue(rs.next());
            Map<?, ?> event = OBJECT_MAPPER.readValue(rs.getString(3), Map.class);
            Assert.assertEquals("upsert", event.get("event_type"));
            Map<?, ?> preImage = (Map<?, ?>) event.get("pre_image");
            Assert.assertEquals("aa", preImage.get("V1"));
            Assert.assertEquals(expectedV2, preImage.get("V2"));
            Map<?, ?> postImage = (Map<?, ?>) event.get("post_image");
            Assert.assertEquals("bb", postImage.get("V1"));
            Assert.assertEquals(expectedV2, postImage.get("V2"));
            Assert.assertFalse(rs.next());
        }
    }

    /**
     * Runs {@code sql} and asserts that it returns exactly one upsert change record with an
     * empty pre-image and a post-image of V1="aa", V2 equal to {@code expiry}.
     *
     * @param conn   connection used to run the CDC query
     * @param sql    CDC query; column 3 of the row is parsed as the JSON change record
     * @param expiry expected V2 value, compared after narrowing to {@code int}
     * @throws SQLException            if the query fails
     * @throws JsonProcessingException if the change record is not valid JSON
     */
    private static void verifyOnlyPostImage(Connection conn, String sql, long expiry) throws SQLException, JsonProcessingException {
        // Replaces the deprecated new Long(expiry).intValue(); the narrowing is identical.
        Integer expectedV2 = (int) expiry;
        // Statement and result set were previously leaked.
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            Assert.assertTrue(rs.next());
            Map<?, ?> event = OBJECT_MAPPER.readValue(rs.getString(3), Map.class);
            Assert.assertTrue(((Map<?, ?>) event.get("pre_image")).isEmpty());
            Assert.assertEquals("upsert", event.get("event_type"));
            Map<?, ?> postImage = (Map<?, ?>) event.get("post_image");
            Assert.assertEquals("aa", postImage.get("V1"));
            Assert.assertEquals(expectedV2, postImage.get("V2"));
            Assert.assertFalse(rs.next());
        }
    }

    /**
     * Runs {@code sql} and asserts that it returns exactly one {@code ttl_delete} change
     * record whose pre-image is V1="bb", V2 equal to {@code expiry}, and which has no
     * post-image.
     *
     * @param conn   connection used to run the CDC query
     * @param sql    CDC query; column 3 of the row is parsed as the JSON change record
     * @param expiry expected V2 value, compared after narrowing to {@code int}
     * @throws SQLException            if the query fails
     * @throws JsonProcessingException if the change record is not valid JSON
     */
    private static void verifyTtlExpiredPreImage(Connection conn, String sql, long expiry) throws SQLException, JsonProcessingException {
        // Replaces the deprecated new Long(expiry).intValue(); the narrowing is identical.
        Integer expectedV2 = (int) expiry;
        // Statement and result set were previously leaked.
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            Assert.assertTrue(rs.next());
            Map<?, ?> event = OBJECT_MAPPER.readValue(rs.getString(3), Map.class);
            Assert.assertEquals("ttl_delete", event.get("event_type"));
            Map<?, ?> preImage = (Map<?, ?>) event.get("pre_image");
            Assert.assertEquals("bb", preImage.get("V1"));
            Assert.assertEquals(expectedV2, preImage.get("V2"));
            Assert.assertNull(event.get("post_image"));
            Assert.assertFalse(rs.next());
        }
    }

    /**
     * Verifies that a newly created CDC stream is in status ENABLING and that, after one run
     * of {@link TaskRegionObserver.SelfHealingTask}, the partition metadata assertion passes
     * and the stream status is ENABLED.
     *
     * @throws Exception on any SQL or task failure
     */
    @Test
    public void testStreamPartitionMetadataBootstrap() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            String cdcName = generateUniqueName();
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
            conn.createStatement().execute(
                    "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)");
            this.createCDC(conn, cdcSql, null);
            String streamName = this.getStreamName(conn, tableName, cdcName);
            this.assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING);

            // Drive the task by hand instead of waiting for it to be scheduled.
            TaskRegionObserver.SelfHealingTask task =
                    new TaskRegionObserver.SelfHealingTask(taskRegionEnvironment, 1800000L);
            task.run();

            this.assertPartitionMetadata(conn, tableName, cdcName);
            this.assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
        }
    }

    /**
     * Verifies that only one CDC stream can exist per table at a time.
     *
     * <p>A second CREATE CDC on the same table must fail with
     * {@link SQLExceptionCode#CDC_ALREADY_ENABLED}, both before and after the
     * {@link TaskRegionObserver.SelfHealingTask} has run. After the first CDC is dropped its
     * stream is DISABLED, and a new CDC can be created whose stream is ENABLING.
     *
     * @throws Exception on any SQL or task failure
     */
    @Test
    public void testOnlyOneStreamAllowed() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            String cdcName = generateUniqueName();
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
            conn.createStatement().execute(
                    "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)");
            this.createCDC(conn, cdcSql, null);
            String streamName = this.getStreamName(conn, tableName, cdcName);

            String cdcName2 = generateUniqueName();
            String cdcSql2 = "CREATE CDC " + cdcName2 + " ON " + tableName;
            // Rejected while the first stream has not yet been processed by the task ...
            this.assertCreateCdcRejectedAsAlreadyEnabled(conn, cdcSql2, streamName);
            TaskRegionObserver.SelfHealingTask task =
                    new TaskRegionObserver.SelfHealingTask(taskRegionEnvironment, 1800000L);
            task.run();
            // ... and still rejected after the task has run.
            this.assertCreateCdcRejectedAsAlreadyEnabled(conn, cdcSql2, streamName);

            this.dropCDC(conn, cdcName, tableName);
            this.assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.DISABLED);

            String cdcName3 = generateUniqueName();
            String cdcSql3 = "CREATE CDC " + cdcName3 + " ON " + tableName;
            this.createCDC(conn, cdcSql3, null);
            streamName = this.getStreamName(conn, tableName, cdcName3);
            this.assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING);
        }
    }

    /**
     * Asserts that running {@code cdcSql} fails with CDC_ALREADY_ENABLED and that the error
     * message mentions {@code existingStreamName}.
     */
    private void assertCreateCdcRejectedAsAlreadyEnabled(Connection conn, String cdcSql,
            String existingStreamName) throws Exception {
        try {
            this.createCDC(conn, cdcSql, null);
            Assert.fail("Only one CDC entity is allowed per table");
        } catch (SQLException e) {
            Assert.assertEquals((long) SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(),
                    (long) e.getErrorCode());
            Assert.assertTrue(e.getMessage().contains(existingStreamName));
        }
    }

    /**
     * Verifies that dropping a table removes its CDC stream metadata and that re-creating
     * the table and CDC brings it back.
     *
     * <p>Uses quoted lower-case schema, table and CDC names. After the task has run,
     * SYSTEM.CDC_STREAM has rows for the table and a stream name is reported; after
     * DROP TABLE neither is present; after re-creating both and running the task again the
     * rows are back.
     *
     * @throws SQLException on any SQL failure
     */
    @Test
    public void testStreamMetadataWhenTableIsDropped() throws SQLException {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            MetaDataClient mdc = new MetaDataClient(conn.unwrap(PhoenixConnection.class));
            String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
            String tableName = SchemaUtil.getTableName(schemaName,
                    "\"" + generateUniqueName().toLowerCase() + "\"");
            String unescapedFullTableName = SchemaUtil.getUnEscapedFullName(tableName);
            String createTableSql =
                    "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)";
            conn.createStatement().execute(createTableSql);
            String cdcName = "\"" + generateUniqueName().toLowerCase() + "\"";
            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
            conn.createStatement().execute(cdcSql);
            TaskRegionObserver.SelfHealingTask task =
                    new TaskRegionObserver.SelfHealingTask(taskRegionEnvironment, 1800000L);
            task.run();

            String partitionQuery =
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + unescapedFullTableName + "'";
            ResultSet rs = conn.createStatement().executeQuery(partitionQuery);
            Assert.assertTrue(rs.next());

            // Dropping the table must clear both the stream name and the partition rows.
            Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
            conn.createStatement().execute("DROP TABLE " + tableName);
            Assert.assertNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
            rs = conn.createStatement().executeQuery(partitionQuery);
            Assert.assertFalse(rs.next());

            // Re-creating the table and CDC must restore them once the task has run.
            conn.createStatement().execute(createTableSql);
            conn.createStatement().execute(cdcSql);
            Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
            task.run();
            rs = conn.createStatement().executeQuery(partitionQuery);
            Assert.assertTrue(rs.next());
        }
    }

    /**
     * Verifies that rows for closed partitions disappear from SYSTEM.CDC_STREAM once the
     * clock has moved forward by 30 hours.
     *
     * <p>After one split the table has three partition rows. With the injected clock set
     * 30 hours ahead, only two rows must remain and none of them may have a positive
     * PARTITION_END_TIME.
     *
     * @throws Exception on any SQL or split failure
     */
    @Test
    public void testCDCStreamTTL() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
            String sql = "SELECT PARTITION_END_TIME FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='"
                    + tableName + "'";
            ResultSet rs = conn.createStatement().executeQuery(sql);
            int count = 0;
            while (rs.next()) {
                ++count;
            }
            Assert.assertEquals(3L, (long) count);
            try {
                ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
                // 30 hours ahead (previously the literal 108000000L), rounded down to a whole second.
                long t = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(30L);
                t = t / 1000L * 1000L;
                EnvironmentEdgeManager.injectEdge(injectEdge);
                injectEdge.setValue(t);
                rs = conn.createStatement().executeQuery(sql);
                int newCount = 0;
                while (rs.next()) {
                    if (rs.getLong(1) > 0L) {
                        Assert.fail("Closed partition should have expired after TTL.");
                    }
                    ++newCount;
                }
                Assert.assertEquals(2L, (long) newCount);
            } finally {
                EnvironmentEdgeManager.reset();
            }
        }
    }

    /**
     * Verifies the SYSTEM.CDC_STREAM rows after a single-region table is split at "m".
     *
     * <p>The one row with a positive end time is the parent; the other two are daughters.
     * Each daughter must start when the parent ended and reference the parent's partition id
     * and start time. One daughter must cover (null, "m") and the other ("m", null).
     *
     * @throws Exception on any SQL or split failure
     */
    @Test
    public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
            CDCBaseIT.PartitionMetadata parent = null;
            List<CDCBaseIT.PartitionMetadata> daughters = new ArrayList<>();
            while (rs.next()) {
                CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                if (pm.endTime > 0L) {
                    parent = pm;
                } else {
                    daughters.add(pm);
                }
            }
            Assert.assertNotNull(parent);
            Assert.assertEquals(2L, (long) daughters.size());
            for (CDCBaseIT.PartitionMetadata daughter : daughters) {
                Assert.assertEquals((Object) daughter.startTime, (Object) parent.endTime);
                Assert.assertEquals((Object) parent.partitionId, (Object) daughter.parentPartitionId);
                Assert.assertEquals((Object) parent.startTime, (Object) daughter.parentStartTime);
            }
            // Char literals replace the raw byte value 109.
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'm'));
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'm'));
        }
    }

    /**
     * Verifies the SYSTEM.CDC_STREAM rows when the table is split at "l" and then the first
     * resulting region is split at "d".
     *
     * <p>Rows are classified as: closed with no keys (grandparent), closed with a key (the
     * split parent), open with no end key (the region that was not split again), and the
     * remaining open rows (daughters). Each daughter must start when the split parent ended
     * and reference its partition id and start time. The daughters must cover (null, "d")
     * and ("d", "l").
     *
     * @throws Exception on any SQL or split failure
     */
    @Test
    public void testPartitionMetadataFirstRegionSplits() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
            CDCBaseIT.PartitionMetadata grandparent = null;
            CDCBaseIT.PartitionMetadata splitParent = null;
            CDCBaseIT.PartitionMetadata unSplitParent = null;
            List<CDCBaseIT.PartitionMetadata> daughters = new ArrayList<>();
            while (rs.next()) {
                CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                if (pm.endTime > 0L) {
                    if (pm.startKey == null && pm.endKey == null) {
                        grandparent = pm;
                    } else {
                        splitParent = pm;
                    }
                } else if (pm.endKey == null) {
                    unSplitParent = pm;
                } else {
                    daughters.add(pm);
                }
            }
            Assert.assertNotNull(grandparent);
            Assert.assertNotNull(unSplitParent);
            Assert.assertNotNull(splitParent);
            Assert.assertEquals(2L, (long) daughters.size());
            for (CDCBaseIT.PartitionMetadata daughter : daughters) {
                Assert.assertEquals((Object) daughter.startTime, (Object) splitParent.endTime);
                Assert.assertEquals((Object) splitParent.partitionId, (Object) daughter.parentPartitionId);
                Assert.assertEquals((Object) splitParent.startTime, (Object) daughter.parentStartTime);
            }
            // Char literals replace the raw byte values 100 and 108.
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'd'));
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey != null && d.startKey[0] == 'd' && d.endKey[0] == 'l'));
        }
    }

    /**
     * Verifies the SYSTEM.CDC_STREAM rows when the table is split at "l" and then the last
     * resulting region is split at "q".
     *
     * <p>Rows are classified as: closed with no keys (grandparent), closed with a key (the
     * split parent), open with no start key (the region that was not split again), and the
     * remaining open rows (daughters). Each daughter must start when the split parent ended
     * and reference its partition id and start time. The daughters must cover ("l", "q")
     * and ("q", null).
     *
     * @throws Exception on any SQL or split failure
     */
    @Test
    public void testPartitionMetadataLastRegionSplits() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
            CDCBaseIT.PartitionMetadata grandparent = null;
            CDCBaseIT.PartitionMetadata splitParent = null;
            CDCBaseIT.PartitionMetadata unSplitParent = null;
            List<CDCBaseIT.PartitionMetadata> daughters = new ArrayList<>();
            while (rs.next()) {
                CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                if (pm.endTime > 0L) {
                    if (pm.startKey == null && pm.endKey == null) {
                        grandparent = pm;
                    } else {
                        splitParent = pm;
                    }
                } else if (pm.startKey == null) {
                    unSplitParent = pm;
                } else {
                    daughters.add(pm);
                }
            }
            Assert.assertNotNull(grandparent);
            Assert.assertNotNull(unSplitParent);
            Assert.assertNotNull(splitParent);
            Assert.assertEquals(2L, (long) daughters.size());
            for (CDCBaseIT.PartitionMetadata daughter : daughters) {
                Assert.assertEquals((Object) daughter.startTime, (Object) splitParent.endTime);
                Assert.assertEquals((Object) splitParent.partitionId, (Object) daughter.parentPartitionId);
                Assert.assertEquals((Object) splitParent.startTime, (Object) daughter.parentStartTime);
            }
            // Char literals replace the raw byte values 108 and 113. The start-key check stays
            // first so the null end key of the last daughter is never dereferenced.
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey[0] == 'l' && d.endKey[0] == 'q'));
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'q'));
        }
    }

    /**
     * Verifies the SYSTEM.CDC_STREAM rows when the table is split at "d" and "q" and the
     * middle region is then split at "j".
     *
     * <p>Only rows with both a start and an end key are considered: the closed one is the
     * parent and the open ones are daughters. Each daughter must start when the parent ended
     * and reference its partition id and start time. The daughters must cover ("d", "j")
     * and ("j", "q").
     *
     * @throws Exception on any SQL or split failure
     */
    @Test
    public void testPartitionMetadataMiddleRegionSplits() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("j"));
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
            CDCBaseIT.PartitionMetadata parent = null;
            List<CDCBaseIT.PartitionMetadata> daughters = new ArrayList<>();
            while (rs.next()) {
                CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                // Skip every partition that touches either edge of the key space.
                if (pm.startKey == null || pm.endKey == null) {
                    continue;
                }
                if (pm.endTime > 0L) {
                    parent = pm;
                } else {
                    daughters.add(pm);
                }
            }
            Assert.assertNotNull(parent);
            Assert.assertEquals(2L, (long) daughters.size());
            for (CDCBaseIT.PartitionMetadata daughter : daughters) {
                Assert.assertEquals((Object) daughter.startTime, (Object) parent.endTime);
                Assert.assertEquals((Object) parent.partitionId, (Object) daughter.parentPartitionId);
                Assert.assertEquals((Object) parent.startTime, (Object) daughter.parentStartTime);
            }
            // Char literals replace the raw byte values 100, 106 and 113.
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey[0] == 'd' && d.endKey[0] == 'j'));
            Assert.assertTrue(daughters.stream()
                    .anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q'));
        }
    }

    /**
     * Verifies the SYSTEM.CDC_STREAM rows when a table is split at "d", its regions are
     * merged back together, and the merged region is then split at "l".
     *
     * <p>The merged region is expected to appear as two rows (no keys, non-null parent id)
     * that share a partition id and start time. The two open rows are the split daughters;
     * they must share a start time equal to the merged rows' end time and reference the
     * merged partition's id and start time.
     *
     * @throws Exception on any SQL, split or merge failure
     */
    @Test
    public void testPartitionMetadataMergedRegionSplits() throws Exception {
        // try-with-resources: the connection was previously never closed.
        try (Connection conn = this.newConnection()) {
            String tableName = generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
            List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
            TestUtil.mergeTableRegions(conn, tableName, regions.stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList()));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
            List<CDCBaseIT.PartitionMetadata> mergedParent = new ArrayList<>();
            List<CDCBaseIT.PartitionMetadata> splitDaughters = new ArrayList<>();
            while (rs.next()) {
                CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                // The two checks are independent: a row is tested against both.
                if (pm.startKey == null && pm.endKey == null && pm.parentPartitionId != null) {
                    mergedParent.add(pm);
                }
                if (pm.endTime == 0L) {
                    splitDaughters.add(pm);
                }
            }
            Assert.assertEquals(2L, (long) mergedParent.size());
            Assert.assertEquals(2L, (long) splitDaughters.size());
            CDCBaseIT.PartitionMetadata merged0 = mergedParent.get(0);
            CDCBaseIT.PartitionMetadata merged1 = mergedParent.get(1);
            CDCBaseIT.PartitionMetadata daughter0 = splitDaughters.get(0);
            CDCBaseIT.PartitionMetadata daughter1 = splitDaughters.get(1);
            Assert.assertEquals((Object) merged0.partitionId, (Object) merged1.partitionId);
            Assert.assertEquals((Object) merged0.startTime, (Object) merged1.startTime);
            Assert.assertEquals((Object) merged0.partitionId, (Object) daughter0.parentPartitionId);
            Assert.assertEquals((Object) merged0.partitionId, (Object) daughter1.parentPartitionId);
            Assert.assertEquals((Object) daughter0.startTime, (Object) daughter1.startTime);
            Assert.assertEquals((Object) daughter0.startTime, (Object) merged0.endTime);
            Assert.assertEquals((Object) daughter0.startTime, (Object) merged1.endTime);
            Assert.assertEquals((Object) merged0.startTime, (Object) daughter0.parentStartTime);
            Assert.assertEquals((Object) merged0.startTime, (Object) daughter1.parentStartTime);
        }
    }

    /**
     * Splits the table at "l", merges the two resulting regions back together and checks the
     * SYSTEM.CDC_STREAM rows for the table: the two rows of the merged region must agree with
     * each other and must reference the two split regions as parents.
     *
     * <p>JDBC resources are scoped with try-with-resources so that a failing assertion cannot
     * leak them.
     *
     * @throws Exception if table setup, the split/merge operations or the metadata query fail
     */
    @Test
    public void testPartitionMetadataSplitRegionsMerge() throws Exception {
        try (Connection conn = this.newConnection()) {
            String tableName = CDCStreamIT.generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);

            TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
            List<String> regionsToMerge = TestUtil.getAllTableRegions(conn, tableName).stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList());
            TestUtil.mergeTableRegions(conn, tableName, regionsToMerge);

            List<CDCBaseIT.PartitionMetadata> splitParents = new ArrayList<>();
            List<CDCBaseIT.PartitionMetadata> mergedDaughter = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME = ?")) {
                ps.setString(1, tableName);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                        boolean hasNoKeys = pm.startKey == null && pm.endKey == null;
                        if (!hasNoKeys) {
                            // A row with at least one key is one of the two split regions.
                            splitParents.add(pm);
                        } else if (pm.endTime == 0L) {
                            // A key-less row whose end time is still 0 belongs to the merged
                            // region. Key-less rows with a non-zero end time are skipped.
                            mergedDaughter.add(pm);
                        }
                    }
                }
            }

            Assert.assertEquals(2, mergedDaughter.size());
            Assert.assertEquals(2, splitParents.size());

            CDCBaseIT.PartitionMetadata daughter0 = mergedDaughter.get(0);
            CDCBaseIT.PartitionMetadata daughter1 = mergedDaughter.get(1);

            // Both merged-region rows describe the same partition.
            Assert.assertEquals(daughter0.startTime, daughter1.startTime);
            Assert.assertEquals(daughter0.endTime, daughter1.endTime);
            Assert.assertEquals(daughter0.partitionId, daughter1.partitionId);

            // Every split region is referenced as parent by one of the merged-region rows.
            Assert.assertTrue(mergedDaughter.stream()
                    .anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(0).partitionId)));
            Assert.assertTrue(mergedDaughter.stream()
                    .anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(1).partitionId)));

            // The split regions end exactly when the merged region starts.
            for (CDCBaseIT.PartitionMetadata splitParent : splitParents) {
                Assert.assertEquals(daughter0.startTime, splitParent.endTime);
            }

            // Each merged-region row records the start time of one of its parents.
            List<Long> parentStartTimes = splitParents.stream()
                    .map(p -> p.startTime)
                    .collect(Collectors.toList());
            Assert.assertTrue(parentStartTimes.contains(daughter0.parentStartTime));
            Assert.assertTrue(parentStartTimes.contains(daughter1.parentStartTime));
        }
    }

    /**
     * Splits the table into four regions (at "l", "d" and "q"), merges them pairwise into two
     * regions, major-compacts, and finally merges those two regions into one. The
     * SYSTEM.CDC_STREAM rows for the table are then checked: the final merged region must start
     * when the intermediate merged regions end and must record one of their start times as its
     * parent start time.
     *
     * <p>JDBC resources are scoped with try-with-resources so that a failing assertion cannot
     * leak them.
     *
     * @throws Exception if table setup, the split/merge/compaction operations or the metadata
     *                   query fail
     */
    @Test
    public void testPartitionMetadataMergedRegionsMerge() throws Exception {
        try (Connection conn = this.newConnection()) {
            String tableName = CDCStreamIT.generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);

            TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));

            // Both pairwise merges use the region list captured before the first merge.
            List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
            TestUtil.mergeTableRegions(conn, tableName, regions.subList(0, 2).stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList()));
            TestUtil.mergeTableRegions(conn, tableName, regions.subList(2, 4).stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList()));

            // NOTE(review): presumably the compaction is needed so the freshly merged regions
            // can be merged again -- confirm against TestUtil.mergeTableRegions.
            TestUtil.doMajorCompaction(conn, tableName);
            regions = TestUtil.getAllTableRegions(conn, tableName);
            TestUtil.mergeTableRegions(conn, tableName, regions.stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList()));

            List<CDCBaseIT.PartitionMetadata> mergedDaughter = new ArrayList<>();
            List<CDCBaseIT.PartitionMetadata> mergedParents = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME = ?")) {
                ps.setString(1, tableName);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        CDCBaseIT.PartitionMetadata pm = new CDCBaseIT.PartitionMetadata(rs);
                        if (pm.endTime == 0L) {
                            // End time still 0: a row of the final merged region.
                            mergedDaughter.add(pm);
                        } else if (pm.startKey == null || pm.endKey == null) {
                            // Ended rows lacking a start key or an end key. Ended rows that
                            // have both keys are ignored.
                            mergedParents.add(pm);
                        }
                    }
                }
            }

            Assert.assertEquals(2, mergedDaughter.size());
            // NOTE(review): 9 presumably breaks down as the initial region (1), the two regions
            // closed by the splits at "d" and "q" (2), the first and last region closed by the
            // pairwise merges (2), and two rows for each of the two intermediate merged
            // regions (4) -- confirm against the stream bookkeeping.
            Assert.assertEquals(9, mergedParents.size());
            Assert.assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);

            // After sorting by end time, the last four rows are the ones that ended most
            // recently; they are expected to end exactly when the final merged region starts.
            Collections.sort(mergedParents, Comparator.comparing(pm -> pm.endTime));
            List<CDCBaseIT.PartitionMetadata> latestParents =
                    mergedParents.subList(mergedParents.size() - 4, mergedParents.size());
            List<Long> parentStartTimes = new ArrayList<>();
            for (CDCBaseIT.PartitionMetadata mergedParent : latestParents) {
                Assert.assertEquals(mergedDaughter.get(0).startTime, mergedParent.endTime);
                parentStartTimes.add(mergedParent.startTime);
            }
            Assert.assertTrue(parentStartTimes.contains(mergedDaughter.get(0).parentStartTime));
            Assert.assertTrue(parentStartTimes.contains(mergedDaughter.get(1).parentStartTime));
        }
    }

    /**
     * Upserts eight rows into a CDC-enabled table, looks up the stream name and one partition
     * of the stream, and then reads up to five change records of that partition through the CDC
     * object. Every returned record must be an "upsert" event with an empty pre-image.
     *
     * <p>JDBC resources are scoped with try-with-resources so that a failing assertion cannot
     * leak them.
     *
     * @throws Exception if table setup or any statement fails, or a change record is not valid
     *                   JSON
     */
    @Test
    public void testGetRecords() throws Exception {
        try (Connection conn = this.newConnection()) {
            String tableName = CDCStreamIT.generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);

            String[] rows = {
                "('a', 1, 'foo')",
                "('b', 2, 'bar')",
                "('e', 3, 'alice')",
                "('j', 4, 'bob')",
                "('m', 5, 'cat')",
                "('p', 6, 'cat')",
                "('t', 7, 'cat')",
                "('z', 8, 'cat')"
            };
            // One statement is reused for all upserts instead of creating (and never closing)
            // one per row. java.sql.Statement is spelled out because the file does not import it.
            try (java.sql.Statement upsert = conn.createStatement()) {
                for (String row : rows) {
                    upsert.execute("UPSERT INTO " + tableName + " VALUES " + row);
                }
            }
            conn.commit();

            String streamName;
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT STREAM_NAME FROM SYSTEM.CDC_STREAM_STATUS WHERE TABLE_NAME = ?")) {
                ps.setString(1, tableName);
                try (ResultSet rs = ps.executeQuery()) {
                    Assert.assertTrue(rs.next());
                    streamName = rs.getString(1);
                }
            }

            // Only the first partition row returned for the stream is used.
            String partitionId;
            long partitionStartTime;
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT PARTITION_ID, PARTITION_START_TIME FROM SYSTEM.CDC_STREAM"
                            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ?")) {
                ps.setString(1, tableName);
                ps.setString(2, streamName);
                try (ResultSet rs = ps.executeQuery()) {
                    Assert.assertTrue(rs.next());
                    partitionId = rs.getString(1);
                    partitionStartTime = rs.getLong(2);
                }
            }

            // The CDC object name is taken from the fifth '/'-separated segment of the stream name.
            String cdcName = streamName.split("/")[4];
            String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ *  FROM %s WHERE PARTITION_ID() = ?  AND PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(? AS BIGINT) AS TIMESTAMP) LIMIT ? ";
            try (PreparedStatement stmt = conn.prepareStatement(String.format(sql, cdcName))) {
                stmt.setString(1, partitionId);
                stmt.setLong(2, partitionStartTime);
                stmt.setInt(3, 5);
                try (ResultSet rs = stmt.executeQuery()) {
                    // NOTE(review): this loop passes vacuously when the query returns no rows;
                    // consider asserting a minimum row count once the expected number per
                    // partition is confirmed.
                    while (rs.next()) {
                        // The third column holds the change record as JSON.
                        String cdcJson = rs.getString(3);
                        Map<?, ?> event = OBJECT_MAPPER.readValue(cdcJson, Map.class);
                        Assert.assertTrue(((Map<?, ?>) event.get("pre_image")).isEmpty());
                        Assert.assertEquals("upsert", event.get("event_type"));
                    }
                }
            }
        }
    }

    /**
     * Checks that the post-split and post-merge partition update failure counters read from
     * {@code METRICS_SOURCE} are 0 before any region operation and remain 0 after a split at
     * "m" and a subsequent merge of all regions of the table.
     *
     * <p>The connection is scoped with try-with-resources so that a failing assertion cannot
     * leak it.
     *
     * @throws Exception if table setup or the split/merge operations fail
     */
    @Test
    public void testPartitionUpdateFailureMetrics() throws Exception {
        try (Connection conn = this.newConnection()) {
            String tableName = CDCStreamIT.generateUniqueName();
            this.createTableAndEnableCDC(conn, tableName, true);

            Assert.assertEquals("Post split partition update failures should be 0 initially",
                    0L, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
            Assert.assertEquals("Post merge partition update failures should be 0 initially",
                    0L, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());

            TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
            Assert.assertEquals("Post split partition update failures should be 0 after successful split",
                    0L, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());

            List<String> regionsToMerge = TestUtil.getAllTableRegions(conn, tableName).stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList());
            TestUtil.mergeTableRegions(conn, tableName, regionsToMerge);
            Assert.assertEquals("Post merge partition update failures should be 0 after successful merge",
                    0L, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
        }
    }

    /**
     * Asserts that the stream status table contains a row for the given table and stream and
     * that its STREAM_STATUS column equals the serialized value of {@code status}.
     *
     * <p>The statement and result set are closed before returning; the connection belongs to
     * the caller and is left open.
     *
     * @param conn       open connection used for the lookup
     * @param tableName  value matched against TABLE_NAME
     * @param streamName value matched against STREAM_NAME
     * @param status     expected stream status
     * @throws SQLException if the lookup query fails
     */
    private void assertStreamStatus(Connection conn, String tableName, String streamName, CDCUtil.CdcStreamStatus status) throws SQLException {
        String query = "SELECT STREAM_STATUS FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME
                + " WHERE TABLE_NAME = ? AND STREAM_NAME = ?";
        try (PreparedStatement ps = conn.prepareStatement(query)) {
            ps.setString(1, tableName);
            ps.setString(2, streamName);
            try (ResultSet rs = ps.executeQuery()) {
                Assert.assertTrue(rs.next());
                Assert.assertEquals(status.getSerializedValue(), rs.getString(1));
            }
        }
    }

    /**
     * Asserts that, for every current region of the table, the CDC stream table contains a row
     * whose PARTITION_ID is the region's encoded name (for the given table and its stream).
     *
     * <p>The lookup statement is prepared once and re-executed per region; it and each result
     * set are closed before returning. The connection belongs to the caller and is left open.
     *
     * @param conn      open connection; must unwrap to {@link PhoenixConnection}
     * @param tableName table whose regions are checked
     * @param cdcName   CDC object name used to resolve the stream name
     * @throws SQLException if resolving the stream name, listing the regions or a lookup fails
     */
    private void assertPartitionMetadata(Connection conn, String tableName, String cdcName) throws SQLException {
        String streamName = this.getStreamName(conn, tableName, cdcName);
        // Bytes.toBytes encodes as UTF-8 instead of relying on the platform default charset.
        List<HRegionLocation> tableRegions = conn.unwrap(PhoenixConnection.class)
                .getQueryServices()
                .getAllTableRegions(Bytes.toBytes(tableName));
        String query = "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID= ?";
        try (PreparedStatement ps = conn.prepareStatement(query)) {
            // Table and stream are the same for every region; only the partition id changes.
            ps.setString(1, tableName);
            ps.setString(2, streamName);
            for (HRegionLocation tableRegion : tableRegions) {
                String encodedName = tableRegion.getRegion().getEncodedName();
                ps.setString(3, encodedName);
                try (ResultSet rs = ps.executeQuery()) {
                    Assert.assertTrue("No partition metadata row for region " + encodedName, rs.next());
                }
            }
        }
    }
}

