/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end.index;

import java.io.IOException;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={NeedsOwnMiniClusterTest.class})
public class MutableIndexReplicationIT
extends BaseTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(MutableIndexReplicationIT.class);
    public static final String SCHEMA_NAME = "";
    public static final String DATA_TABLE_NAME = "T";
    public static final String INDEX_TABLE_NAME = "I";
    public static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName((String)"", (String)"T");
    public static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName((String)"", (String)"I");
    private static final long REPLICATION_WAIT_TIME_MILLIS = 10000L;
    protected static PhoenixTestDriver driver;
    private static String URL;
    protected static Configuration conf1;
    protected static Configuration conf2;
    protected static ZKWatcher zkw1;
    protected static ZKWatcher zkw2;
    protected static Admin admin;
    protected static HBaseTestingUtility utility1;
    protected static HBaseTestingUtility utility2;
    protected static final int REPLICATION_RETRIES = 100;
    protected static final byte[] tableName;
    protected static final byte[] row;

    @BeforeClass
    public static synchronized void setUpBeforeClass() throws Exception {
        MutableIndexReplicationIT.setupConfigsAndStartCluster();
        MutableIndexReplicationIT.setupDriver();
    }

    private static void setupConfigsAndStartCluster() throws Exception {
        MutableIndexReplicationIT.setUpConfigForMiniCluster(conf1);
        conf1.setFloat("hbase.regionserver.logroll.multiplier", 3.0E-4f);
        conf1.setInt("replication.source.size.capacity", 10240);
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt("hbase.regionserver.maxlogs", 10);
        conf1.setLong("hbase.master.logcleaner.ttl", 10L);
        conf1.setInt("zookeeper.recovery.retry", 1);
        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
        conf1.setBoolean("dfs.support.append", true);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        conf1.setInt("replication.stats.thread.period.seconds", 5);
        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster miniZK = utility1.getZkCluster();
        conf1 = utility1.getConfiguration();
        zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
        admin = ConnectionFactory.createConnection((Configuration)conf1).getAdmin();
        LOGGER.info("Setup first Zk");
        conf2 = HBaseConfiguration.create((Configuration)conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        conf2.setInt("hbase.client.retries.number", 6);
        conf2.setBoolean("dfs.support.append", true);
        conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(miniZK);
        zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
        LOGGER.info("Setup second Zk");
        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);
        admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build());
    }

    private static void setupDriver() throws Exception {
        LOGGER.info("Setting up phoenix driver");
        HashMap props = Maps.newHashMapWithExpectedSize((int)3);
        props.put("phoenix.index.mutableBatchSizeThreshold", Integer.toString(2));
        props.put("phoenix.schema.dropMetaData", Boolean.toString(true));
        URL = MutableIndexReplicationIT.getLocalClusterUrl(utility1);
        LOGGER.info("Connecting driver to " + URL);
        driver = MutableIndexReplicationIT.initAndRegisterTestDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
    }

    @Test
    public void testReplicationWithMutableIndexes() throws Exception {
        java.sql.Connection conn = MutableIndexReplicationIT.getConnection();
        conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
        conn.createStatement().execute("CREATE INDEX I ON " + DATA_TABLE_FULL_NAME + " (v1)");
        String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
        ResultSet rs = conn.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = conn.createStatement().executeQuery(query);
        Assert.assertFalse((boolean)rs.next());
        Admin admin = utility1.getAdmin();
        Admin admin2 = utility2.getAdmin();
        ArrayList<String> dataTables = new ArrayList<String>();
        dataTables.add(DATA_TABLE_FULL_NAME);
        dataTables.add(INDEX_TABLE_FULL_NAME);
        for (String tableName : dataTables) {
            TableDescriptor desc = admin.getDescriptor(TableName.valueOf((String)tableName));
            admin2.createTable(desc);
            LOGGER.info("Enabling replication on source table: " + tableName);
            ColumnFamilyDescriptor[] cols = desc.getColumnFamilies();
            Assert.assertEquals((long)1L, (long)cols.length);
            ColumnFamilyDescriptor col = ColumnFamilyDescriptorBuilder.newBuilder((byte[])cols[0].getName()).setScope(1).build();
            desc = TableDescriptorBuilder.newBuilder((TableDescriptor)desc).removeColumnFamily(cols[0].getName()).setColumnFamily(col).build();
            admin.disableTable(desc.getTableName());
            admin.modifyTable(desc);
            admin.enableTable(desc.getTableName());
            LOGGER.info("Replication enabled on source table: " + tableName);
        }
        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
        stmt.setString(1, "a");
        stmt.setString(2, "x");
        stmt.setString(3, "1");
        stmt.execute();
        conn.commit();
        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
        rs = conn.createStatement().executeQuery(query);
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((Object)"x", (Object)rs.getString(1));
        Assert.assertFalse((boolean)rs.next());
        conn.close();
        LOGGER.info("Looking up tables in replication target");
        TableName[] tables = admin2.listTableNames();
        Connection hbaseConn = ConnectionFactory.createConnection((Configuration)utility2.getConfiguration());
        Table remoteTable = hbaseConn.getTable(tables[0]);
        for (int i = 0; i < 100; ++i) {
            if (i >= 99) {
                Assert.fail((String)("Waited too much time for put replication on table " + remoteTable.getDescriptor().getTableName()));
            }
            if (this.ensureAnyRows(remoteTable)) break;
            LOGGER.info("Sleeping for 10000 for edits to get replicated");
            Thread.sleep(10000L);
        }
        remoteTable.close();
    }

    private boolean ensureAnyRows(Table remoteTable) throws IOException {
        Scan scan = new Scan();
        scan.setRaw(true);
        ResultScanner scanner = remoteTable.getScanner(scan);
        boolean found = false;
        for (Result r : scanner) {
            LOGGER.info("got row: " + r);
            found = true;
        }
        scanner.close();
        return found;
    }

    private static java.sql.Connection getConnection() throws Exception {
        Properties props = PropertiesUtil.deepCopy((Properties)TestUtil.TEST_PROPERTIES);
        return DriverManager.getConnection(URL, props);
    }

    static {
        conf1 = HBaseConfiguration.create();
        tableName = Bytes.toBytes((String)"test");
        row = Bytes.toBytes((String)"row");
    }
}

