package org.apache.accumulo.test.replication;

import com.google.common.collect.Iterators;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore("Replication ITs are not stable and not currently maintained")
/* loaded from: input_file:org/apache/accumulo/test/replication/MultiInstanceReplicationIT.class */
public class MultiInstanceReplicationIT extends ConfigurableMacBase {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class);
    private ExecutorService executor;

    /**
     * Reports the default timeout for the tests in this class.
     *
     * @return the timeout in seconds (fifteen minutes)
     */
    @Override // org.apache.accumulo.harness.AccumuloITBase
    public int defaultTimeoutSeconds() {
        final int minutes = 15;
        final int secondsPerMinute = 60;
        return minutes * secondsPerMinute;
    }

    /**
     * Creates a fresh single-threaded executor before each test; it is used to run work that
     * must be bounded by a timeout.
     */
    @Before
    public void createExecutor() {
        ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
        executor = singleThreaded;
    }

    /**
     * Shuts the per-test executor down after each test, if one was created.
     */
    @After
    public void stopExecutor() {
        ExecutorService current = executor;
        if (current == null) {
            // Nothing to clean up: the executor was never created.
            return;
        }
        current.shutdownNow();
    }

    /**
     * Configures the primary mini cluster: a single tablet server, short ZooKeeper/GC/replication
     * intervals, small write-ahead logs, the sequential work assigner, and the raw local file
     * system for the {@code file} scheme.
     *
     * @param miniAccumuloConfigImpl the mini cluster configuration to populate
     * @param configuration the Hadoop configuration to adjust
     */
    @Override // org.apache.accumulo.test.functional.ConfigurableMacBase
    public void configure(MiniAccumuloConfigImpl miniAccumuloConfigImpl, Configuration configuration) {
        final MiniAccumuloConfigImpl cfg = miniAccumuloConfigImpl;
        final String oneSecond = "1s";
        final String workAssigner = SequentialWorkAssigner.class.getName();
        final String localFsImpl = RawLocalFileSystem.class.getName();

        cfg.setNumTservers(1);
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");

        // Garbage collector timing.
        cfg.setProperty(Property.GC_CYCLE_START, oneSecond);
        cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");

        // Replication settings for the primary instance.
        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, oneSecond);
        cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, oneSecond);
        cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
        cfg.setProperty(Property.REPLICATION_NAME, "master");
        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, workAssigner);

        cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");

        configuration.set("fs.file.impl", localFsImpl);
    }

    /**
     * Copies the security-related site settings of the primary cluster onto the peer cluster's
     * configuration.
     *
     * <p>When RPC SSL is enabled on the primary, a new site configuration map containing the SSL
     * flag, keystore/truststore paths (both required) and, when present, their passwords is
     * handed to the peer via {@code setSiteConfig}. Independently of SSL, the credential provider
     * path is added to the peer's site configuration when the primary defines one.
     *
     * @param miniAccumuloConfigImpl configuration of the primary cluster (read only)
     * @param miniAccumuloConfigImpl2 configuration of the peer cluster (updated)
     */
    private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl miniAccumuloConfigImpl, MiniAccumuloConfigImpl miniAccumuloConfigImpl2) {
        final Map<String, String> primarySiteConfig = miniAccumuloConfigImpl.getSiteConfig();

        if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
            final Map<String, String> peerSiteConfig = new HashMap<>();
            peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");

            final String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
            Assert.assertNotNull("Keystore Path was null", keystorePath);
            peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);

            final String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
            Assert.assertNotNull("Truststore Path was null", truststorePath);
            peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);

            // Passwords are optional; only forward them when the primary has them set.
            final String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
            if (null != keystorePassword) {
                peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
            }
            final String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
            if (null != truststorePassword) {
                peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
            }

            // Log only the property names: the values include keystore/truststore passwords.
            log.info("Setting site configuration for peer, properties: {}", peerSiteConfig.keySet());
            miniAccumuloConfigImpl2.setSiteConfig(peerSiteConfig);
        }

        final String credentialProviderPaths = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
        if (null != credentialProviderPaths) {
            final Map<String, String> peerSiteConfig = miniAccumuloConfigImpl2.getSiteConfig();
            peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credentialProviderPaths);
            miniAccumuloConfigImpl2.setSiteConfig(peerSiteConfig);
        }
    }

    /**
     * Verifies that data written to table {@code master} on the primary cluster is replicated to
     * table {@code peer} on a second mini cluster.
     *
     * <p>The test starts the peer cluster, configures replication, writes 5000 rows of 100
     * columns, restarts the primary's tablet servers, waits (at most 60 seconds) for the files
     * referenced at write time to drain, and finally compares both tables entry by entry.
     *
     * @throws Exception if cluster setup, replication configuration or verification fails
     */
    @Test
    public void dataWasReplicatedToThePeer() throws Exception {
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        try {
            final Connector connMaster = getConnector();
            Connector connPeer = peerCluster.getConnector("root", new PasswordToken("testRootPassword1"));
            ReplicationTable.setOnline(connMaster);

            // Replication user on the peer, and the matching credentials/peer definition on the primary.
            connPeer.securityOperations().createLocalUser("peer", new PasswordToken("foo"));
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "peer");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "foo");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));

            connMaster.tableOperations().create("master");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master"));
            connPeer.tableOperations().create("peer");
            String peerTableId = connPeer.tableOperations().tableIdMap().get("peer");
            Assert.assertNotNull(peerTableId);
            connPeer.securityOperations().grantTablePermission("peer", "peer", TablePermission.WRITE);

            // Replicate table "master" on the primary to the peer table, addressed by its table id.
            connMaster.tableOperations().setProperty("master", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId);

            BatchWriter writer = connMaster.createBatchWriter("master", new BatchWriterConfig());
            for (int row = 0; row < 5000; row++) {
                Mutation mutation = new Mutation(Integer.toString(row));
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                }
                writer.addMutation(mutation);
            }
            writer.close();
            log.info("Wrote all data to master cluster");

            final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles("master");
            log.info("Files to replicate: " + filesNeedingReplication);

            // Restart the tablet server(s) of the primary cluster.
            for (ProcessReference tserver : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, tserver);
            }
            this.cluster.exec(TabletServer.class, new String[0]);
            log.info("TabletServer restarted");

            // A full read of the replication table only returns once a tablet server serves it again.
            try (Scanner warmUp = ReplicationTable.getScanner(connMaster)) {
                Iterators.size(warmUp.iterator());
            }
            log.info("TabletServer is online");
            while (!ReplicationTable.isOnline(connMaster)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            logMetadataAndReplicationRecords(connMaster);

            // Run the drain on the executor so it can be bounded by a timeout.
            Future<Boolean> drainResult = this.executor.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    long start = System.currentTimeMillis();
                    connMaster.replicationOperations().drain("master", filesNeedingReplication);
                    log.info("Drain completed in " + (System.currentTimeMillis() - start) + "ms");
                    return true;
                }
            });
            try {
                drainResult.get(60L, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                drainResult.cancel(true);
                Assert.fail("Drain did not finish within 60 seconds");
            } finally {
                this.executor.shutdownNow();
            }
            log.info("drain completed");

            logMetadataAndReplicationRecords(connMaster);

            // Walk both tables in lock step; keys must match (timestamp ignored) and values must be equal.
            try (Scanner masterScanner = connMaster.createScanner("master", Authorizations.EMPTY);
                    Scanner peerScanner = connPeer.createScanner("peer", Authorizations.EMPTY)) {
                Iterator<Map.Entry<Key, Value>> masterIter = masterScanner.iterator();
                Iterator<Map.Entry<Key, Value>> peerIter = peerScanner.iterator();
                Map.Entry<Key, Value> masterEntry = null;
                Map.Entry<Key, Value> peerEntry = null;
                while (masterIter.hasNext() && peerIter.hasNext()) {
                    masterEntry = masterIter.next();
                    peerEntry = peerIter.next();
                    Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0L, masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
                    Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
                }
                log.info("Last master entry: " + masterEntry);
                log.info("Last peer entry: " + peerEntry);
                Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
                Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
            }
        } finally {
            peerCluster.stop();
        }
    }

    /**
     * Logs every entry of the metadata table and of the replication table for debugging,
     * rendering replication {@code Status} values in readable form. Both scanners are closed
     * even if reading or parsing an entry fails.
     *
     * @param conn connector used to read both tables
     * @throws Exception if a table cannot be scanned or a status value cannot be parsed
     */
    private void logMetadataAndReplicationRecords(Connector conn) throws Exception {
        log.info("");
        log.info("Fetching metadata records:");
        try (Scanner metadataScanner = conn.createScanner("accumulo.metadata", Authorizations.EMPTY)) {
            for (Map.Entry<Key, Value> kv : metadataScanner) {
                if (MetadataSchema.ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
                    log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
                } else {
                    log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
                }
            }
        }
        log.info("");
        log.info("Fetching replication records:");
        try (Scanner replicationScanner = ReplicationTable.getScanner(conn)) {
            for (Map.Entry<Key, Value> kv : replicationScanner) {
                log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
            }
        }
    }

    /**
     * Verifies that two replicated tables on the primary cluster each end up in their own target
     * table on the peer cluster.
     *
     * <p>Rows written to {@code master1} are prefixed with {@code master1} and rows written to
     * {@code master2} with {@code master2}; after draining both tables, {@code peer1} and
     * {@code peer2} must contain exactly the number of entries written to their source table and
     * only rows carrying the matching prefix.
     *
     * @throws Exception if cluster setup, replication configuration or verification fails
     */
    @Test
    public void dataReplicatedToCorrectTable() throws Exception {
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        try {
            Connector connMaster = getConnector();
            Connector connPeer = peerCluster.getConnector("root", new PasswordToken("testRootPassword1"));

            // Replication user on the peer, and the matching credentials/peer definition on the primary.
            connPeer.securityOperations().createLocalUser("peer", new PasswordToken("foo"));
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "peer");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "foo");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));

            connMaster.tableOperations().create("master1");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master1"));
            connMaster.tableOperations().create("master2");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master2"));
            connPeer.tableOperations().create("peer1");
            String peerTableId1 = connPeer.tableOperations().tableIdMap().get("peer1");
            Assert.assertNotNull(peerTableId1);
            connPeer.tableOperations().create("peer2");
            String peerTableId2 = connPeer.tableOperations().tableIdMap().get("peer2");
            Assert.assertNotNull(peerTableId2);
            connPeer.securityOperations().grantTablePermission("peer", "peer1", TablePermission.WRITE);
            connPeer.securityOperations().grantTablePermission("peer", "peer2", TablePermission.WRITE);

            // master1 -> peer1 and master2 -> peer2, addressed by the peer table ids.
            connMaster.tableOperations().setProperty("master1", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master1", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId1);
            connMaster.tableOperations().setProperty("master2", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master2", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId2);

            // Write to master1, counting every entry so the peer can be checked for an exact match.
            BatchWriter writer1 = connMaster.createBatchWriter("master1", new BatchWriterConfig());
            long masterTable1Records = 0L;
            for (int row = 0; row < 2500; row++) {
                Mutation mutation = new Mutation("master1" + row);
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                    masterTable1Records++;
                }
                writer1.addMutation(mutation);
            }
            writer1.close();

            // Same for master2.
            BatchWriter writer2 = connMaster.createBatchWriter("master2", new BatchWriterConfig());
            long masterTable2Records = 0L;
            for (int row = 0; row < 2500; row++) {
                Mutation mutation = new Mutation("master2" + row);
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                    masterTable2Records++;
                }
                writer2.addMutation(mutation);
            }
            writer2.close();
            log.info("Wrote all data to master cluster");

            Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles("master1");
            Set<String> filesFor2 = connMaster.replicationOperations().referencedFiles("master2");
            log.info("Files to replicate for table1: " + filesFor1);
            log.info("Files to replicate for table2: " + filesFor2);

            // Restart the tablet server(s) of the primary cluster.
            for (ProcessReference tserver : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, tserver);
            }
            this.cluster.exec(TabletServer.class, new String[0]);
            log.info("Restarted the tserver");

            // A full read of master1 only returns once a tablet server serves it again.
            try (Scanner warmUp = connMaster.createScanner("master1", Authorizations.EMPTY)) {
                Iterators.size(warmUp.iterator());
            }
            while (!ReplicationTable.isOnline(connMaster)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            log.info("Waiting for {} for {}", filesFor1, "master1");
            connMaster.replicationOperations().drain("master1", filesFor1);
            log.info("Waiting for {} for {}", filesFor2, "master2");
            connMaster.replicationOperations().drain("master2", filesFor2);

            long countTable = 0L;
            try (Scanner peer1Scanner = connPeer.createScanner("peer1", Authorizations.EMPTY)) {
                for (Map.Entry<Key, Value> kv : peer1Scanner) {
                    countTable++;
                    Assert.assertTrue("Found unexpected key-value" + kv.getKey().toStringNoTruncate() + " " + kv.getValue(), kv.getKey().getRow().toString().startsWith("master1"));
                }
            }
            log.info("Found {} records in {}", Long.valueOf(countTable), "peer1");
            Assert.assertEquals(masterTable1Records, countTable);

            countTable = 0L;
            try (Scanner peer2Scanner = connPeer.createScanner("peer2", Authorizations.EMPTY)) {
                for (Map.Entry<Key, Value> kv : peer2Scanner) {
                    countTable++;
                    Assert.assertTrue("Found unexpected key-value" + kv.getKey().toStringNoTruncate() + " " + kv.getValue(), kv.getKey().getRow().toString().startsWith("master2"));
                }
            }
            log.info("Found {} records in {}", Long.valueOf(countTable), "peer2");
            Assert.assertEquals(masterTable2Records, countTable);
        } finally {
            peerCluster.stop();
        }
    }

    /**
     * Verifies replication of table {@code master} to table {@code peer} using a dedicated
     * {@code repl} user on the peer cluster.
     *
     * <p>After writing 5000 rows of 100 columns and restarting the primary's tablet servers, the
     * test waits for the replication table to come online, drains the files referenced at write
     * time and compares both tables entry by entry.
     *
     * @throws Exception if cluster setup, replication configuration or verification fails
     */
    @Test
    public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        // Stop the peer cluster on every exit path, not only when the test succeeds.
        try {
            Connector connMaster = getConnector();
            Connector connPeer = peerCluster.getConnector("root", new PasswordToken("testRootPassword1"));

            connPeer.securityOperations().createLocalUser("repl", new PasswordToken("passwd"));
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "repl");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "passwd");

            connMaster.tableOperations().create("master");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master"));
            connPeer.tableOperations().create("peer");
            String peerTableId = connPeer.tableOperations().tableIdMap().get("peer");
            Assert.assertNotNull(peerTableId);
            connPeer.securityOperations().grantTablePermission("repl", "peer", TablePermission.WRITE);

            connMaster.tableOperations().setProperty("master", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId);

            BatchWriter writer = connMaster.createBatchWriter("master", new BatchWriterConfig());
            for (int row = 0; row < 5000; row++) {
                Mutation mutation = new Mutation(Integer.toString(row));
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                }
                writer.addMutation(mutation);
            }
            writer.close();
            log.info("Wrote all data to master cluster");

            Set<String> files = connMaster.replicationOperations().referencedFiles("master");
            log.info("Files to replicate:" + files);

            // Restart the tablet server(s) of the primary cluster.
            for (ProcessReference tserver : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, tserver);
            }
            this.cluster.exec(TabletServer.class, new String[0]);

            while (!ReplicationTable.isOnline(connMaster)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            // A full read of the master table only returns once a tablet server serves it again.
            try (Scanner warmUp = connMaster.createScanner("master", Authorizations.EMPTY)) {
                Iterators.size(warmUp.iterator());
            }

            try (Scanner replicationScanner = ReplicationTable.getScanner(connMaster)) {
                for (Map.Entry<Key, Value> kv : replicationScanner) {
                    log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Replication.Status.parseFrom(kv.getValue().get())));
                }
            }

            connMaster.replicationOperations().drain("master", files);

            // Walk both tables in lock step; keys must match (timestamp ignored) and values must be equal.
            try (Scanner masterScanner = connMaster.createScanner("master", Authorizations.EMPTY);
                    Scanner peerScanner = connPeer.createScanner("peer", Authorizations.EMPTY)) {
                Iterator<Map.Entry<Key, Value>> masterIter = masterScanner.iterator();
                Iterator<Map.Entry<Key, Value>> peerIter = peerScanner.iterator();
                while (masterIter.hasNext() && peerIter.hasNext()) {
                    Map.Entry<Key, Value> masterEntry = masterIter.next();
                    Map.Entry<Key, Value> peerEntry = peerIter.next();
                    // Report the master key against the peer key (not the peer key twice).
                    Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0L, masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
                    Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
                }
                Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
                Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
            }
        } finally {
            peerCluster.stop();
        }
    }

    /**
     * Verifies, without calling drain, that two replicated tables on the primary cluster are
     * replicated into their own target tables on the peer cluster.
     *
     * <p>After writing data and restarting the primary's tablet servers, the test polls the work
     * section of the replication table until at least one record is fully replicated, then polls
     * {@code peer1} and {@code peer2} until each contains data, checking that every row carries
     * the prefix of its source table.
     *
     * @throws Exception if cluster setup, replication configuration or verification fails
     */
    @Test
    public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
        MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        peerCfg.setNumTservers(1);
        peerCfg.setInstanceName("peer");
        peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
        MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
        peerCluster.start();
        try {
            Connector connMaster = getConnector();
            Connector connPeer = peerCluster.getConnector("root", new PasswordToken("testRootPassword1"));

            connPeer.securityOperations().createLocalUser("repl", new PasswordToken("passwd"));
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "repl");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "passwd");
            connMaster.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));

            connMaster.tableOperations().create("master1");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master1"));
            connMaster.tableOperations().create("master2");
            Assert.assertNotNull(connMaster.tableOperations().tableIdMap().get("master2"));
            connPeer.tableOperations().create("peer1");
            String peerTableId1 = connPeer.tableOperations().tableIdMap().get("peer1");
            Assert.assertNotNull(peerTableId1);
            connPeer.tableOperations().create("peer2");
            String peerTableId2 = connPeer.tableOperations().tableIdMap().get("peer2");
            Assert.assertNotNull(peerTableId2);
            connPeer.securityOperations().grantTablePermission("repl", "peer1", TablePermission.WRITE);
            connPeer.securityOperations().grantTablePermission("repl", "peer2", TablePermission.WRITE);

            // master1 -> peer1 and master2 -> peer2, addressed by the peer table ids.
            connMaster.tableOperations().setProperty("master1", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master1", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId1);
            connMaster.tableOperations().setProperty("master2", Property.TABLE_REPLICATION.getKey(), "true");
            connMaster.tableOperations().setProperty("master2", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", peerTableId2);

            BatchWriter writer1 = connMaster.createBatchWriter("master1", new BatchWriterConfig());
            for (int row = 0; row < 2500; row++) {
                Mutation mutation = new Mutation("master1" + row);
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                }
                writer1.addMutation(mutation);
            }
            writer1.close();

            BatchWriter writer2 = connMaster.createBatchWriter("master2", new BatchWriterConfig());
            for (int row = 0; row < 2500; row++) {
                Mutation mutation = new Mutation("master2" + row);
                for (int col = 0; col < 100; col++) {
                    String colText = Integer.toString(col);
                    mutation.put(colText, "", colText);
                }
                writer2.addMutation(mutation);
            }
            writer2.close();
            log.info("Wrote all data to master cluster");

            // Restart the tablet server(s) of the primary cluster.
            for (ProcessReference tserver : this.cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
                this.cluster.killProcess(ServerType.TABLET_SERVER, tserver);
            }
            this.cluster.exec(TabletServer.class, new String[0]);

            while (!ReplicationTable.isOnline(connMaster)) {
                log.info("Replication table still offline, waiting");
                Thread.sleep(5000L);
            }

            // Poll the work section until at least one record reports itself fully replicated.
            boolean fullyReplicated = false;
            for (int attempt = 0; attempt < 10 && !fullyReplicated; attempt++) {
                UtilWaitThread.sleepUninterruptibly(2L, TimeUnit.SECONDS);
                try (Scanner workScanner = ReplicationTable.getScanner(connMaster)) {
                    ReplicationSchema.WorkSection.limit(workScanner);
                    for (Map.Entry<Key, Value> kv : workScanner) {
                        if (StatusUtil.isFullyReplicated(Replication.Status.parseFrom(kv.getValue().get()))) {
                            fullyReplicated = true;
                        }
                    }
                }
            }
            // The previous assertNotEquals(0, <Boolean>) compared an Integer with a Boolean and
            // could never fail; assert the flag itself.
            Assert.assertTrue("Did not find a fully replicated work record", fullyReplicated);

            long peer1Records = awaitRecordsInPeerTable(connPeer, "peer1", "master1");
            Assert.assertTrue("Found no records in peer1 in the peer cluster", peer1Records > 0);

            long peer2Records = awaitRecordsInPeerTable(connPeer, "peer2", "master2");
            Assert.assertTrue("Found no records in peer2 in the peer cluster", peer2Records > 0);
        } finally {
            peerCluster.stop();
        }
    }

    /**
     * Scans a peer table up to ten times, five seconds apart, until it contains at least one
     * entry. Every entry seen must have a row starting with {@code expectedRowPrefix}; the
     * scanner of each attempt is closed before the next one starts.
     *
     * @param connPeer connector to the peer cluster
     * @param peerTable name of the peer table to scan
     * @param expectedRowPrefix prefix every row in the table is required to have
     * @return the number of entries found by the last scan (zero if the table stayed empty)
     * @throws Exception if the table cannot be scanned or the wait is interrupted
     */
    private long awaitRecordsInPeerTable(Connector connPeer, String peerTable, String expectedRowPrefix) throws Exception {
        long found = 0L;
        for (int attempt = 0; attempt < 10; attempt++) {
            found = 0L;
            try (Scanner peerScanner = connPeer.createScanner(peerTable, Authorizations.EMPTY)) {
                for (Map.Entry<Key, Value> kv : peerScanner) {
                    found++;
                    Assert.assertTrue("Found unexpected key-value" + kv.getKey().toStringNoTruncate() + " " + kv.getValue(), kv.getKey().getRow().toString().startsWith(expectedRowPrefix));
                }
            }
            log.info("Found {} records in {}", Long.valueOf(found), peerTable);
            if (found != 0L) {
                break;
            }
            Thread.sleep(5000L);
        }
        return found;
    }
}
