package org.apache.hadoop.hbase.io.asyncfs;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MiscTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.class */
public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class);
    private static DistributedFileSystem FS;
    private static EventLoopGroup EVENT_LOOP_GROUP;
    private static Class<? extends Channel> CHANNEL_CLASS;
    private static StreamSlowMonitor MONITOR;
    private static FanOutOneBlockAsyncDFSOutput OUT;

    @Rule
    public TestName name = new TestName();

    @BeforeClass
    public static void setUp() throws Exception {
        startMiniDFSCluster(2);
        FS = CLUSTER.getFileSystem();
        EVENT_LOOP_GROUP = new NioEventLoopGroup();
        CHANNEL_CLASS = NioSocketChannel.class;
        MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
        OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/testHang"), true, false, (short) 2, FS.getDefaultBlockSize(), (EventLoopGroup) EVENT_LOOP_GROUP.next(), CHANNEL_CLASS, MONITOR, true);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (OUT != null) {
            OUT.recoverAndClose(null);
        }
        if (EVENT_LOOP_GROUP != null) {
            EVENT_LOOP_GROUP.shutdownGracefully().get();
        }
        shutdownMiniDFSCluster();
    }

    @Test
    public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
        MiniDFSCluster.DataNodeProperties dataNodeProperties = null;
        try {
            final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
            Map<Channel, DatanodeInfo> datanodeInfoMap = OUT.getDatanodeInfoMap();
            Iterator<Map.Entry<Channel, DatanodeInfo>> it = datanodeInfoMap.entrySet().iterator();
            Assert.assertTrue(it.hasNext());
            Map.Entry<Channel, DatanodeInfo> next = it.next();
            Channel key = next.getKey();
            DatanodeInfo value = next.getValue();
            ArrayList arrayList = new ArrayList();
            key.pipeline().forEach(entry -> {
                if (ProtobufDecoder.class.isInstance(entry.getValue())) {
                    arrayList.add((String) entry.getKey());
                }
            });
            Assert.assertTrue(arrayList.size() == 1);
            key.pipeline().addAfter((String) arrayList.get(0), "dn1AckReceivedHandler", new ChannelInboundHandlerAdapter() { // from class: org.apache.hadoop.hbase.io.asyncfs.TestFanOutOneBlockAsyncDFSOutputHang.1
                @Override // org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandler
                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    super.channelRead(channelHandlerContext, obj);
                    cyclicBarrier.await();
                }
            });
            Assert.assertTrue(it.hasNext());
            it.next().getKey().pipeline().addFirst(new ChannelInboundHandlerAdapter() { // from class: org.apache.hadoop.hbase.io.asyncfs.TestFanOutOneBlockAsyncDFSOutputHang.2
                @Override // org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandler
                public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    if (obj instanceof ByteBuf) {
                        ((ByteBuf) obj).release();
                    } else {
                        channelHandlerContext.fireChannelRead(obj);
                    }
                }
            });
            byte[] bArr = new byte[10];
            Bytes.random(bArr);
            OUT.write(bArr, 0, bArr.length);
            CompletableFuture<Long> flush = OUT.flush(false);
            cyclicBarrier.await();
            dataNodeProperties = findAndKillFirstDataNode(value);
            Assert.assertTrue(dataNodeProperties != null);
            try {
                flush.get();
                Assert.fail();
            } catch (ExecutionException e) {
                Assert.assertTrue(e != null);
                LOG.info("expected exception caught when get future", e);
            }
            datanodeInfoMap.keySet().forEach(channel -> {
                try {
                    channel.closeFuture().get();
                } catch (InterruptedException | ExecutionException e2) {
                    throw new RuntimeException(e2);
                }
            });
            if (dataNodeProperties != null) {
                CLUSTER.restartDataNode(dataNodeProperties);
            }
        } catch (Throwable th) {
            if (dataNodeProperties != null) {
                CLUSTER.restartDataNode(dataNodeProperties);
            }
            throw th;
        }
    }

    private static MiniDFSCluster.DataNodeProperties findAndKillFirstDataNode(DatanodeInfo datanodeInfo) {
        Assert.assertTrue(datanodeInfo != null);
        ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes();
        ArrayList arrayList = new ArrayList();
        int i = 0;
        Iterator<DataNode> it = dataNodes.iterator();
        while (it.hasNext()) {
            if (datanodeInfo.getXferAddr().equals(it.next().getDatanodeId().getXferAddr())) {
                arrayList.add(Integer.valueOf(i));
            }
            i++;
        }
        Assert.assertTrue(arrayList.size() == 1);
        return CLUSTER.stopDataNode(((Integer) arrayList.get(0)).intValue());
    }
}
