package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.shaded.org.joni.constants.StackType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, LargeTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.class */
public class TestMultiSlaveReplication {
    private static Configuration conf1;
    private static Configuration conf2;
    private static Configuration conf3;
    private static HBaseTestingUtility utility1;
    private static HBaseTestingUtility utility2;
    private static HBaseTestingUtility utility3;
    private static final long SLEEP_TIME = 500;
    private static final int NB_RETRIES = 100;
    private static HTableDescriptor table;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);
    private static final TableName tableName = TableName.valueOf("test");
    private static final byte[] famName = Bytes.toBytes("f");
    private static final byte[] row = Bytes.toBytes("row");
    private static final byte[] row1 = Bytes.toBytes("row1");
    private static final byte[] row2 = Bytes.toBytes("row2");
    private static final byte[] row3 = Bytes.toBytes("row3");
    private static final byte[] noRepfamName = Bytes.toBytes("norep");

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf1 = HBaseConfiguration.create();
        conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
        conf1.setInt(WALUtil.WAL_BLOCK_SIZE, StackType.NULL_CHECK_END);
        conf1.setInt("replication.source.size.capacity", 1024);
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt(AbstractFSWAL.MAX_LOGS, 10);
        conf1.setLong(TimeToLiveLogCleaner.TTL_CONF_KEY, 10L);
        conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100L);
        conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
        conf1.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 5000);
        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster zkCluster = utility1.getZkCluster();
        utility1.setZkCluster(zkCluster);
        new ZKWatcher(conf1, "cluster1", null, true);
        conf2 = new Configuration(conf1);
        conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
        conf3 = new Configuration(conf1);
        conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(zkCluster);
        new ZKWatcher(conf2, "cluster2", null, true);
        utility3 = new HBaseTestingUtility(conf3);
        utility3.setZkCluster(zkCluster);
        new ZKWatcher(conf3, "cluster3", null, true);
        table = new HTableDescriptor(tableName);
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(famName);
        hColumnDescriptor.setScope(1);
        table.addFamily(hColumnDescriptor);
        table.addFamily(new HColumnDescriptor(noRepfamName));
    }

    @Test
    public void testMultiSlaveReplication() throws Exception {
        LOG.info("testCyclicReplication");
        utility1.startMiniCluster();
        utility2.startMiniCluster();
        utility3.startMiniCluster();
        ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
        utility1.getAdmin().createTable(table);
        utility2.getAdmin().createTable(table);
        utility3.getAdmin().createTable(table);
        Table table2 = utility1.getConnection().getTable(tableName);
        Table table3 = utility2.getConnection().getTable(tableName);
        Table table4 = utility3.getConnection().getTable(tableName);
        ReplicationPeerConfig replicationPeerConfig = new ReplicationPeerConfig();
        replicationPeerConfig.setClusterKey(utility2.getClusterKey());
        replicationAdmin.addPeer("1", replicationPeerConfig, null);
        putAndWait(row, famName, table2, table3);
        deleteAndWait(row, table2, table3);
        checkRow(row, 0, table4);
        putAndWait(row2, famName, table2, table3);
        rollWALAndWait(utility1, table2.getName(), row2);
        putAndWait(row3, famName, table2, table3);
        ReplicationPeerConfig replicationPeerConfig2 = new ReplicationPeerConfig();
        replicationPeerConfig2.setClusterKey(utility3.getClusterKey());
        replicationAdmin.addPeer("2", replicationPeerConfig2, null);
        putAndWait(row1, famName, table2, table3, table4);
        deleteAndWait(row1, table2, table3, table4);
        checkRow(row2, 0, table4);
        checkRow(row3, 1, table4);
        Put put = new Put(row);
        put.addColumn(famName, row, row);
        table2.put(put);
        rollWALAndWait(utility1, table2.getName(), row);
        deleteAndWait(row2, table2, table3, table4);
        checkRow(row, 1, table3);
        checkWithWait(row, 1, table4);
        deleteAndWait(row, table2, table3, table4);
        deleteAndWait(row3, table2, table3, table4);
        utility3.shutdownMiniCluster();
        utility2.shutdownMiniCluster();
        utility1.shutdownMiniCluster();
    }

    private void rollWALAndWait(HBaseTestingUtility hBaseTestingUtility, TableName tableName2, byte[] bArr) throws IOException {
        Admin admin = hBaseTestingUtility.getAdmin();
        MiniHBaseCluster miniHBaseCluster = hBaseTestingUtility.getMiniHBaseCluster();
        HRegion hRegion = null;
        Iterator<HRegion> it = miniHBaseCluster.getRegions(tableName2).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            HRegion next = it.next();
            if (HRegion.rowIsInRange(next.getRegionInfo(), bArr)) {
                hRegion = next;
                break;
            }
        }
        Assert.assertNotNull("Couldn't find the region for row '" + Arrays.toString(bArr) + "'", hRegion);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        WALActionsListener wALActionsListener = new WALActionsListener() { // from class: org.apache.hadoop.hbase.replication.TestMultiSlaveReplication.1
            @Override // org.apache.hadoop.hbase.regionserver.wal.WALActionsListener
            public void postLogRoll(Path path, Path path2) throws IOException {
                countDownLatch.countDown();
            }
        };
        hRegion.getWAL().registerWALActionsListener(wALActionsListener);
        admin.rollWALWriter(miniHBaseCluster.getServerHoldingRegion(hRegion.getTableDescriptor().getTableName(), hRegion.getRegionInfo().getRegionName()));
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for the wal of '" + hRegion + "' to roll. If later replication tests fail, it's probably because we should still be waiting.");
            Thread.currentThread().interrupt();
        }
        hRegion.getWAL().unregisterWALActionsListener(wALActionsListener);
    }

    private void checkWithWait(byte[] bArr, int i, Table table2) throws Exception {
        Get get = new Get(bArr);
        for (int i2 = 0; i2 < 100; i2++) {
            if (i2 == 99) {
                Assert.fail("Waited too much time while getting the row.");
            }
            if (table2.get(get).size() >= 1) {
                LOG.info("Row is replicated");
                Assert.assertEquals("Table '" + table2 + "' did not have the expected number of  results.", i, r0.size());
                return;
            } else {
                if (0 != 0) {
                    return;
                }
                Thread.sleep(500L);
            }
        }
    }

    private void checkRow(byte[] bArr, int i, Table... tableArr) throws IOException {
        Get get = new Get(bArr);
        for (Table table2 : tableArr) {
            Assert.assertEquals("Table '" + table2 + "' did not have the expected number of results.", i, r0.get(get).size());
        }
    }

    private void deleteAndWait(byte[] bArr, Table table2, Table... tableArr) throws Exception {
        table2.delete(new Delete(bArr));
        Get get = new Get(bArr);
        for (int i = 0; i < 100; i++) {
            if (i == 99) {
                Assert.fail("Waited too much time for del replication");
            }
            boolean z = true;
            int length = tableArr.length;
            int i2 = 0;
            while (true) {
                if (i2 >= length) {
                    break;
                }
                if (tableArr[i2].get(get).size() >= 1) {
                    LOG.info("Row not deleted");
                    z = false;
                    break;
                }
                i2++;
            }
            if (z) {
                return;
            }
            Thread.sleep(500L);
        }
    }

    private void putAndWait(byte[] bArr, byte[] bArr2, Table table2, Table... tableArr) throws Exception {
        Put put = new Put(bArr);
        put.addColumn(bArr2, bArr, bArr);
        table2.put(put);
        Get get = new Get(bArr);
        for (int i = 0; i < 100; i++) {
            if (i == 99) {
                Assert.fail("Waited too much time for put replication");
            }
            boolean z = true;
            int length = tableArr.length;
            int i2 = 0;
            while (true) {
                if (i2 >= length) {
                    break;
                }
                Result result = tableArr[i2].get(get);
                if (result.isEmpty()) {
                    LOG.info("Row not available");
                    z = false;
                    break;
                } else {
                    Assert.assertArrayEquals(result.value(), bArr);
                    i2++;
                }
            }
            if (z) {
                return;
            }
            Thread.sleep(500L);
        }
    }
}
