/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end.index;

import java.io.IOException;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.coprocessor.ReplicationSinkEndpoint;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={NeedsOwnMiniClusterTest.class})
public class ReplicationWithWALAnnotationIT
extends BaseTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationWithWALAnnotationIT.class);
    // Schema, table and view names obtained from generateUniqueName(); the index names are
    // derived from them by prefixing "IDX_".
    private static final String SCHEMA_NAME = ReplicationWithWALAnnotationIT.generateUniqueName();
    private static final String DATA_TABLE_NAME = ReplicationWithWALAnnotationIT.generateUniqueName();
    private static final String INDEX_TABLE_NAME = "IDX_" + DATA_TABLE_NAME;
    private static final String TENANT_VIEW_NAME = ReplicationWithWALAnnotationIT.generateUniqueName();
    private static final String TENANT_VIEW_INDEX_NAME = "IDX_" + TENANT_VIEW_NAME;
    // Schema-qualified names built with SchemaUtil.getTableName(schema, table).
    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName((String)SCHEMA_NAME, (String)DATA_TABLE_NAME);
    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName((String)SCHEMA_NAME, (String)INDEX_TABLE_NAME);
    // Pause between replication polls, in milliseconds (per the constant's name).
    private static final long REPLICATION_WAIT_TIME_MILLIS = 10000L;
    // Cluster URLs of mini cluster 1 and 2; assigned in setUpBeforeClass().
    private static String url1;
    private static String url2;
    // Per-cluster configuration, ZooKeeper watcher and testing utility. conf1 is created in the
    // static initializer; everything else is assigned in setupConfigsAndStartCluster().
    // Cluster 1 gets cluster 2 registered as its replication peer.
    protected static Configuration conf1;
    protected static Configuration conf2;
    protected static ZKWatcher zkw1;
    protected static ZKWatcher zkw2;
    protected static HBaseTestingUtility utility1;
    protected static HBaseTestingUtility utility2;
    // Number of polling attempts intended for replication checks (per the constant's name).
    protected static final int REPLICATION_RETRIES = 10;
    // Byte constants assigned in the static initializer.
    // NOTE(review): neither appears to be read anywhere in this class - confirm before removing.
    protected static final byte[] tableName;
    protected static final byte[] row;
    // Properties handed to initAndRegisterTestDriver(); populated in setUpBeforeClass().
    private static Map<String, String> props;

    /**
     * One-time fixture: brings up both mini clusters, prepares the properties that are later
     * handed to the test driver, and records the local cluster URL of each cluster.
     *
     * @throws Exception if cluster start-up or URL lookup fails
     */
    @BeforeClass
    public static synchronized void setUpBeforeClass() throws Exception {
        setupConfigsAndStartCluster();

        // Fill a local map first, then publish it through the static field.
        final Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(3);
        driverProps.put("phoenix.index.mutableBatchSizeThreshold", String.valueOf(2));
        driverProps.put("phoenix.schema.dropMetaData", String.valueOf(true));
        props = driverProps;

        url1 = getLocalClusterUrl(utility1);
        url2 = getLocalClusterUrl(utility2);
    }

    /**
     * Shuts down both mini clusters and resets the server metadata cache.
     *
     * <p>Each step runs in its own {@code finally} so that a failure while stopping cluster 1
     * does not leave cluster 2 running, and the cache is always reset. The utilities are
     * null-checked because this method also runs when {@code setUpBeforeClass()} failed before
     * they were created.
     *
     * @throws Exception if shutting down either mini cluster fails
     */
    @AfterClass
    public static void afterClass() throws Exception {
        try {
            if (utility1 != null) {
                utility1.shutdownMiniCluster();
            }
        } finally {
            try {
                if (utility2 != null) {
                    utility2.shutdownMiniCluster();
                }
            } finally {
                ServerMetadataCacheTestImpl.resetCache();
            }
        }
    }

    /**
     * Configures and starts two mini clusters and registers cluster 2 as replication peer
     * {@code "1"} of cluster 1.
     *
     * <p>Cluster 1 is configured with {@link ReplicationSinkEndpoint} as a region server
     * coprocessor and with {@code phoenix.append.metadata.to.wal} enabled. The configuration of
     * cluster 2 is copied from cluster 1 and then adjusted.
     *
     * @throws Exception if either cluster fails to start or the peer cannot be added
     */
    private static void setupConfigsAndStartCluster() throws Exception {
        conf1.setInt("zookeeper.recovery.retry", 1);
        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
        conf1.setBoolean("dfs.support.append", true);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        conf1.setInt("replication.stats.thread.period.seconds", 5);
        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        conf1.setStrings("hbase.coprocessor.regionserver.classes", ReplicationSinkEndpoint.class.getName());
        conf1.setBoolean("phoenix.append.metadata.to.wal", true);
        conf1.set("zookeeper.znode.parent", "/1");
        setUpConfigForMiniCluster(conf1);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        // Continue with the utility's own configuration object from here on.
        conf1 = utility1.getConfiguration();
        zkw1 = new ZKWatcher(conf1, "cluster1", null, true);

        // Derive the second cluster's configuration from the first one.
        conf2 = HBaseConfiguration.create(conf1);
        conf2.setInt("hbase.client.retries.number", 6);
        conf2.setBoolean("dfs.support.append", true);
        conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        utility2 = new HBaseTestingUtility(conf2);
        utility2.startMiniZKCluster();
        zkw2 = new ZKWatcher(conf2, "cluster2", null, true);

        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);

        // The connection and admin are only needed to register the peer; close both afterwards
        // instead of leaking them for the lifetime of the JVM.
        try (Connection hbaseConn = ConnectionFactory.createConnection(conf1);
             Admin admin = hbaseConn.getAdmin()) {
            admin.addReplicationPeer("1",
                ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build());
        }
    }

    /**
     * Decides from the running HBase version whether the replication-sink based tests apply.
     *
     * <p>The answer is {@code true} for any major version above 2 and, within 2.x, for
     * 2.4.16 and later 2.4 patches, 2.5.3 and later 2.5 patches, and every minor above 5.
     * Anything after a {@code '-'} in the patch component is ignored.
     *
     * @return {@code true} if the version reported by {@link VersionInfo#getVersion()} passes
     *         the gate described above
     */
    private boolean isReplicationSinkEndpointEnabled() {
        final String[] parts = VersionInfo.getVersion().split("\\.");
        final int major = Integer.parseInt(parts[0]);
        final int minor = Integer.parseInt(parts[1]);
        final int patch = Integer.parseInt(parts[2].split("-")[0]);

        if (major != 2) {
            // Newer majors qualify, older ones do not.
            return major > 2;
        }
        switch (minor) {
            case 4:
                return patch >= 16;
            case 5:
                return patch >= 3;
            default:
                // Remaining 2.x minors: above 5 qualifies, below 4 does not.
                return minor > 5;
        }
    }

    /**
     * Upserts two rows into a table created with REPLICATION_SCOPE = 1 on cluster 1 and waits
     * for them to show up on cluster 2, where {@link TestCoprocessorForWALAnnotationAtSink} is
     * attached to the same table and throws if the mutation attributes it inspects (tenant id,
     * schema name, logical table name, table type) do not match the expected values.
     *
     * <p>The test is skipped when {@link #isReplicationSinkEndpointEnabled()} returns false.
     * The registered test driver is swapped between the two cluster URLs before each phase.
     */
    @Test
    public void testReplicationWithWALExtendedAttributes() throws Exception {
        Assume.assumeTrue((String)"Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+", (boolean)this.isReplicationSinkEndpointEnabled());
        // Phase 1 - cluster 1: (re)create the table and its index, and check both are empty.
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        java.sql.Connection primaryConnection = ReplicationWithWALAnnotationIT.getConnection(url1);
        primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        primaryConnection.createStatement().execute(String.format("CREATE TABLE %s (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) REPLICATION_SCOPE = 1", DATA_TABLE_FULL_NAME));
        primaryConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
        String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        ResultSet rs = primaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = primaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        // Phase 2 - cluster 2: same DDL, same emptiness checks, then attach the checking
        // coprocessor to the data table.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        java.sql.Connection secondaryConnection = ReplicationWithWALAnnotationIT.getConnection(url2);
        secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        secondaryConnection.createStatement().execute(String.format("CREATE TABLE %s (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) REPLICATION_SCOPE = 1", DATA_TABLE_FULL_NAME));
        secondaryConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        rs = secondaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = secondaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        TestUtil.addCoprocessor(secondaryConnection, DATA_TABLE_FULL_NAME, TestCoprocessorForWALAnnotationAtSink.class);
        // Phase 3 - cluster 1: upsert two rows (k, v1, v2) and commit.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        primaryConnection = ReplicationWithWALAnnotationIT.getConnection(url1);
        PreparedStatement stmt = primaryConnection.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
        stmt.setString(1, "a1");
        stmt.setString(2, "x1");
        stmt.setString(3, "11");
        stmt.execute();
        stmt.setString(1, "a2");
        stmt.setString(2, "x2");
        stmt.setString(3, "12");
        stmt.execute();
        primaryConnection.commit();
        // The index on cluster 1 must now return exactly the two v1 values, in order.
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = primaryConnection.createStatement().executeQuery(query);
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((Object)"x1", (Object)rs.getString(1));
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((Object)"x2", (Object)rs.getString(1));
        Assert.assertFalse((boolean)rs.next());
        primaryConnection.close();
        // Phase 4: poll cluster 2 until the data table holds the two replicated rows.
        // NOTE(review): presumably a rejection by the sink coprocessor keeps the rows from
        // arriving, so this wait is what surfaces an attribute mismatch - confirm.
        this.assertReplicatedData(DATA_TABLE_FULL_NAME);
        // Phase 5: drop the table on both clusters.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        primaryConnection = ReplicationWithWALAnnotationIT.getConnection(url1);
        primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        secondaryConnection = ReplicationWithWALAnnotationIT.getConnection(url2);
        secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
    }

    /**
     * Multi-tenant variant: creates a MULTI_TENANT table with REPLICATION_SCOPE = 1 plus a
     * tenant view (and indexes on both) on each cluster, upserts two rows through the
     * "tenant01" view on cluster 1 and waits for them to show up in the base table on
     * cluster 2, where {@link TestTenantCoprocessorForWALAnnotationAtSink} is attached and
     * throws if the mutation attributes it inspects do not match the expected tenant/view
     * values.
     *
     * <p>The test is skipped when {@link #isReplicationSinkEndpointEnabled()} returns false.
     * The registered test driver is swapped between the two cluster URLs before each phase.
     */
    @Test
    public void testReplicationWithWALExtendedAttributesWithTenants() throws Exception {
        Assume.assumeTrue((String)"Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+", (boolean)this.isReplicationSinkEndpointEnabled());
        // Phase 1 - cluster 1: (re)create the multi-tenant table and its global index.
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        java.sql.Connection primaryConnection = ReplicationWithWALAnnotationIT.getConnection(url1);
        primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        primaryConnection.createStatement().execute(String.format("CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, k VARCHAR NOT NULL, v1 VARCHAR, v2 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, k)) REPLICATION_SCOPE = 1, MULTI_TENANT = true", DATA_TABLE_FULL_NAME));
        primaryConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
        // Create the tenant view and its index through a connection carrying TenantId=tenant01.
        java.sql.Connection primaryTenantConnection = ReplicationWithWALAnnotationIT.getTenantConnection(url1, "tenant01");
        primaryTenantConnection.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s ", TENANT_VIEW_NAME, DATA_TABLE_FULL_NAME));
        primaryTenantConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", TENANT_VIEW_INDEX_NAME, TENANT_VIEW_NAME));
        // Table, index, view and view index must all start out empty on cluster 1.
        String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        ResultSet rs = primaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = primaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + TENANT_VIEW_NAME;
        rs = primaryTenantConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
        rs = primaryTenantConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        // Phase 2 - cluster 2: same DDL and emptiness checks, then attach the tenant-aware
        // checking coprocessor to the base table.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        java.sql.Connection secondaryConnection = ReplicationWithWALAnnotationIT.getConnection(url2);
        secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        secondaryConnection.createStatement().execute(String.format("CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, k VARCHAR NOT NULL, v1 VARCHAR, v2 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, k)) REPLICATION_SCOPE = 1, MULTI_TENANT = true", DATA_TABLE_FULL_NAME));
        secondaryConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
        java.sql.Connection secondaryTenantConnection = ReplicationWithWALAnnotationIT.getTenantConnection(url2, "tenant01");
        secondaryTenantConnection.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s ", TENANT_VIEW_NAME, DATA_TABLE_FULL_NAME));
        secondaryTenantConnection.createStatement().execute(String.format("CREATE INDEX %s ON %s (v1)", TENANT_VIEW_INDEX_NAME, TENANT_VIEW_NAME));
        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        rs = secondaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = secondaryConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + TENANT_VIEW_NAME;
        rs = secondaryTenantConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
        rs = secondaryTenantConnection.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        TestUtil.addCoprocessor(secondaryConnection, DATA_TABLE_FULL_NAME, TestTenantCoprocessorForWALAnnotationAtSink.class);
        // Phase 3 - cluster 1: upsert two rows through the tenant view and commit.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        primaryTenantConnection = ReplicationWithWALAnnotationIT.getTenantConnection(url1, "tenant01");
        PreparedStatement stmt = primaryTenantConnection.prepareStatement("UPSERT INTO " + TENANT_VIEW_NAME + " VALUES(?,?,?)");
        stmt.setString(1, "a1");
        stmt.setString(2, "x1");
        stmt.setString(3, "11");
        stmt.execute();
        stmt.setString(1, "a2");
        stmt.setString(2, "x2");
        stmt.setString(3, "12");
        stmt.execute();
        primaryTenantConnection.commit();
        // The view index on cluster 1 must now return exactly the two v1 values, in order.
        query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
        rs = primaryTenantConnection.createStatement().executeQuery(query);
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((Object)"x1", (Object)rs.getString(1));
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((Object)"x2", (Object)rs.getString(1));
        Assert.assertFalse((boolean)rs.next());
        primaryTenantConnection.close();
        // Phase 4: poll cluster 2 until the base table holds the two replicated rows.
        // NOTE(review): presumably a rejection by the sink coprocessor keeps the rows from
        // arriving, so this wait is what surfaces an attribute mismatch - confirm.
        this.assertReplicatedData(DATA_TABLE_FULL_NAME);
        // Phase 5: drop the table on both clusters.
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
        primaryConnection = ReplicationWithWALAnnotationIT.getConnection(url1);
        primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
        driver = ReplicationWithWALAnnotationIT.initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
        secondaryConnection = ReplicationWithWALAnnotationIT.getConnection(url2);
        secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
        ReplicationWithWALAnnotationIT.destroyDriver((Driver)((Object)driver));
    }

    /**
     * Polls the given table on cluster 2 until a raw scan returns exactly two rows, failing
     * the test if that never happens.
     *
     * <p>The table is checked up to {@code REPLICATION_RETRIES} times with a pause of
     * {@code REPLICATION_WAIT_TIME_MILLIS} between checks. There is no pause after the last
     * check, so the final wait is always followed by a re-check rather than an immediate
     * failure. The HBase connection and table are closed on every exit path.
     *
     * @param tableName name of the HBase table to inspect on the replication target
     * @throws IOException if connecting to or scanning the remote table fails
     * @throws InterruptedException if the thread is interrupted while waiting between checks
     */
    private void assertReplicatedData(String tableName) throws IOException, InterruptedException {
        LOGGER.info("Looking up tables in replication target");
        try (Connection hbaseConn = ConnectionFactory.createConnection(utility2.getConfiguration());
             Table remoteTable = hbaseConn.getTable(TableName.valueOf(tableName))) {
            for (int attempt = 1; attempt <= REPLICATION_RETRIES; attempt++) {
                if (this.ensureRows(remoteTable, 2)) {
                    return;
                }
                if (attempt < REPLICATION_RETRIES) {
                    LOGGER.info("Sleeping for {} for edits to get replicated", REPLICATION_WAIT_TIME_MILLIS);
                    Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
                }
            }
            // getName() reports the table name without the extra descriptor lookup.
            Assert.fail("Waited too much time for put replication on table " + remoteTable.getName());
        }
    }

    /**
     * Runs a raw scan over the given table, logs every result and reports whether the number
     * of results equals {@code numRows}.
     *
     * <p>The scanner is opened in a try-with-resources block so it is released even when
     * iteration throws.
     *
     * @param remoteTable table to scan
     * @param numRows expected number of scan results
     * @return {@code true} if the scan returned exactly {@code numRows} results
     * @throws IOException if the scanner cannot be opened
     */
    private boolean ensureRows(Table remoteTable, int numRows) throws IOException {
        Scan scan = new Scan();
        scan.setRaw(true);
        int rows = 0;
        try (ResultScanner scanner = remoteTable.getScanner(scan)) {
            for (Result r : scanner) {
                LOGGER.info("got row: {}", r);
                ++rows;
            }
        }
        return rows == numRows;
    }

    /**
     * Opens a JDBC connection to {@code url} using a private copy of
     * {@code TestUtil.TEST_PROPERTIES}.
     *
     * @param url connection URL of the target cluster
     * @return a new connection obtained from {@link DriverManager}
     * @throws Exception if the connection cannot be established
     */
    private static java.sql.Connection getConnection(String url) throws Exception {
        final Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        return DriverManager.getConnection(url, connectionProps);
    }

    /**
     * Opens a JDBC connection to {@code url} using a private copy of
     * {@code TestUtil.TEST_PROPERTIES} with the {@code TenantId} property set.
     *
     * @param url connection URL of the target cluster
     * @param tenantId value stored under the {@code TenantId} connection property
     * @return a new connection obtained from {@link DriverManager}
     * @throws Exception if the connection cannot be established
     */
    private static java.sql.Connection getTenantConnection(String url, String tenantId) throws Exception {
        final Properties tenantProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        tenantProps.setProperty("TenantId", tenantId);
        return DriverManager.getConnection(url, tenantProps);
    }

    // Class initialisation: assigns the two byte-array constants and creates the base
    // configuration for cluster 1.
    static {
        row = Bytes.toBytes("row");
        tableName = Bytes.toBytes("test");
        conf1 = HBaseConfiguration.create();
    }

    public static class TestCoprocessorForWALAnnotationAtSink
    implements RegionCoprocessor,
    RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            String tenantId = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
            String schemaName = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
            String logicalTableName = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
            String tableType = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
            LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, logicalTableName: {}, tableType: {}", new Object[]{tenantId, schemaName, logicalTableName, tableType});
            if (!(tenantId == null && SCHEMA_NAME.equals(schemaName) && DATA_TABLE_NAME.equals(logicalTableName) && PTableType.TABLE.getValue().toString().equals(tableType))) {
                throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
            }
        }
    }

    public static class TestTenantCoprocessorForWALAnnotationAtSink
    implements RegionCoprocessor,
    RegionObserver {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            String tenantId = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
            String schemaName = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
            String logicalTableName = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
            String tableType = Bytes.toString((byte[])((Mutation)miniBatchOp.getOperation(0)).getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
            LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, logicalTableName: {}, tableType: {}", new Object[]{tenantId, schemaName, logicalTableName, tableType});
            if (!("tenant01".equals(tenantId) && "".equals(schemaName) && TENANT_VIEW_NAME.equals(logicalTableName) && PTableType.VIEW.getValue().toString().equals(tableType))) {
                throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
            }
        }
    }
}

