/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class TestBPOfferService {
    /** Block pool ID handed out by the mocked NameNodes' {@code versionRequest()}. */
    private static final String FAKE_BPID = "fake bpid";
    /** Cluster ID handed out by the mocked NameNodes' {@code versionRequest()}. */
    private static final String FAKE_CLUSTERID = "fake cluster";
    protected static final Logger LOG = LoggerFactory.getLogger(TestBPOfferService.class);
    /** Block in the fake block pool used by the received-block and bad-block tests. */
    private static final ExtendedBlock FAKE_BLOCK = new ExtendedBlock("fake bpid", 12345L);
    /** Test directory under which the DataNode data directories are configured. */
    private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
    // Timestamps written by setTimeForSynchronousBPOSCalls(): the first invocation fills
    // firstCallTime, every later invocation overwrites secondCallTime.
    private long firstCallTime = 0L;
    private long secondCallTime = 0L;
    // NOTE(review): the lease-id fields are not touched in the visible part of the file;
    // presumably they are used by block-report-lease tests further down - confirm.
    private long firstLeaseId = 0L;
    private long secondLeaseId = 0L;
    private long nextFullBlockReportLeaseId = 1L;
    // Temporary directory injected by JUnit for each test.
    @TempDir
    java.nio.file.Path baseDir;
    private DatanodeProtocolClientSideTranslatorPB mockNN1;
    private DatanodeProtocolClientSideTranslatorPB mockNN2;
    // Per-NameNode state indexed by the nnIdx passed to setupNNMock(); tests overwrite the
    // entries to change the HA state / commands associated with a given mock NameNode.
    private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[3];
    private final DatanodeCommand[][] datanodeCommands = new DatanodeCommand[3][0];
    // NOTE(review): presumably incremented by HeartbeatAnswer (defined outside this view) - confirm.
    private final int[] heartbeatCounts = new int[3];
    private DataNode mockDn;
    private FsDatasetSpi<?> mockFSDataset;
    // Shared lock manager returned by the DataNode mocks and checked for leaks after each test.
    private DataSetLockManager dataSetLockManager = new DataSetLockManager();
    private boolean isSlownode;
    // UUID of the first storage of the simulated dataset, captured in setupMocks().
    private String mockStorageID;

    /**
     * Builds the fixtures shared by all tests: two mocked NameNode translators, a stub-only
     * {@link DataNode} mock wired with a configuration, {@link DNConf}, metrics and the lock
     * manager, and a spied {@link SimulatedFSDataset} that already holds the fake block pool.
     *
     * @throws Exception if any fixture cannot be created
     */
    @BeforeEach
    public void setupMocks() throws Exception {
        mockNN1 = setupNNMock(0);
        mockNN2 = setupNNMock(1);

        mockDn = Mockito.mock(DataNode.class, Mockito.withSettings().stubOnly());
        Mockito.doReturn(true).when(mockDn).shouldRun();

        File dataDir = new File(new File(TEST_BUILD_DATA, "dfs"), "data");
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.data.dir", dataDir.toURI().toString());
        Mockito.doReturn(conf).when(mockDn).getConf();

        // DNConf is constructed from the mock itself; presumably it reads getConf(), so the
        // configuration stub above is installed first (same order as before).
        DNConf dnConf = new DNConf(mockDn);
        Mockito.doReturn(dnConf).when(mockDn).getDnConf();

        DataNodeMetrics metrics = DataNodeMetrics.create(conf, "fake dn");
        Mockito.doReturn(metrics).when(mockDn).getMetrics();

        // Spy on a real simulated dataset so tests can both use it and verify calls on it.
        SimulatedFSDataset simulated = Mockito.spy(new SimulatedFSDataset(null, conf));
        mockFSDataset = simulated;
        mockFSDataset.addBlockPool(FAKE_BPID, conf);
        mockStorageID = simulated.getStorages().get(0).getStorageUuid();

        Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
        Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
    }

    /**
     * Runs after every test: asks the shared lock manager to check for leaked locks and
     * asserts that it recorded no exception.
     */
    @AfterEach
    public void checkDataSetLockManager() {
        dataSetLockManager.lockLeakCheck();
        Assertions.assertNull(dataSetLockManager.getLastException());
    }

    /**
     * Creates a mocked NameNode translator that answers the version handshake, registration
     * and heartbeats, and resets the per-NameNode HA status and command slots.
     *
     * @param nnIdx index of this NameNode in {@code mockHaStatuses} / {@code datanodeCommands}
     * @return the configured mock
     * @throws Exception declared by the stubbed protocol methods
     */
    private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) throws Exception {
        DatanodeProtocolClientSideTranslatorPB nn =
            Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);

        NamespaceInfo nsInfo = new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0L);
        Mockito.doReturn(nsInfo).when(nn).versionRequest();

        Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
            .when(nn).registerDatanode(Mockito.any());

        // Heartbeats are answered by HeartbeatAnswer, keyed by this NameNode's index.
        Mockito.doAnswer(new HeartbeatAnswer(nnIdx)).when(nn).sendHeartbeat(
            Mockito.any(DatanodeRegistration.class),
            Mockito.any(StorageReport[].class),
            Mockito.anyLong(),
            Mockito.anyLong(),
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
            Mockito.any(SlowDiskReports.class));

        // Every mocked NameNode starts out as standby with no pending commands.
        mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.STANDBY, 0L);
        datanodeCommands[nnIdx] = new DatanodeCommand[0];
        return nn;
    }

    /**
     * Verifies that the DataNode registers with and block-reports to both NameNodes, and that
     * a received-block notification is delivered to each of them.
     */
    @Test
    public void testBasicFunctionality() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForBothActors(bpos);

            // Registration must have happened exactly once per NameNode.
            Mockito.verify(mockNN1).registerDatanode(Mockito.any());
            Mockito.verify(mockNN2).registerDatanode(Mockito.any());

            waitForBlockReport(mockNN1);
            waitForBlockReport(mockNN2);

            bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);

            // Both NameNodes must see exactly the one block that was announced.
            DatanodeProtocolClientSideTranslatorPB[] nameNodes = {mockNN1, mockNN2};
            for (DatanodeProtocolClientSideTranslatorPB nn : nameNodes) {
                ReceivedDeletedBlockInfo[] received = waitForBlockReceived(FAKE_BLOCK, nn);
                Assertions.assertEquals(1, received.length);
                Assertions.assertEquals(FAKE_BLOCK.getLocalBlock(), received[0].getBlock());
            }
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Verifies that no block is lost when the DataNode is told to re-register while blocks
     * are still being written: every one of the generated block IDs must eventually reach
     * the first NameNode through a full block report or an incremental report.
     *
     * <p>A fault injector stalls the full block report until more than half of the blocks
     * have been created, so that blocks arrive both before and after the report is built.
     */
    @Test
    public void testMissBlocksWhenReregister() throws Exception {
        // Captured up front so the cleanup below can never install a null injector, even if
        // the test fails before the replacement injector is set.
        final DataNodeFaultInjector prevDNFaultInjector = DataNodeFaultInjector.get();
        final BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bpos.start();
        final int totalTestBlocks = 4000;
        Thread addNewBlockThread = null;
        final AtomicInteger count = new AtomicInteger(0);
        // Filled by the mock answers on the actor thread(s) and polled by the test thread,
        // hence the synchronized wrapper.
        final Set<Long> blocks = Collections.synchronizedSet(new TreeSet<Long>());
        try {
            this.waitForBothActors(bpos);
            this.waitForInitialization(bpos);
            DataNodeFaultInjector.set(new DataNodeFaultInjector() {

                @Override
                public void blockUtilSendFullBlockReport() {
                    try {
                        GenericTestUtils.waitFor(() -> count.get() > 2000, 100L, 1000L);
                    } catch (Exception e) {
                        LOG.error("error DataNodeFaultInjector", e);
                    }
                }
            });
            this.countBlockReportItems(FAKE_BLOCK, this.mockNN1, blocks);
            addNewBlockThread = new Thread(() -> {
                for (int i = 0; i < totalTestBlocks; ++i) {
                    SimulatedFSDataset fsDataset = (SimulatedFSDataset) this.mockFSDataset;
                    SimulatedFSDataset.SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
                    String storageId = simulatedStorage.getStorageUuid();
                    ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), (long) i, 0L, (long) i);
                    try {
                        fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
                        bpos.notifyNamenodeReceivingBlock(b, storageId);
                        fsDataset.finalizeBlock(b, false);
                        count.addAndGet(1);
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        // The test is tearing down: keep the interrupt flag and stop producing
                        // blocks instead of swallowing the interrupt and carrying on.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        LOG.error("error addNewBlockThread", e);
                    }
                }
            });
            addNewBlockThread.start();
            GenericTestUtils.waitFor(() -> count.get() > 0, 100L, 1000L);
            // Ask the DataNode to re-register with the first NameNode on the next heartbeat.
            this.datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
            bpos.triggerHeartbeatForTests();
            addNewBlockThread.join();
            addNewBlockThread = null;
            try {
                GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks, 1000L, 15000L);
            } catch (Exception e) {
                // Keep the original exception as the cause so the timeout stack trace is not lost.
                Assertions.fail(String.format("Timed out waiting for blocks count. reported = %d, expected = %d. Exception: %s", blocks.size(), totalTestBlocks, e.getMessage()), e);
            }
        } finally {
            try {
                if (addNewBlockThread != null) {
                    addNewBlockThread.interrupt();
                }
                bpos.stop();
                bpos.join();
            } finally {
                // Always restore the global injector, even if stopping the service fails.
                DataNodeFaultInjector.set(prevDNFaultInjector);
            }
        }
    }

    /**
     * Verifies that {@code getBlockPoolId()} takes the read lock only while no namespace info
     * is set, and answers without touching the read lock once namespace info is available.
     */
    @Test
    public void testLocklessBlockPoolId() throws Exception {
        BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
        NamespaceInfo nsInfo = new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0L);

        // No namespace info yet: the id is null and the read lock was taken.
        Assertions.assertNull(bpos.getBlockPoolId());
        Mockito.verify(bpos).readLock();
        Mockito.reset(bpos);

        // Namespace info set: the id is served without the read lock.
        Assertions.assertNull(bpos.setNamespaceInfo(nsInfo));
        Assertions.assertEquals(FAKE_BPID, bpos.getBlockPoolId());
        Mockito.verify(bpos, Mockito.never()).readLock();
        Mockito.reset(bpos);

        // Namespace info cleared (the setter returns the previous value): back to the locked path.
        Assertions.assertEquals(nsInfo, bpos.setNamespaceInfo(null));
        Assertions.assertNull(bpos.getBlockPoolId());
        Mockito.verify(bpos).readLock();
        Mockito.reset(bpos);

        // Namespace info restored: lock-free again.
        Assertions.assertNull(bpos.setNamespaceInfo(nsInfo));
        Assertions.assertEquals(FAKE_BPID, bpos.getBlockPoolId());
        Mockito.verify(bpos, Mockito.never()).readLock();
    }

    /**
     * Verifies that a block command returned by a NameNode that is not active (both mocks
     * start as standby) never leads to an {@code invalidate} call on the dataset.
     */
    @Test
    public void testIgnoreDeletionsFromNonActive() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);

        // The second NameNode answers its block report with a command for FAKE_BLOCK
        // (action code 2 - presumably the invalidate action; confirm against DatanodeProtocol).
        BlockCommand deleteFakeBlock =
            new BlockCommand(2, FAKE_BPID, new Block[] {FAKE_BLOCK.getLocalBlock()});
        Mockito.doReturn(deleteFakeBlock).when(mockNN2).blockReport(
            (DatanodeRegistration) Mockito.any(),
            Mockito.eq(FAKE_BPID),
            (StorageBlockReport[]) Mockito.any(),
            (BlockReportContext) Mockito.any());

        bpos.start();
        try {
            waitForInitialization(bpos);
            waitForBlockReport(mockNN1);
            waitForBlockReport(mockNN2);
        } finally {
            bpos.stop();
            bpos.join();
        }

        // The command must have been ignored by the dataset.
        Mockito.verify(mockFSDataset, Mockito.never())
            .invalidate(Mockito.eq(FAKE_BPID), (Block[]) Mockito.any());
    }

    /**
     * Verifies that when the two NameNodes report different cluster IDs, exactly one of the
     * service actors ends up not alive.
     */
    @Test
    public void testNNsFromDifferentClusters() throws Exception {
        // The first NameNode claims to belong to a different cluster than the second one.
        NamespaceInfo foreignNsInfo = new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0L);
        Mockito.doReturn(foreignNsInfo).when(mockNN1).versionRequest();

        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForOneToFail(bpos);
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Walks the HA status of the two mocked NameNodes through a series of transitions and
     * checks after each triggered heartbeat which NameNode the service treats as active.
     */
    @Test
    public void testPickActiveNameNode() throws Exception {
        final HAServiceProtocol.HAServiceState active = HAServiceProtocol.HAServiceState.ACTIVE;
        final HAServiceProtocol.HAServiceState standby = HAServiceProtocol.HAServiceState.STANDBY;

        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForInitialization(bpos);

            // Both NameNodes start as standby, so nobody is active yet.
            Assertions.assertNull(bpos.getActiveNN());

            // NN1 claims active.
            mockHaStatuses[0] = new NNHAStatusHeartbeat(active, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());

            // NN2 claims active with a larger second argument and takes over.
            mockHaStatuses[1] = new NNHAStatusHeartbeat(active, 2L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN2, bpos.getActiveNN());

            // A further heartbeat (NN1 still reporting its older claim) changes nothing.
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN2, bpos.getActiveNN());

            // NN2 steps down to standby: no active NameNode remains.
            mockHaStatuses[1] = new NNHAStatusHeartbeat(standby, 2L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertNull(bpos.getActiveNN());

            // NN1 claims active again with a newer value and is accepted.
            mockHaStatuses[0] = new NNHAStatusHeartbeat(active, 3L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Verifies that the service still initializes and sends a block report when the first
     * call to {@code DataNode.initBlockPool} fails with an {@link IOException}.
     */
    @Test
    public void testBPInitErrorHandling() throws Exception {
        // A dedicated DataNode mock (not the shared stub-only one) for this scenario.
        final DataNode failingDn = Mockito.mock(DataNode.class);
        Mockito.doReturn(true).when(failingDn).shouldRun();
        Mockito.doReturn(dataSetLockManager).when(failingDn).getDataSetLockManager();

        File dataDir = new File(new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.data.dir", dataDir.toURI().toString());
        Mockito.doReturn(conf).when(failingDn).getConf();

        DNConf dnConf = new DNConf(failingDn);
        Mockito.doReturn(dnConf).when(failingDn).getDnConf();
        DataNodeMetrics metrics = DataNodeMetrics.create(conf, "fake dn");
        Mockito.doReturn(metrics).when(failingDn).getMetrics();

        // First initBlockPool call throws; every later call makes the dataset available.
        final AtomicInteger initCalls = new AtomicInteger();
        Mockito.doAnswer(invocation -> {
            if (initCalls.getAndIncrement() == 0) {
                throw new IOException("faked initBlockPool exception");
            }
            Mockito.doReturn(mockFSDataset).when(failingDn).getFSDataset();
            return null;
        }).when(failingDn).initBlockPool(Mockito.any(BPOfferService.class));

        BPOfferService bpos = setupBPOSForNNs(failingDn, mockNN1, mockNN2);
        Assertions.assertEquals(2, bpos.getBPServiceActors().size());

        bpos.start();
        try {
            waitForInitialization(bpos);
            // At least one of the NameNodes must receive a block report.
            waitForBlockReport(mockNN1, mockNN2);
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Blocks until exactly one of the service's actors is no longer alive, polling every
     * 100 ms for up to 10 seconds.
     *
     * @param bpos the service whose actors are inspected
     * @throws Exception if the condition is not met in time or the wait is interrupted
     */
    private void waitForOneToFail(final BPOfferService bpos) throws Exception {
        GenericTestUtils.waitFor(() -> {
            // The list must be typed: iterating a raw List with a BPServiceActor loop
            // variable does not compile.
            List<BPServiceActor> actors = bpos.getBPServiceActors();
            int failedCount = 0;
            for (BPServiceActor actor : actors) {
                if (!actor.isAlive()) {
                    ++failedCount;
                }
            }
            return failedCount == 1;
        }, 100L, 10000L);
    }

    /**
     * Convenience overload that builds the service on top of the shared DataNode mock.
     *
     * @param nns mocked NameNode translators, in NameNode order
     * @return a new, not yet started service
     * @throws IOException propagated from the full overload
     */
    private BPOfferService setupBPOSForNNs(DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
        DataNode sharedDn = this.mockDn;
        return setupBPOSForNNs(sharedDn, nns);
    }

    /**
     * Builds a {@link BPOfferService} for the nameservice "test_ns" in which the i-th mocked
     * NameNode is reachable at the wildcard address on port i and has the id "nn" + i.
     *
     * @param mockDn DataNode mock whose {@code connectToNN} is stubbed to return the mocks
     * @param nns mocked NameNode translators, in NameNode order
     * @return a new, not yet started service
     * @throws IOException declared by the stubbed {@code connectToNN}
     */
    private BPOfferService setupBPOSForNNs(DataNode mockDn, DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
        List<String> nnIds = new ArrayList<>(nns.length);
        List<InetSocketAddress> nnAddrs = new ArrayList<>(nns.length);
        for (int idx = 0; idx < nns.length; ++idx) {
            // The array index doubles as the fake port, so every address is distinct.
            InetSocketAddress addr = new InetSocketAddress(idx);
            nnAddrs.add(addr);
            Mockito.doReturn(nns[idx]).when(mockDn).connectToNN(Mockito.eq(new InetSocketAddress(idx)));
            nnIds.add("nn" + idx);
        }
        // No lifeline addresses: one null entry per NameNode.
        List<InetSocketAddress> lifelineAddrs = Collections.nCopies(nnAddrs.size(), null);
        return new BPOfferService("test_ns", nnIds, nnAddrs, lifelineAddrs, mockDn);
    }

    /**
     * Blocks until the service is both alive and initialized, polling every 100 ms for up to
     * 10 seconds.
     *
     * @param bpos the service to wait for
     * @throws Exception if the condition is not met in time or the wait is interrupted
     */
    private void waitForInitialization(final BPOfferService bpos) throws Exception {
        GenericTestUtils.waitFor(() -> {
            if (!bpos.isAlive()) {
                return false;
            }
            return bpos.isInitialized();
        }, 100L, 10000L);
    }

    /**
     * Blocks until the service is alive and exactly two of its actors have a non-null
     * registration, polling every 100 ms for up to 10 seconds.
     *
     * @param bpos the service whose actors are inspected
     * @throws Exception if the condition is not met in time or the wait is interrupted
     */
    private void waitForBothActors(final BPOfferService bpos) throws Exception {
        GenericTestUtils.waitFor(() -> {
            List<BPServiceActor> actors = bpos.getBPServiceActors();
            if (!bpos.isAlive()) {
                return false;
            }
            int registered = 0;
            for (BPServiceActor actor : actors) {
                if (actor.getBpRegistration() != null) {
                    ++registered;
                }
            }
            return registered == 2;
        }, 100L, 10000L);
    }

    /**
     * Blocks until Mockito can verify a block report for the fake block pool on the given
     * NameNode mock, polling every 500 ms for up to 10 seconds.
     *
     * @param mockNN the NameNode mock expected to receive the report
     * @throws Exception if the report is not seen in time or the wait is interrupted
     */
    private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
        GenericTestUtils.waitFor(() -> {
            boolean reported;
            try {
                Mockito.verify(mockNN).blockReport(
                    (DatanodeRegistration) Mockito.any(),
                    Mockito.eq(FAKE_BPID),
                    (StorageBlockReport[]) Mockito.any(),
                    (BlockReportContext) Mockito.any());
                reported = true;
            } catch (Throwable t) {
                // Verification failures surface as errors, hence Throwable; just retry later.
                LOG.info("waiting on block report: " + t.getMessage());
                reported = false;
            }
            return reported;
        }, 500L, 10000L);
    }

    /**
     * Blocks until Mockito can verify a block report for the fake block pool on at least one
     * of the two NameNode mocks, polling every 500 ms for up to 10 seconds. The first mock is
     * checked first; the second is only checked when the first has not been reported to.
     *
     * @param mockNN1 first NameNode mock to check
     * @param mockNN2 second NameNode mock to check
     * @throws Exception if no report is seen in time or the wait is interrupted
     */
    private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN1, final DatanodeProtocolClientSideTranslatorPB mockNN2) throws Exception {
        final DatanodeProtocolClientSideTranslatorPB[] candidates = {mockNN1, mockNN2};
        GenericTestUtils.waitFor(() -> {
            for (DatanodeProtocolClientSideTranslatorPB nn : candidates) {
                try {
                    Mockito.verify(nn).blockReport(
                        (DatanodeRegistration) Mockito.any(),
                        Mockito.eq(FAKE_BPID),
                        (StorageBlockReport[]) Mockito.any(),
                        (BlockReportContext) Mockito.any());
                    return true;
                } catch (Throwable t) {
                    // Not reported to this NameNode yet; fall through to the next candidate.
                    LOG.info("waiting on block report: " + t.getMessage());
                }
            }
            return false;
        }, 500L, 10000L);
    }

    /**
     * Blocks until {@code registerDatanode} has been invoked on the given NameNode mock
     * exactly {@code times} times, polling every 500 ms for up to 10 seconds.
     *
     * <p>The expected count now comes from {@code times}; previously the argument was
     * ignored and the count was hard-coded to 2.
     *
     * @param mockNN the NameNode mock expected to receive the registrations
     * @param times exact number of {@code registerDatanode} invocations to wait for
     * @throws Exception if the count is not reached in time or the wait is interrupted
     */
    private void waitForRegistration(final DatanodeProtocolClientSideTranslatorPB mockNN, final int times) throws Exception {
        GenericTestUtils.waitFor(() -> {
            try {
                Mockito.verify(mockNN, Mockito.times(times)).registerDatanode(Mockito.any());
                return true;
            } catch (Throwable t) {
                // Verification failures surface as errors, hence Throwable; just retry later.
                LOG.info("waiting on block registerDatanode: " + t.getMessage());
                return false;
            }
        }, 500L, 10000L);
    }

    /**
     * Blocks until the given NameNode mock has received a {@code blockReceivedAndDeleted}
     * call for the block's pool (polling every 100 ms for up to 10 seconds) and returns the
     * block entries of the first storage in the captured argument.
     *
     * @param fakeBlock block whose block pool ID is matched
     * @param mockNN the NameNode mock expected to receive the incremental report
     * @return the blocks of the first storage entry of the last captured report
     * @throws Exception if the call is not seen in time or the wait is interrupted
     */
    private ReceivedDeletedBlockInfo[] waitForBlockReceived(ExtendedBlock fakeBlock, final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
        final String bpid = fakeBlock.getBlockPoolId();
        final ArgumentCaptor<StorageReceivedDeletedBlocks[]> reportCaptor =
            ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
        GenericTestUtils.waitFor(() -> {
            try {
                Mockito.verify(mockNN).blockReceivedAndDeleted(
                    (DatanodeRegistration) Mockito.any(),
                    Mockito.eq(bpid),
                    reportCaptor.capture());
            } catch (Throwable t) {
                // Not delivered yet; poll again.
                return false;
            }
            return true;
        }, 100L, 10000L);
        StorageReceivedDeletedBlocks[] captured = reportCaptor.getValue();
        return captured[0].getBlocks();
    }

    /**
     * Records the current time: the first invocation fills {@code firstCallTime}, every later
     * invocation overwrites {@code secondCallTime}.
     */
    private void setTimeForSynchronousBPOSCalls() {
        if (firstCallTime != 0L) {
            secondCallTime = Time.now();
            return;
        }
        firstCallTime = Time.now();
    }

    /**
     * Stubs {@code blockReport} and {@code blockReceivedAndDeleted} on the given NameNode mock
     * so that every block ID passed to either call for the block's pool is added to
     * {@code blocks}. Both stubs return {@code null}.
     *
     * <p>All storage entries of a report are scanned (not only the first one), so reports
     * with several storages are fully counted and an empty array no longer fails. The set is
     * modified from whatever thread invokes the mock, so callers that read it from another
     * thread should pass a thread-safe set.
     *
     * @param fakeBlock block whose block pool ID is matched
     * @param mockNN the NameNode mock to stub
     * @param blocks collector receiving the reported block IDs
     * @throws Exception declared by the stubbed protocol methods
     */
    private void countBlockReportItems(ExtendedBlock fakeBlock, DatanodeProtocolClientSideTranslatorPB mockNN, Set<Long> blocks) throws Exception {
        final String fakeBlockPoolId = fakeBlock.getBlockPoolId();

        // Full block reports: collect every replica of every reported storage.
        Mockito.doAnswer(invocation -> {
            StorageBlockReport[] reports = (StorageBlockReport[]) invocation.getArguments()[2];
            for (StorageBlockReport report : reports) {
                for (BlockListAsLongs.BlockReportReplica replica : report.getBlocks()) {
                    blocks.add(replica.getBlockId());
                }
            }
            return null;
        }).when(mockNN).blockReport(
            (DatanodeRegistration) Mockito.any(),
            Mockito.eq(fakeBlockPoolId),
            (StorageBlockReport[]) Mockito.any(),
            (BlockReportContext) Mockito.any());

        // Incremental reports: collect every received/deleted entry of every storage.
        Mockito.doAnswer(invocation -> {
            StorageReceivedDeletedBlocks[] reports =
                (StorageReceivedDeletedBlocks[]) invocation.getArguments()[2];
            for (StorageReceivedDeletedBlocks report : reports) {
                for (ReceivedDeletedBlockInfo info : report.getBlocks()) {
                    blocks.add(info.getBlock().getBlockId());
                }
            }
            return null;
        }).when(mockNN).blockReceivedAndDeleted(
            (DatanodeRegistration) Mockito.any(),
            Mockito.eq(fakeBlockPoolId),
            (StorageReceivedDeletedBlocks[]) Mockito.any());
    }

    /**
     * Verifies that two consecutive bad-block reports are handled less than 5 seconds apart
     * once NN1 is active, i.e. that reporting to the active NameNode is not held up by the
     * standby one.
     *
     * <p>NOTE(review): the timing relies on {@code BPOfferServiceSynchronousCallAnswer}
     * (defined outside this view) - presumably it records the call times and delays the
     * standby's answer; confirm.
     */
    @Test
    public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForInitialization(bpos);

            // Promote NN1 to active.
            Assertions.assertNull(bpos.getActiveNN());
            mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());

            Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
                .when(mockNN1).reportBadBlocks(Mockito.any());
            Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
                .when(mockNN2).reportBadBlocks(Mockito.any());

            // Report the same bad block twice in a row.
            for (int attempt = 0; attempt < 2; ++attempt) {
                bpos.reportBadBlocks(
                    FAKE_BLOCK,
                    mockFSDataset.getVolume(FAKE_BLOCK).getStorageID(),
                    mockFSDataset.getVolume(FAKE_BLOCK).getStorageType());
            }

            // Give the asynchronous reports time to be processed.
            Thread.sleep(10000L);

            long elapsedBetweenCalls = secondCallTime - firstCallTime;
            Assertions.assertTrue(elapsedBetweenCalls < 5000L, "Active namenode reportBadBlock processing should be independent of standby namenode reportBadBlock processing ");
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Verifies that two consecutive error reports are handled less than 5 seconds apart once
     * NN1 is active, i.e. that reporting to the active NameNode is not held up by the standby
     * one.
     *
     * <p>NOTE(review): the timing relies on {@code BPOfferServiceSynchronousCallAnswer}
     * (defined outside this view) - presumably it records the call times and delays the
     * standby's answer; confirm.
     */
    @Test
    public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForInitialization(bpos);

            // Promote NN1 to active.
            Assertions.assertNull(bpos.getActiveNN());
            mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());

            Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
                .when(mockNN1).errorReport(Mockito.any(), Mockito.anyInt(), Mockito.anyString());
            Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
                .when(mockNN2).errorReport(Mockito.any(), Mockito.anyInt(), Mockito.anyString());

            // Send the same error report twice in a row.
            final String errorString = "Can't send invalid block " + FAKE_BLOCK;
            for (int attempt = 0; attempt < 2; ++attempt) {
                bpos.trySendErrorReport(2, errorString);
            }

            // Give the asynchronous reports time to be processed.
            Thread.sleep(10000L);

            long elapsedBetweenCalls = secondCallTime - firstCallTime;
            Assertions.assertTrue(elapsedBetweenCalls < 5000L, "Active namenode trySendErrorReport processing should be independent of standby namenode trySendErrorReport processing ");
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Verifies that an error report is retried when the active NameNode's first
     * {@code errorReport} call throws an {@link IOException}: the second, successful call must
     * arrive within 20 seconds.
     */
    @Test
    public void testTrySendErrorReportWhenNNThrowsIOException() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForInitialization(bpos);

            // Promote NN1 to active.
            Assertions.assertNull(bpos.getActiveNN());
            mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());

            // First errorReport fails; the second one records the time it was reached.
            Mockito.doThrow(new IOException("Throw IOException in the first call."))
                .doAnswer(invocation -> {
                    secondCallTime = Time.now();
                    return null;
                })
                .when(mockNN1)
                .errorReport(Mockito.any(DatanodeRegistration.class), Mockito.anyInt(), Mockito.anyString());

            final String errorString = "Can't send invalid block " + FAKE_BLOCK;
            bpos.trySendErrorReport(2, errorString);

            GenericTestUtils.waitFor(() -> secondCallTime != 0L, 100L, 20000L);
            boolean retried = secondCallTime != 0L;
            Assertions.assertTrue(retried, "Active namenode didn't add the report back to the queue when errorReport threw IOException");
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Verifies that a bad-block report rejected by NN2 with a remote {@link StandbyException}
     * is not retried: after two further heartbeats NN2 has still seen exactly one
     * {@code reportBadBlocks} call.
     */
    @Test
    public void testReportBadBlocksWhenNNThrowsStandbyException() throws Exception {
        BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
        bpos.start();
        try {
            waitForInitialization(bpos);

            // Promote NN1 to active.
            Assertions.assertNull(bpos.getActiveNN());
            mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(mockNN1, bpos.getActiveNN());

            // The active NameNode accepts the report; the other one rejects it as standby.
            Mockito.doNothing().when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
            RemoteException standbyRejection = new RemoteException(
                StandbyException.class.getName(),
                "Operation category WRITE is not supported in state standby",
                RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_APPLICATION);
            Mockito.doThrow(standbyRejection)
                .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));

            bpos.reportBadBlocks(
                FAKE_BLOCK,
                mockFSDataset.getVolume(FAKE_BLOCK).getStorageID(),
                mockFSDataset.getVolume(FAKE_BLOCK).getStorageType());

            // The call count on NN2 must stay at one across both heartbeats.
            for (int heartbeat = 0; heartbeat < 2; ++heartbeat) {
                bpos.triggerHeartbeatForTests();
                Mockito.verify(mockNN2, Mockito.times(1))
                    .reportBadBlocks(Mockito.any(LocatedBlock[].class));
            }
        } finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Checks that the pending IBR size of the standby actor drops to zero
     * after a {@code RegisterCommand} is queued in command slot 1, having
     * first confirmed that one IBR was pending because mockNN2 rejected it.
     */
    @Test
    public void testIBRClearanceForStandbyOnReRegister() throws Exception {
        final BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            // No NameNode is active right after initialization.
            Assertions.assertNull(bpos.getActiveNN());

            // Mark HA status slot 0 ACTIVE; after a heartbeat mockNN1 must be the active NN.
            this.mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bpos.triggerHeartbeatForTests();
            Assertions.assertSame(this.mockNN1, bpos.getActiveNN());

            // mockNN1 accepts blockReceivedAndDeleted calls silently.
            Mockito.doNothing()
                    .when(this.mockNN1)
                    .blockReceivedAndDeleted(
                            Mockito.any(DatanodeRegistration.class),
                            Mockito.anyString(),
                            Mockito.any(StorageReceivedDeletedBlocks[].class));

            // mockNN2 records that it was called and then fails the call.
            final IOException re = new IOException("Standby NN is currently not able to process IBR");
            final AtomicBoolean ibrReported = new AtomicBoolean(false);
            Mockito.doAnswer(invocation -> {
                ibrReported.set(true);
                throw re;
            })
                    .when(this.mockNN2)
                    .blockReceivedAndDeleted(
                            Mockito.any(DatanodeRegistration.class),
                            Mockito.anyString(),
                            Mockito.any(StorageReceivedDeletedBlocks[].class));

            DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
            Mockito.doReturn(storage).when(this.mockFSDataset).getStorage("storage0");

            // Queue a received-block notification, then heartbeat.
            bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
            bpos.triggerHeartbeatForTests();

            // Wait (100 ms poll, 1 s limit) until mockNN2 has been called.
            GenericTestUtils.waitFor(() -> ibrReported.get(), 100L, 1000L);

            // Queue a register command in slot 1; exactly one IBR must still be pending first.
            this.datanodeCommands[1] = new DatanodeCommand[]{new RegisterCommand()};
            Assertions.assertEquals(1, this.getStandbyIBRSize(bpos), "IBR size before reRegister should be non-0");

            bpos.triggerHeartbeatForTests();
            // Wait (100 ms poll, 1 s limit) until the standby's pending IBR size reaches zero.
            GenericTestUtils.waitFor(() -> this.getStandbyIBRSize(bpos) == 0, 100L, 1000L);
        }
        finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Returns the pending IBR size reported by the IBR manager of the first
     * actor whose state is {@code STANDBY}.
     *
     * @param bpos the offer service whose actors are inspected
     * @return the standby actor's pending IBR size, or -1 if no actor is
     *         currently in the STANDBY state
     */
    private int getStandbyIBRSize(BPOfferService bpos) {
        // Parameterized element type: a raw List yields Object elements,
        // which cannot be bound to the BPServiceActor loop variable.
        List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
        for (BPServiceActor bpServiceActor : bpServiceActors) {
            if (bpServiceActor.state == HAServiceProtocol.HAServiceState.STANDBY) {
                return bpServiceActor.getIbrManager().getPendingIBRSize();
            }
        }
        return -1;
    }

    /**
     * Checks that the active NameNode is picked up from the HA state carried
     * by a {@code NamespaceInfo}: with the default version response no
     * NameNode becomes active, and once mockNN1's {@code versionRequest}
     * reports ACTIVE an active NameNode is set.
     *
     * The offer service is stopped and joined in a finally block, like the
     * other tests in this class, so its threads do not outlive the test.
     */
    @Test
    public void testNNHAStateUpdateFromVersionRequest() throws Exception {
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1, this.mockNN2);
        // Heartbeats are disabled on the DataNode mock for this test.
        Mockito.doReturn(true).when(this.mockDn).areHeartbeatsDisabledForTests();
        BPServiceActor actor = bpos.getBPServiceActors().get(0);
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            // No NameNode is active right after initialization.
            Assertions.assertNull(bpos.getActiveNN());

            // The currently stubbed version response must not make any NN active.
            NamespaceInfo nsInfo = this.mockNN1.versionRequest();
            bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
            Assertions.assertNull(bpos.getActiveNN());

            // Re-stub versionRequest to report the ACTIVE HA state.
            Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0L, HAServiceProtocol.HAServiceState.ACTIVE))
                    .when(this.mockNN1)
                    .versionRequest();
            nsInfo = this.mockNN1.versionRequest();
            bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
            Assertions.assertNotNull(bpos.getActiveNN());
        }
        finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Checks that {@code refreshNNList} swaps the set of NameNodes: after
     * both initial NameNode mocks have registered and received a block
     * notification, the list is refreshed to addresses 0 and 2, and the new
     * mock (mockNN3) must register and receive block notifications too.
     */
    @Test
    @Timeout(value=30L)
    public void testRefreshNameNodes() throws Exception {
        BPOfferService bpos = this.setupBPOSForNNs(this.mockDn, this.mockNN1, this.mockNN2);
        bpos.start();
        try {
            this.waitForBothActors(bpos);
            // Both initial NameNodes must have seen a registration and a block report.
            Mockito.verify(this.mockNN1).registerDatanode(Mockito.any());
            Mockito.verify(this.mockNN2).registerDatanode(Mockito.any());
            this.waitForBlockReport(this.mockNN1);
            this.waitForBlockReport(this.mockNN2);

            // A received-block notification must reach both NameNodes.
            bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, this.mockStorageID, false);
            ReceivedDeletedBlockInfo[] ret = this.waitForBlockReceived(FAKE_BLOCK, this.mockNN1);
            Assertions.assertEquals(1, ret.length);
            Assertions.assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
            ret = this.waitForBlockReceived(FAKE_BLOCK, this.mockNN2);
            Assertions.assertEquals(1, ret.length);
            Assertions.assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());

            // Connections to the address on port 2 are answered by a third mock.
            DatanodeProtocolClientSideTranslatorPB mockNN3 = this.setupNNMock(2);
            Mockito.doReturn(mockNN3).when(this.mockDn).connectToNN(Mockito.eq(new InetSocketAddress(2)));

            // New NameNode list: ports 0 and 2, with no lifeline addresses.
            // lifelineAddrs parallels addrs, so it carries the same element type
            // (the decompiler had widened it to Object because only nulls are added).
            ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
            ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<InetSocketAddress>(addrs.size());
            addrs.add(new InetSocketAddress(0));
            lifelineAddrs.add(null);
            addrs.add(new InetSocketAddress(2));
            lifelineAddrs.add(null);
            ArrayList<String> nnIds = new ArrayList<String>(addrs.size());
            for (int i = 0; i < addrs.size(); ++i) {
                nnIds.add("nn" + i);
            }
            bpos.refreshNNList("serviceId", nnIds, addrs, lifelineAddrs);
            Assertions.assertEquals(2, bpos.getBPServiceActors().size());

            // Give the new actor a second before checking its registration.
            Thread.sleep(1000L);
            Mockito.verify(mockNN3).registerDatanode(Mockito.any());

            // The new NameNode must also receive block notifications.
            bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, this.mockStorageID, false);
            ret = this.waitForBlockReceived(FAKE_BLOCK, mockNN3);
            Assertions.assertEquals(1, ret.length);
            Assertions.assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
        }
        finally {
            bpos.stop();
            bpos.join();
        }
    }

    /**
     * Checks that a block report rejected for lease id 1 is followed by a
     * block report carrying lease id 2. The heartbeat stub answers with a
     * plain heartbeat, then a register command, then plain heartbeats again.
     */
    @Test
    @Timeout(value=15L)
    public void testRefreshLeaseId() throws Exception {
        Mockito.when(this.mockNN1.sendHeartbeat(
                        Mockito.any(DatanodeRegistration.class),
                        Mockito.any(StorageReport[].class),
                        Mockito.anyLong(),
                        Mockito.anyLong(),
                        Mockito.anyInt(),
                        Mockito.anyInt(),
                        Mockito.anyInt(),
                        Mockito.any(VolumeFailureSummary.class),
                        Mockito.anyBoolean(),
                        Mockito.any(SlowPeerReports.class),
                        Mockito.any(SlowDiskReports.class)))
                .thenAnswer(new HeartbeatAnswer(0))
                .thenAnswer(new HeartbeatRegisterAnswer(0))
                .thenAnswer(new HeartbeatAnswer(0));

        Mockito.when(this.mockNN1.blockReport(
                        Mockito.any(DatanodeRegistration.class),
                        Mockito.anyString(),
                        Mockito.any(StorageBlockReport[].class),
                        Mockito.any(BlockReportContext.class)))
                .thenAnswer(invocation -> {
                    // The report context is the fourth blockReport argument.
                    BlockReportContext reportContext = (BlockReportContext)invocation.getArguments()[3];
                    long leaseId = reportContext.getLeaseId();
                    LOG.info("leaseId = " + leaseId);
                    if (leaseId != 1L) {
                        this.secondLeaseId = leaseId;
                        return null;
                    }
                    // Lease id 1 is recorded and then rejected as an invalid lease.
                    this.firstLeaseId = leaseId;
                    InvalidBlockReportLeaseException leaseError = new InvalidBlockReportLeaseException(reportContext.getReportId(), 1L);
                    throw new RemoteException(leaseError.getClass().getName(), leaseError.getMessage());
                });

        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1);
        bpos.start();
        try {
            this.waitForInitialization(bpos);
            // Two registrations are expected on mockNN1.
            this.waitForRegistration(this.mockNN1, 2);
            Assertions.assertEquals(1L, this.firstLeaseId);
            // Poll once a second until lease id 2 shows up; the @Timeout bounds the wait.
            while (this.secondLeaseId != 2L) {
                Thread.sleep(1000L);
            }
        }
        finally {
            bpos.stop();
        }
    }

    /**
     * Checks that {@code BPOfferService.isSlownode()} follows the slow-node
     * flag returned in heartbeat responses as the test toggles it from
     * false to true and back to false.
     */
    @Test
    @Timeout(value=15L)
    public void testSetIsSlownode() throws Exception {
        // assertFalse states the expectation directly; the previous
        // assertEquals passed the actual value in the "expected" position.
        Assertions.assertFalse(this.mockDn.isSlownode());
        Mockito.when(this.mockNN1.sendHeartbeat(
                        Mockito.any(DatanodeRegistration.class),
                        Mockito.any(StorageReport[].class),
                        Mockito.anyLong(),
                        Mockito.anyLong(),
                        Mockito.anyInt(),
                        Mockito.anyInt(),
                        Mockito.anyInt(),
                        Mockito.any(VolumeFailureSummary.class),
                        Mockito.anyBoolean(),
                        Mockito.any(SlowPeerReports.class),
                        Mockito.any(SlowDiskReports.class)))
                .thenAnswer(new HeartbeatIsSlownodeAnswer(0));
        BPOfferService bpos = this.setupBPOSForNNs(this.mockNN1);
        bpos.start();
        try {
            this.waitForInitialization(bpos);

            // Flag still at its initial value.
            bpos.triggerHeartbeatForTests();
            Assertions.assertFalse(bpos.isSlownode());

            // Heartbeat responses now carry isSlownode = true.
            this.isSlownode = true;
            bpos.triggerHeartbeatForTests();
            Assertions.assertTrue(bpos.isSlownode());

            // And back to false again.
            this.isSlownode = false;
            bpos.triggerHeartbeatForTests();
            Assertions.assertFalse(bpos.isSlownode());
        }
        finally {
            bpos.stop();
        }
    }

    /**
     * Starts a mini cluster, writes a small file and checks the DataNode's
     * command-processing metrics: at least one processed command, a zero
     * actor command queue length sum, and one processed-commands operation.
     */
    @Test
    @Timeout(value=15L)
    public void testCommandProcessingThread() throws Exception {
        Configuration conf = new HdfsConfiguration();
        try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, this.baseDir.toFile()).build()) {
            ArrayList<DataNode> datanodes = cluster.getDataNodes();
            // Expected value first, so a failure message reads correctly.
            Assertions.assertEquals(1, datanodes.size());
            DataNode datanode = datanodes.get(0);

            // Write a 10 KiB file with replication 1.
            FileSystem fs = cluster.getFileSystem();
            Path file = new Path("/test");
            DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);

            MetricsRecordBuilder mrb = MetricsAsserts.getMetrics(datanode.getMetrics().name());
            Assertions.assertTrue(MetricsAsserts.getLongCounter("NumProcessedCommands", mrb) > 0L, "Process command nums is not expected.");
            Assertions.assertEquals(0L, MetricsAsserts.getLongCounter("SumOfActorCommandQueueLength", mrb));
            MetricsAsserts.assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
        }
    }

    /**
     * Starts a one-DataNode mini cluster, stops the command processing
     * thread of the first actor and waits for that actor to report that it
     * is no longer alive.
     */
    @Test
    @Timeout(value=5L)
    public void testCommandProcessingThreadExit() throws Exception {
        Configuration conf = new HdfsConfiguration();
        try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, this.baseDir.toFile()).numDataNodes(1).build()) {
            DataNode dataNode = cluster.getDataNodes().get(0);
            BPOfferService bpos = (BPOfferService)dataNode.getAllBpOs().get(0);
            this.waitForInitialization(bpos);

            BPServiceActor actor = (BPServiceActor)bpos.getBPServiceActors().get(0);
            actor.stopCommandProcessingThread();
            // Poll every 100 ms, for up to 3 s, until the actor is no longer alive.
            GenericTestUtils.waitFor(() -> !actor.isAlive(), 100L, 3000L);
        }
    }

    // Raise the DataNode logger to TRACE when this test class is loaded.
    static {
        GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
    }

    private class BPOfferServiceSynchronousCallAnswer
    implements Answer<Void> {
        private final int nnIdx;

        public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
            this.nnIdx = nnIdx;
        }

        public Void answer(InvocationOnMock invocation) throws Throwable {
            if (this.nnIdx == 0) {
                TestBPOfferService.this.setTimeForSynchronousBPOSCalls();
            } else {
                Thread.sleep(5000L);
            }
            return null;
        }
    }

    private class HeartbeatRegisterAnswer
    implements Answer<HeartbeatResponse> {
        private final int nnIdx;

        HeartbeatRegisterAnswer(int nnIdx) {
            this.nnIdx = nnIdx;
        }

        public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
            int[] nArray = TestBPOfferService.this.heartbeatCounts;
            int n = this.nnIdx;
            nArray[n] = nArray[n] + 1;
            DatanodeCommand[] cmds = new DatanodeCommand[]{new RegisterCommand()};
            return new HeartbeatResponse(cmds, TestBPOfferService.this.mockHaStatuses[this.nnIdx], null, 0L);
        }
    }

    private class HeartbeatIsSlownodeAnswer
    implements Answer<HeartbeatResponse> {
        private final int nnIdx;

        HeartbeatIsSlownodeAnswer(int nnIdx) {
            this.nnIdx = nnIdx;
        }

        public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
            HeartbeatResponse heartbeatResponse = new HeartbeatResponse(TestBPOfferService.this.datanodeCommands[this.nnIdx], TestBPOfferService.this.mockHaStatuses[this.nnIdx], null, 0L, TestBPOfferService.this.isSlownode);
            return heartbeatResponse;
        }
    }

    private class HeartbeatAnswer
    implements Answer<HeartbeatResponse> {
        private final int nnIdx;

        HeartbeatAnswer(int nnIdx) {
            this.nnIdx = nnIdx;
        }

        public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
            int[] nArray = TestBPOfferService.this.heartbeatCounts;
            int n = this.nnIdx;
            nArray[n] = nArray[n] + 1;
            Boolean requestFullBlockReportLease = (Boolean)invocation.getArguments()[8];
            long fullBlockReportLeaseId = 0L;
            if (requestFullBlockReportLease.booleanValue()) {
                fullBlockReportLeaseId = TestBPOfferService.this.nextFullBlockReportLeaseId++;
            }
            LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
            HeartbeatResponse heartbeatResponse = new HeartbeatResponse(TestBPOfferService.this.datanodeCommands[this.nnIdx], TestBPOfferService.this.mockHaStatuses[this.nnIdx], null, fullBlockReportLeaseId);
            ((TestBPOfferService)TestBPOfferService.this).datanodeCommands[this.nnIdx] = new DatanodeCommand[0];
            return heartbeatResponse;
        }
    }
}

