package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.tools.TestCommandShell;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({RPCTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.class */
public class TestNettyChannelWritability {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestNettyChannelWritability.class);
    private static final MetricsAssertHelper METRICS_ASSERT = (MetricsAssertHelper) CompatibilityFactory.getInstance(MetricsAssertHelper.class);
    private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
    private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

    @Test
    public void testNettyWritableWatermarks() throws Exception {
        Configuration create = HBaseConfiguration.create();
        create.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
        create.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
        NettyRpcServer createRpcServer = createRpcServer(create, 0);
        try {
            sendAndReceive(create, createRpcServer, 5);
            METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0L, createRpcServer.metrics.getMetricsSource());
        } finally {
            createRpcServer.stop();
        }
    }

    @Test
    public void testNettyWritableFatalThreshold() throws Exception {
        Configuration create = HBaseConfiguration.create();
        create.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
        NettyRpcServer createRpcServer = createRpcServer(create, 3);
        try {
            Assert.assertTrue(((CompletionException) Assert.assertThrows(CompletionException.class, () -> {
                sendAndReceive(create, createRpcServer, 5);
            })).getCause().getCause() instanceof ServiceException);
            METRICS_ASSERT.assertCounterGt(MetricsHBaseServerSource.MAX_OUTBOUND_BYTES_EXCEEDED_NAME, 0L, createRpcServer.metrics.getMetricsSource());
            createRpcServer.stop();
        } catch (Throwable th) {
            createRpcServer.stop();
            throw th;
        }
    }

    private void sendAndReceive(Configuration configuration, NettyRpcServer nettyRpcServer, int i) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 3; i2++) {
            arrayList.add(CELL);
        }
        NettyRpcClient nettyRpcClient = new NettyRpcClient(configuration);
        try {
            nettyRpcServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface newBlockingStub = TestProtobufRpcServiceImpl.newBlockingStub(nettyRpcClient, nettyRpcServer.getListenerAddress());
            CompletableFuture[] completableFutureArr = new CompletableFuture[i];
            for (int i3 = 0; i3 < i; i3++) {
                completableFutureArr[i3] = CompletableFuture.runAsync(() -> {
                    try {
                        sendMessage(arrayList, newBlockingStub);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            CompletableFuture.allOf(completableFutureArr).join();
            nettyRpcClient.close();
        } catch (Throwable th) {
            try {
                nettyRpcClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void sendMessage(List<Cell> list, TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingInterface) throws Exception {
        HBaseRpcControllerImpl hBaseRpcControllerImpl = new HBaseRpcControllerImpl(CellUtil.createCellScanner((Iterable<Cell>) list));
        Assert.assertEquals(TestCommandShell.Example.HELLO, blockingInterface.echo(hBaseRpcControllerImpl, TestProtos.EchoRequestProto.newBuilder().setMessage(TestCommandShell.Example.HELLO).build()).getMessage());
        int i = 0;
        CellScanner cellScanner = hBaseRpcControllerImpl.cellScanner();
        Assert.assertNotNull(cellScanner);
        while (cellScanner.advance()) {
            Assert.assertEquals(CELL, cellScanner.current());
            i++;
        }
        Assert.assertEquals(list.size(), i);
    }

    private NettyRpcServer createRpcServer(Configuration configuration, final int i) throws IOException {
        ArrayList newArrayList = Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null));
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 0);
        FifoRpcScheduler fifoRpcScheduler = new FifoRpcScheduler(configuration, 1);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        return new NettyRpcServer(null, "testRpcServer", newArrayList, inetSocketAddress, configuration, fifoRpcScheduler, true) { // from class: org.apache.hadoop.hbase.ipc.TestNettyChannelWritability.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.hadoop.hbase.ipc.NettyRpcServer
            public NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
                return new NettyServerRpcConnection(this, channel) { // from class: org.apache.hadoop.hbase.ipc.TestNettyChannelWritability.1.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    @Override // org.apache.hadoop.hbase.ipc.NettyServerRpcConnection, org.apache.hadoop.hbase.ipc.ServerRpcConnection
                    public void doRespond(RpcResponse rpcResponse) {
                        if (atomicInteger.incrementAndGet() >= i) {
                            super.doRespond(rpcResponse);
                        } else {
                            this.channel.write(rpcResponse);
                        }
                    }
                };
            }
        };
    }
}
