package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncAdminBuilder;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.DummyAsyncTable;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the {@link WALEntrySinkFilter} hook of {@link ReplicationSink}.
 * <p>
 * The test hands the sink a batch of WAL entries whose write times run from 0 upwards. The sink is
 * configured with a filter that flags every entry whose write time is at or below
 * {@link #BOUNDARY}, and with a do-nothing {@link AsyncConnection} whose table merely counts the
 * rows that reach it. Afterwards the two counters must add up to the number of entries submitted.
 */
@Category({ReplicationTests.class, SmallTests.class})
public class TestWALEntrySinkFilter {

    @Rule
    public TestName name = new TestName();

    /** Largest write time (and row value) that the test filter flags for removal. */
    static final int BOUNDARY = 5;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestWALEntrySinkFilter.class);

    /** Number of rows that reached the dummy table's {@code batchAll}. */
    static final AtomicInteger UNFILTERED = new AtomicInteger();

    /** Number of times the test filter returned {@code true}. */
    static final AtomicInteger FILTERED = new AtomicInteger();

    /** Minimal {@link Stoppable} that only records, and logs, that it has been stopped. */
    private static final Stoppable STOPPABLE = new Stoppable() {
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public boolean isStopped() {
            return stopped.get();
        }

        @Override
        public void stop(String why) {
            LOG.info("STOPPING BECAUSE: {}", why);
            stopped.set(true);
        }
    };

    /**
     * {@link AsyncConnection} stub in which every method is a no-op except
     * {@link #getTable(TableName)} and {@link #getConfiguration()}.
     */
    public static class DevNullAsyncConnection implements AsyncConnection {
        private final Configuration conf;

        /**
         * Only the configuration is retained; the remaining arguments are ignored.
         * NOTE(review): this parameter list presumably mirrors what the connection factory passes
         * when it instantiates the configured implementation reflectively - confirm against
         * ConnectionFactory before changing it.
         */
        public DevNullAsyncConnection(Configuration conf, ConnectionRegistry registry, String clusterId, User user,
                Map<String, byte[]> connectionAttributes) {
            this.conf = conf;
        }

        @Override
        public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
            return null;
        }

        @Override
        public void clearRegionLocationCache() {
        }

        @Override
        public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
            return null;
        }

        @Override
        public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, ExecutorService pool) {
            return null;
        }

        @Override
        public AsyncAdminBuilder getAdminBuilder() {
            return null;
        }

        @Override
        public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
            return null;
        }

        @Override
        public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
            return null;
        }

        @Override
        public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool) {
            return null;
        }

        @Override
        public CompletableFuture<Hbck> getHbck() {
            return null;
        }

        @Override
        public Hbck getHbck(ServerName masterServer) throws IOException {
            return null;
        }

        @Override
        public boolean isClosed() {
            return false;
        }

        @Override
        public void close() throws IOException {
        }

        /**
         * Returns a table whose {@code batchAll} writes nothing: it checks that each submitted row
         * decodes to an int above {@link #BOUNDARY}, bumps {@link #UNFILTERED} once per row, and
         * completes immediately with one {@code null} result per action.
         */
        @Override
        public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
            return new DummyAsyncTable<AdvancedScanResultConsumer>() {
                @Override
                public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
                    List<T> results = new ArrayList<>(actions.size());
                    for (Row action : actions) {
                        // The test encodes the loop index as the row key, so a row at or below
                        // BOUNDARY means an entry got past the filter that should not have.
                        int row = Bytes.toInt(action.getRow());
                        Assert.assertTrue(Integer.toString(row), row > BOUNDARY);
                        UNFILTERED.incrementAndGet();
                        results.add(null);
                    }
                    return CompletableFuture.completedFuture(results);
                }
            };
        }

        @Override
        public Configuration getConfiguration() {
            return conf;
        }
    }

    /** Connection registry stub that reports the fixed cluster id {@code "test"}. */
    public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry {
        public DevNullConnectionRegistry(Configuration conf, User user) {
            super(conf, user);
        }

        @Override
        public CompletableFuture<String> getClusterId() {
            return CompletableFuture.completedFuture("test");
        }
    }

    /**
     * Filter that returns {@code true} for any value at or below {@link #BOUNDARY} and counts each
     * such decision in {@link #FILTERED}.
     */
    public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
        @Override
        public void init(Connection connection) {
            // Nothing to set up.
        }

        @Override
        public boolean filter(TableName table, long writeTime) {
            boolean filterOut = writeTime <= BOUNDARY;
            if (filterOut) {
                FILTERED.incrementAndGet();
            }
            return filterOut;
        }
    }

    /**
     * Replicates {@code BOUNDARY * 2} entries through a {@link ReplicationSink} wired with the stub
     * connection and the boundary filter, then checks that some entries were filtered, some were
     * not, and that together they account for every entry.
     */
    @Test
    public void testWALEntryFilter() throws IOException {
        // The counters are static, so clear them to keep the totals below correct if this method
        // runs more than once in the same JVM.
        FILTERED.set(0);
        UNFILTERED.set(0);

        Configuration conf = HBaseConfiguration.create();
        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
                ConnectionRegistry.class);
        conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class,
                WALEntrySinkFilter.class);
        conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL, DevNullAsyncConnection.class,
                AsyncConnection.class);
        ReplicationSink sink = new ReplicationSink(conf, null);

        // Twice the boundary, so that the batch holds entries on both sides of it.
        final int entryCount = BOUNDARY * 2;
        List<AdminProtos.WALEntry> entries = new ArrayList<>(entryCount);
        final List<Cell> cells = new ArrayList<>(entryCount);
        AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
        ByteString tableName = ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
        for (int i = 0; i < entryCount; i++) {
            // The loop index doubles as sequence number, write time, timestamp and, encoded as
            // bytes, as region name, row, family, qualifier and value.
            byte[] bytes = Bytes.toBytes(i);
            entryBuilder.clear();
            entryBuilder.setKey(entryBuilder.getKeyBuilder()
                    .setLogSequenceNumber(i)
                    .setEncodedRegionName(ByteString.copyFrom(bytes))
                    .setWriteTime(i)
                    .setTableName(tableName)
                    .build());
            entryBuilder.setAssociatedCellCount(1);
            entries.add(entryBuilder.build());
            cells.add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY)
                    .setRow(bytes)
                    .setFamily(bytes)
                    .setQualifier(bytes)
                    .setType(Cell.Type.Put)
                    .setTimestamp(i)
                    .setValue(bytes)
                    .build());
        }

        // Scanner over the cells built above; it starts before the first cell.
        CellScanner cellScanner = new CellScanner() {
            private int index = -1;

            @Override
            public Cell current() {
                return cells.get(index);
            }

            @Override
            public boolean advance() throws IOException {
                index++;
                return index < cells.size();
            }
        };
        sink.replicateEntries(entries, cellScanner, null, null, null);

        Assert.assertTrue(FILTERED.get() > 0);
        Assert.assertTrue(UNFILTERED.get() > 0);
        Assert.assertEquals(entryCount, FILTERED.get() + UNFILTERED.get());
    }
}
