package org.apache.hadoop.hdfs;

import com.google.common.base.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.class */
public class TestClientProtocolForPipelineRecovery {
    private static final Logger LOG;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Test
    public void testGetNewStamp() throws IOException {
        MiniDFSCluster build = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(1).build();
        try {
            build.waitActive();
            DistributedFileSystem fileSystem = build.getFileSystem();
            NamenodeProtocols nameNodeRpc = build.getNameNodeRpc();
            Path path = new Path("dataprotocol.dat");
            DFSTestUtil.createFile(fileSystem, path, 1L, (short) 1, 0L);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSystem, path);
            try {
                nameNodeRpc.updateBlockForPipeline(firstBlock, "");
                Assert.fail("Can not get a new GS from a finalized block");
            } catch (IOException e) {
                Assert.assertTrue(e.getMessage().contains("not " + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION));
            }
            try {
                nameNodeRpc.updateBlockForPipeline(new ExtendedBlock(firstBlock.getBlockPoolId(), firstBlock.getBlockId() + 1, 0L, firstBlock.getGenerationStamp()), "");
                Assert.fail("Cannot get a new GS from a non-existent block");
            } catch (IOException e2) {
                Assert.assertTrue(e2.getMessage().contains("does not exist"));
            }
            try {
                DFSOutputStream wrappedStream = fileSystem.append(path).getWrappedStream();
                wrappedStream.write(1);
                wrappedStream.hflush();
                FSDataInputStream fSDataInputStream = null;
                try {
                    fSDataInputStream = fileSystem.open(path);
                    ExtendedBlock block = DFSTestUtil.getAllBlocks(fSDataInputStream).get(0).getBlock();
                    IOUtils.closeStream(fSDataInputStream);
                    DFSClient dFSClient = fileSystem.dfs;
                    try {
                        nameNodeRpc.updateBlockForPipeline(block, "test" + dFSClient.clientName);
                        Assert.fail("Cannot get a new GS for a non lease holder");
                    } catch (LeaseExpiredException e3) {
                        Assert.assertTrue(e3.getMessage().startsWith("Lease mismatch"));
                    }
                    try {
                        nameNodeRpc.updateBlockForPipeline(block, (String) null);
                        Assert.fail("Cannot get a new GS for a null lease holder");
                    } catch (LeaseExpiredException e4) {
                        Assert.assertTrue(e4.getMessage().startsWith("Lease mismatch"));
                    }
                    nameNodeRpc.updateBlockForPipeline(block, dFSClient.clientName);
                    IOUtils.closeStream(wrappedStream);
                } catch (Throwable th) {
                    IOUtils.closeStream(fSDataInputStream);
                    throw th;
                }
            } catch (Throwable th2) {
                IOUtils.closeStream((Closeable) null);
                throw th2;
            }
        } finally {
            build.shutdown();
        }
    }

    @Test
    public void testPipelineRecoveryForLastBlock() throws IOException {
        DFSClientFaultInjector dFSClientFaultInjector = (DFSClientFaultInjector) Mockito.mock(DFSClientFaultInjector.class);
        DFSClientFaultInjector dFSClientFaultInjector2 = DFSClientFaultInjector.get();
        DFSClientFaultInjector.set(dFSClientFaultInjector);
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.setInt("dfs.client.block.write.locateFollowingBlock.retries", 3);
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(3).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            Path path = new Path("dataprotocol1.dat");
            Mockito.when(Boolean.valueOf(dFSClientFaultInjector.failPacket())).thenReturn(true);
            DFSTestUtil.createFile(fileSystem, path, 68000000L, (short) 3, 0L);
            try {
                fileSystem.open(path).read();
            } catch (BlockMissingException e) {
                Assert.fail("Block is missing because the file was closed with corrupt replicas.");
            }
            DFSClientFaultInjector.set(dFSClientFaultInjector2);
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            DFSClientFaultInjector.set(dFSClientFaultInjector2);
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testPacketTransmissionDelay() throws Exception {
        DataNodeFaultInjector dataNodeFaultInjector = new DataNodeFaultInjector() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.1
            public boolean dropHeartbeatPacket() {
                return true;
            }
        };
        DataNodeFaultInjector dataNodeFaultInjector2 = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set(dataNodeFaultInjector);
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.set("dfs.client.socket-timeout", "3000");
        MiniDFSCluster miniDFSCluster = null;
        try {
            MiniDFSCluster build = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(2).build();
            build.waitActive();
            FSDataOutputStream create = build.getFileSystem().create(new Path("noheartbeat.dat"), (short) 2);
            create.write(49);
            create.hflush();
            DFSOutputStream wrappedStream = create.getWrappedStream();
            DatanodeInfo[] pipeline = wrappedStream.getPipeline();
            Thread.sleep(3500L);
            create.write(50);
            create.hflush();
            DatanodeInfo[] pipeline2 = wrappedStream.getPipeline();
            create.close();
            boolean z = false;
            for (int i = 0; i < pipeline2.length; i++) {
                if (pipeline[0].getXferAddr().equals(pipeline2[i].getXferAddr())) {
                    throw new IOException("The first datanode should have been replaced.");
                }
                if (pipeline[1].getXferAddr().equals(pipeline2[i].getXferAddr())) {
                    z = true;
                }
            }
            Assert.assertTrue(z);
            DataNodeFaultInjector.set(dataNodeFaultInjector2);
            if (build != null) {
                build.shutdown();
            }
        } catch (Throwable th) {
            DataNodeFaultInjector.set(dataNodeFaultInjector2);
            if (0 != 0) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testPipelineRecoveryOnOOB() throws Exception {
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.set("dfs.client.datanode-restart.timeout", "15");
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(1).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            Path path = new Path("dataprotocol2.dat");
            DFSTestUtil.createFile(fileSystem, path, 10240L, (short) 1, 0L);
            DFSOutputStream wrappedStream = fileSystem.append(path).getWrappedStream();
            wrappedStream.write(1);
            wrappedStream.hflush();
            Assert.assertEquals(0L, new DFSAdmin(hdfsConfiguration).run(new String[]{"-shutdownDatanode", miniDFSCluster.getDataNodes().get(0).getDatanodeId().getIpcAddr(false), "upgrade"}));
            GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000);
            miniDFSCluster.restartDataNode(0, true);
            wrappedStream.close();
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testEvictWriter() throws Exception {
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(3).build();
            miniDFSCluster.waitActive();
            FSDataOutputStream create = miniDFSCluster.getFileSystem().create(new Path("testEvictWriter.dat"), (short) 2);
            create.write(49);
            create.hflush();
            DFSOutputStream wrappedStream = create.getWrappedStream();
            DatanodeInfo[] pipeline = wrappedStream.getPipeline();
            Assert.assertEquals(2L, pipeline.length);
            String ipcAddr = pipeline[1].getIpcAddr(false);
            Assert.assertEquals(0L, new DFSAdmin(r0).run(new String[]{"-evictWriters", ipcAddr}));
            create.write(49);
            create.hflush();
            DatanodeInfo[] pipeline2 = wrappedStream.getPipeline();
            try {
                Assert.assertTrue(pipeline2.length > 0);
                for (DatanodeInfo datanodeInfo : pipeline2) {
                    Assert.assertFalse(ipcAddr.equals(datanodeInfo.getIpcAddr(false)));
                }
                create.close();
                if (miniDFSCluster != null) {
                    miniDFSCluster.shutdown();
                }
            } catch (Throwable th) {
                create.close();
                throw th;
            }
        } catch (Throwable th2) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th2;
        }
    }

    @Test
    public void testPipelineRecoveryOnRestartFailure() throws Exception {
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.set("dfs.client.datanode-restart.timeout", "5");
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(2).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            Path path = new Path("dataprotocol3.dat");
            DFSTestUtil.createFile(fileSystem, path, 10240L, (short) 2, 0L);
            DFSOutputStream wrappedStream = fileSystem.append(path).getWrappedStream();
            wrappedStream.write(1);
            wrappedStream.hflush();
            DFSAdmin dFSAdmin = new DFSAdmin(hdfsConfiguration);
            Assert.assertEquals(0L, dFSAdmin.run(new String[]{"-shutdownDatanode", miniDFSCluster.getDataNodes().get(0).getDatanodeId().getIpcAddr(false), "upgrade"}));
            GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000);
            wrappedStream.close();
            DFSOutputStream wrappedStream2 = fileSystem.append(path).getWrappedStream();
            wrappedStream2.write(1);
            wrappedStream2.hflush();
            Assert.assertEquals(0L, dFSAdmin.run(new String[]{"-shutdownDatanode", miniDFSCluster.getDataNodes().get(1).getDatanodeId().getIpcAddr(false), "upgrade"}));
            GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000);
            try {
                wrappedStream2.close();
            } catch (IOException e) {
            }
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(2).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            Path path = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
            DFSTestUtil.createFile(fileSystem, path, 10240L, (short) 2, 0L);
            final DFSOutputStream wrappedStream = fileSystem.append(path).getWrappedStream();
            wrappedStream.write(1);
            wrappedStream.hflush();
            final long generationStamp = wrappedStream.getBlock().getGenerationStamp();
            MiniDFSCluster.DataNodeProperties stopDataNodeForUpgrade = miniDFSCluster.stopDataNodeForUpgrade(0);
            GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000);
            miniDFSCluster.restartDataNode(stopDataNodeForUpgrade, true);
            miniDFSCluster.waitActive();
            GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.2
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public Boolean m62get() {
                    return Boolean.valueOf(wrappedStream.getBlock().getGenerationStamp() > generationStamp);
                }
            }, 100, 10000);
            Assert.assertEquals("The pipeline recovery count shouldn't increase", 0L, wrappedStream.getStreamer().getPipelineRecoveryCount());
            wrappedStream.write(1);
            wrappedStream.close();
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.setBoolean("dfs.client.block.write.replace-datanode-on-failure.best-effort", true);
        MiniDFSCluster miniDFSCluster = null;
        DFSClientFaultInjector dFSClientFaultInjector = DFSClientFaultInjector.get();
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(3).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            Path path = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
            DFSTestUtil.createFile(fileSystem, path, 10240L, (short) 3, 0L);
            DFSClientFaultInjector.set(new DFSClientFaultInjector() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.3
                public boolean skipRollingRestartWait() {
                    return true;
                }
            });
            final DFSOutputStream wrappedStream = fileSystem.append(path).getWrappedStream();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
            Thread thread = new Thread() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.4
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    while (atomicBoolean.get()) {
                        try {
                            wrappedStream.write("test".getBytes());
                            wrappedStream.hflush();
                            Thread.sleep(1000L);
                        } catch (IOException | InterruptedException e) {
                            TestClientProtocolForPipelineRecovery.LOG.error("Exception during write", e);
                            atomicBoolean2.set(true);
                        }
                    }
                    atomicBoolean.set(false);
                }
            };
            thread.start();
            Thread.sleep(1000L);
            for (DatanodeInfo datanodeInfo : wrappedStream.getPipeline()) {
                Assert.assertFalse("Write should be going on", atomicBoolean2.get());
                ArrayList<DataNode> dataNodes = miniDFSCluster.getDataNodes();
                int i = 0;
                int i2 = 0;
                while (true) {
                    if (i2 >= dataNodes.size()) {
                        break;
                    }
                    if (dataNodes.get(i2).getIpcPort() == datanodeInfo.getIpcPort()) {
                        i = i2;
                        break;
                    }
                    i2++;
                }
                final long generationStamp = wrappedStream.getBlock().getGenerationStamp();
                MiniDFSCluster.DataNodeProperties stopDataNodeForUpgrade = miniDFSCluster.stopDataNodeForUpgrade(i);
                GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000);
                miniDFSCluster.restartDataNode(stopDataNodeForUpgrade, true);
                miniDFSCluster.waitActive();
                GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.5
                    /* renamed from: get, reason: merged with bridge method [inline-methods] */
                    public Boolean m63get() {
                        return Boolean.valueOf(wrappedStream.getBlock().getGenerationStamp() > generationStamp);
                    }
                }, 100, 10000);
                Assert.assertEquals("The pipeline recovery count shouldn't increase", 0L, wrappedStream.getStreamer().getPipelineRecoveryCount());
            }
            Assert.assertFalse("Write should be going on", atomicBoolean2.get());
            atomicBoolean.set(false);
            thread.join();
            wrappedStream.write("testagain".getBytes());
            Assert.assertTrue("There should be atleast 2 nodes in pipeline still", wrappedStream.getPipeline().length >= 2);
            wrappedStream.close();
            DFSClientFaultInjector.set(dFSClientFaultInjector);
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            DFSClientFaultInjector.set(dFSClientFaultInjector);
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testZeroByteBlockRecovery() throws Exception {
        DataNodeFaultInjector dataNodeFaultInjector = new DataNodeFaultInjector() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.6
            int tries = 1;

            public void stopSendingPacketDownstream(String str) throws IOException {
                if (this.tries > 0) {
                    this.tries--;
                    try {
                        Thread.sleep(60000L);
                    } catch (InterruptedException e) {
                        throw new IOException("Interrupted while sleeping. Bailing out.");
                    }
                }
            }
        };
        DataNodeFaultInjector dataNodeFaultInjector2 = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set(dataNodeFaultInjector);
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.set("dfs.client.socket-timeout", "1000");
        hdfsConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(3).build();
            miniDFSCluster.waitActive();
            FSDataOutputStream create = miniDFSCluster.getFileSystem().create(new Path("noheartbeat.dat"), (short) 2);
            create.write(49);
            create.hflush();
            create.close();
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            DataNodeFaultInjector.set(dataNodeFaultInjector2);
        } catch (Throwable th) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            DataNodeFaultInjector.set(dataNodeFaultInjector2);
            throw th;
        }
    }

    @Test
    public void testPipelineRecoveryWithTransferBlock() throws Exception {
        MiniDFSCluster build = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(4).build();
        DataNodeFaultInjector dataNodeFaultInjector = DataNodeFaultInjector.get();
        try {
            DistributedFileSystem fileSystem = build.getFileSystem();
            Path path = new Path("/f");
            FSDataOutputStream create = fileSystem.create(path);
            int i = 0;
            create.writeBytes("hello");
            create.hflush();
            DFSOutputStream wrappedStream = create.getWrappedStream();
            DatanodeInfo[] nodes = wrappedStream.getStreamer().getNodes();
            final String xferAddr = nodes[2].getXferAddr(false);
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            DataNodeFaultInjector.set(new DataNodeFaultInjector() { // from class: org.apache.hadoop.hdfs.TestClientProtocolForPipelineRecovery.7
                public void failPipeline(ReplicaInPipeline replicaInPipeline, String str) throws IOException {
                    if (xferAddr.equals(str) && !atomicBoolean.get() && replicaInPipeline.getBytesAcked() > 512 && replicaInPipeline.getBytesAcked() % 512 != 0) {
                        for (int i2 = 0; i2 < 10; i2++) {
                            if ((replicaInPipeline.getBytesOnDisk() / 512) - (replicaInPipeline.getBytesAcked() / 512) >= 1) {
                                atomicBoolean.set(true);
                                throw new IOException("Failing Pipeline " + replicaInPipeline.getBytesAcked() + " : " + replicaInPipeline.getBytesOnDisk());
                            }
                            try {
                                Thread.sleep(200L);
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                }
            });
            Random random = new Random();
            byte[] bArr = new byte[5000];
            while (i < 1048576) {
                random.nextBytes(bArr);
                create.write(bArr);
                i += 5000;
                create.hflush();
            }
            Assert.assertTrue("Expected a failure in the pipeline", atomicBoolean.get());
            DatanodeInfo[] nodes2 = wrappedStream.getStreamer().getNodes();
            create.close();
            Iterator<DataNode> it = build.getDataNodes().iterator();
            while (it.hasNext()) {
                DataNodeTestUtils.triggerBlockReport(it.next());
            }
            List asList = Arrays.asList(nodes);
            DatanodeInfo datanodeInfo = null;
            int length = nodes2.length;
            int i2 = 0;
            while (true) {
                if (i2 >= length) {
                    break;
                }
                DatanodeInfo datanodeInfo2 = nodes2[i2];
                if (!asList.contains(datanodeInfo2)) {
                    datanodeInfo = datanodeInfo2;
                    break;
                }
                i2++;
            }
            LOG.info("Number of nodes in pipeline: {} newNode {}", Integer.valueOf(nodes2.length), datanodeInfo.getName());
            for (int i3 = 0; i3 < nodes2.length; i3++) {
                if (!nodes2[i3].getName().equals(datanodeInfo.getName())) {
                    LOG.info("shutdown {}", nodes2[i3].getName());
                    build.stopDataNode(nodes2[i3].getName());
                }
            }
            DFSTestUtil.readFile(fileSystem, path);
            DataNodeFaultInjector.set(dataNodeFaultInjector);
            build.shutdown();
        } catch (Throwable th) {
            DataNodeFaultInjector.set(dataNodeFaultInjector);
            build.shutdown();
            throw th;
        }
    }

    @Test
    public void testUpdatePipeLineAfterDNReg() throws Exception {
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        MiniDFSCluster miniDFSCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(2).build();
            miniDFSCluster.waitActive();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            FSDataOutputStream create = fileSystem.create(new Path("/testUpdatePipeLineAfterDNReg"));
            create.write(1);
            create.hflush();
            DataNode dataNode = miniDFSCluster.getDataNodes().get(0);
            dataNode.setHeartbeatsDisabledForTests(true);
            miniDFSCluster.setDataNodeDead(miniDFSCluster.getNamesystem(0).getBlockManager().getDatanodeManager().getDatanode(dataNode.getDatanodeId()));
            new DatanodeProtocolClientSideTranslatorPB(miniDFSCluster.getNameNode().getNameNodeAddress(), hdfsConfiguration).registerDatanode(dataNode.getDNRegistrationForBP(miniDFSCluster.getNamesystem().getBlockPoolId()));
            DFSOutputStream wrappedStream = create.getWrappedStream();
            String clientName = fileSystem.getClient().getClientName();
            NamenodeProtocols nameNodeRpc = miniDFSCluster.getNameNodeRpc();
            wrappedStream.getStreamer().updatePipeline(nameNodeRpc.updateBlockForPipeline(wrappedStream.getBlock(), clientName).getBlock().getGenerationStamp());
            wrappedStream.getStreamer().updatePipeline(nameNodeRpc.updateBlockForPipeline(wrappedStream.getBlock(), clientName).getBlock().getGenerationStamp());
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
        } catch (Throwable th) {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            throw th;
        }
    }

    static {
        $assertionsDisabled = !TestClientProtocolForPipelineRecovery.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
    }
}
