package org.apache.accumulo.test.replication;

import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

@Ignore("Replication ITs are not stable and not currently maintained")
/* loaded from: input_file:org/apache/accumulo/test/replication/SequentialWorkAssignerIT.class */
public class SequentialWorkAssignerIT extends ConfigurableMacBase {
    private Connector conn;
    private MockSequentialWorkAssigner assigner;

    /* loaded from: input_file:org/apache/accumulo/test/replication/SequentialWorkAssignerIT$MockSequentialWorkAssigner.class */
    private static class MockSequentialWorkAssigner extends SequentialWorkAssigner {
        public MockSequentialWorkAssigner(Connector connector) {
            super((AccumuloConfiguration) null, connector);
        }

        public void setConnector(Connector connector) {
            super.setConnector(connector);
        }

        public void setQueuedWork(Map<String, Map<String, String>> map) {
            super.setQueuedWork(map);
        }

        public void setWorkQueue(DistributedWorkQueue distributedWorkQueue) {
            super.setWorkQueue(distributedWorkQueue);
        }

        public void setMaxQueueSize(int i) {
            super.setMaxQueueSize(i);
        }

        public void createWork() {
            super.createWork();
        }

        public void setZooCache(ZooCache zooCache) {
            super.setZooCache(zooCache);
        }

        public void cleanupFinishedWork() {
            super.cleanupFinishedWork();
        }
    }

    @Before
    public void init() throws Exception {
        this.conn = getConnector();
        this.assigner = new MockSequentialWorkAssigner(this.conn);
        this.conn.securityOperations().grantTablePermission(this.conn.whoami(), "accumulo.replication", TablePermission.READ);
        this.conn.securityOperations().grantTablePermission(this.conn.whoami(), "accumulo.replication", TablePermission.WRITE);
        ReplicationTable.setOnline(this.conn);
    }

    @Test
    public void createWorkForFilesInCorrectOrder() throws Exception {
        ReplicationTarget replicationTarget = new ReplicationTarget("cluster1", "table1", "1");
        Text text = replicationTarget.toText();
        BatchWriter batchWriter = ReplicationTable.getBatchWriter(this.conn);
        String str = "/accumulo/wal/tserver+port/z_file1";
        String str2 = "/accumulo/wal/tserver+port/a_file1";
        Replication.Status build = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(250L).build();
        Replication.Status build2 = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(500L).build();
        Mutation mutation = new Mutation(str);
        ReplicationSchema.WorkSection.add(mutation, text, ProtobufUtil.toValue(build));
        batchWriter.addMutation(mutation);
        Mutation mutation2 = new Mutation(str2);
        ReplicationSchema.WorkSection.add(mutation2, text, ProtobufUtil.toValue(build2));
        batchWriter.addMutation(mutation2);
        Mutation createMutation = ReplicationSchema.OrderSection.createMutation(str, build.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build));
        batchWriter.addMutation(createMutation);
        Mutation createMutation2 = ReplicationSchema.OrderSection.createMutation(str2, build2.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation2, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build2));
        batchWriter.addMutation(createMutation2);
        batchWriter.close();
        DistributedWorkQueue distributedWorkQueue = (DistributedWorkQueue) EasyMock.createMock(DistributedWorkQueue.class);
        HashMap hashMap = new HashMap();
        this.assigner.setQueuedWork(hashMap);
        this.assigner.setWorkQueue(distributedWorkQueue);
        this.assigner.setMaxQueueSize(Integer.MAX_VALUE);
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), str);
        EasyMock.expectLastCall().once();
        EasyMock.replay(new Object[]{distributedWorkQueue});
        this.assigner.createWork();
        EasyMock.verify(new Object[]{distributedWorkQueue});
        Assert.assertEquals(1L, hashMap.size());
        Assert.assertTrue(hashMap.containsKey("cluster1"));
        Map map = (Map) hashMap.get("cluster1");
        Assert.assertEquals(1L, map.size());
        Assert.assertTrue(map.containsKey(replicationTarget.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), map.get(replicationTarget.getSourceTableId()));
    }

    @Test
    public void workAcrossTablesHappensConcurrently() throws Exception {
        ReplicationTarget replicationTarget = new ReplicationTarget("cluster1", "table1", "1");
        Text text = replicationTarget.toText();
        ReplicationTarget replicationTarget2 = new ReplicationTarget("cluster1", "table2", "2");
        Text text2 = replicationTarget2.toText();
        BatchWriter batchWriter = ReplicationTable.getBatchWriter(this.conn);
        String str = "/accumulo/wal/tserver+port/z_file1";
        String str2 = "/accumulo/wal/tserver+port/a_file1";
        Replication.Status build = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(250L).build();
        Replication.Status build2 = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(500L).build();
        Mutation mutation = new Mutation(str);
        ReplicationSchema.WorkSection.add(mutation, text, ProtobufUtil.toValue(build));
        batchWriter.addMutation(mutation);
        Mutation mutation2 = new Mutation(str2);
        ReplicationSchema.WorkSection.add(mutation2, text2, ProtobufUtil.toValue(build2));
        batchWriter.addMutation(mutation2);
        Mutation createMutation = ReplicationSchema.OrderSection.createMutation(str, build.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build));
        batchWriter.addMutation(createMutation);
        Mutation createMutation2 = ReplicationSchema.OrderSection.createMutation(str2, build2.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation2, replicationTarget2.getSourceTableId(), ProtobufUtil.toValue(build2));
        batchWriter.addMutation(createMutation2);
        batchWriter.close();
        DistributedWorkQueue distributedWorkQueue = (DistributedWorkQueue) EasyMock.createMock(DistributedWorkQueue.class);
        HashMap hashMap = new HashMap();
        this.assigner.setQueuedWork(hashMap);
        this.assigner.setWorkQueue(distributedWorkQueue);
        this.assigner.setMaxQueueSize(Integer.MAX_VALUE);
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), str);
        EasyMock.expectLastCall().once();
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget2), str2);
        EasyMock.expectLastCall().once();
        EasyMock.replay(new Object[]{distributedWorkQueue});
        this.assigner.createWork();
        EasyMock.verify(new Object[]{distributedWorkQueue});
        Assert.assertEquals(1L, hashMap.size());
        Assert.assertTrue(hashMap.containsKey("cluster1"));
        Map map = (Map) hashMap.get("cluster1");
        Assert.assertEquals(2L, map.size());
        Assert.assertTrue(map.containsKey(replicationTarget.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), map.get(replicationTarget.getSourceTableId()));
        Assert.assertTrue(map.containsKey(replicationTarget2.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget2), map.get(replicationTarget2.getSourceTableId()));
    }

    @Test
    public void workAcrossPeersHappensConcurrently() throws Exception {
        ReplicationTarget replicationTarget = new ReplicationTarget("cluster1", "table1", "1");
        Text text = replicationTarget.toText();
        ReplicationTarget replicationTarget2 = new ReplicationTarget("cluster2", "table1", "1");
        Text text2 = replicationTarget2.toText();
        BatchWriter batchWriter = ReplicationTable.getBatchWriter(this.conn);
        String str = "/accumulo/wal/tserver+port/z_file1";
        String str2 = "/accumulo/wal/tserver+port/a_file1";
        Replication.Status build = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(250L).build();
        Replication.Status build2 = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(500L).build();
        Mutation mutation = new Mutation(str);
        ReplicationSchema.WorkSection.add(mutation, text, ProtobufUtil.toValue(build));
        batchWriter.addMutation(mutation);
        Mutation mutation2 = new Mutation(str2);
        ReplicationSchema.WorkSection.add(mutation2, text2, ProtobufUtil.toValue(build2));
        batchWriter.addMutation(mutation2);
        Mutation createMutation = ReplicationSchema.OrderSection.createMutation(str, build.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build));
        batchWriter.addMutation(createMutation);
        Mutation createMutation2 = ReplicationSchema.OrderSection.createMutation(str2, build2.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation2, replicationTarget2.getSourceTableId(), ProtobufUtil.toValue(build2));
        batchWriter.addMutation(createMutation2);
        batchWriter.close();
        DistributedWorkQueue distributedWorkQueue = (DistributedWorkQueue) EasyMock.createMock(DistributedWorkQueue.class);
        HashMap hashMap = new HashMap();
        this.assigner.setQueuedWork(hashMap);
        this.assigner.setWorkQueue(distributedWorkQueue);
        this.assigner.setMaxQueueSize(Integer.MAX_VALUE);
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), str);
        EasyMock.expectLastCall().once();
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget2), str2);
        EasyMock.expectLastCall().once();
        EasyMock.replay(new Object[]{distributedWorkQueue});
        this.assigner.createWork();
        EasyMock.verify(new Object[]{distributedWorkQueue});
        Assert.assertEquals(2L, hashMap.size());
        Assert.assertTrue(hashMap.containsKey("cluster1"));
        Map map = (Map) hashMap.get("cluster1");
        Assert.assertEquals(1L, map.size());
        Assert.assertTrue(map.containsKey(replicationTarget.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget), map.get(replicationTarget.getSourceTableId()));
        Map map2 = (Map) hashMap.get("cluster2");
        Assert.assertEquals(1L, map2.size());
        Assert.assertTrue(map2.containsKey(replicationTarget2.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget2), map2.get(replicationTarget2.getSourceTableId()));
    }

    @Test
    public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
        ReplicationTarget replicationTarget = new ReplicationTarget("cluster1", "table1", "1");
        Text text = replicationTarget.toText();
        BatchWriter batchWriter = ReplicationTable.getBatchWriter(this.conn);
        String str = "/accumulo/wal/tserver+port/z_file1";
        String str2 = "/accumulo/wal/tserver+port/a_file1";
        Replication.Status build = Replication.Status.newBuilder().setBegin(100L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(250L).build();
        Replication.Status build2 = Replication.Status.newBuilder().setBegin(0L).setEnd(100L).setClosed(true).setInfiniteEnd(false).setCreatedTime(500L).build();
        Mutation mutation = new Mutation(str);
        ReplicationSchema.WorkSection.add(mutation, text, ProtobufUtil.toValue(build));
        batchWriter.addMutation(mutation);
        Mutation mutation2 = new Mutation(str2);
        ReplicationSchema.WorkSection.add(mutation2, text, ProtobufUtil.toValue(build2));
        batchWriter.addMutation(mutation2);
        Mutation createMutation = ReplicationSchema.OrderSection.createMutation(str, build.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build));
        batchWriter.addMutation(createMutation);
        Mutation createMutation2 = ReplicationSchema.OrderSection.createMutation(str2, build2.getCreatedTime());
        ReplicationSchema.OrderSection.add(createMutation2, replicationTarget.getSourceTableId(), ProtobufUtil.toValue(build2));
        batchWriter.addMutation(createMutation2);
        batchWriter.close();
        DistributedWorkQueue distributedWorkQueue = (DistributedWorkQueue) EasyMock.createMock(DistributedWorkQueue.class);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(replicationTarget.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey("z_file1", replicationTarget));
        hashMap.put("cluster1", hashMap2);
        this.assigner.setQueuedWork(hashMap);
        this.assigner.setWorkQueue(distributedWorkQueue);
        this.assigner.setMaxQueueSize(Integer.MAX_VALUE);
        distributedWorkQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget), str2);
        EasyMock.expectLastCall().once();
        EasyMock.replay(new Object[]{distributedWorkQueue});
        this.assigner.createWork();
        EasyMock.verify(new Object[]{distributedWorkQueue});
        Assert.assertEquals(1L, hashMap.size());
        Assert.assertTrue(hashMap.containsKey("cluster1"));
        Map map = (Map) hashMap.get("cluster1");
        Assert.assertEquals(1L, map.size());
        Assert.assertTrue(map.containsKey(replicationTarget.getSourceTableId()));
        Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("a_file1", replicationTarget), map.get(replicationTarget.getSourceTableId()));
    }
}
