/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(value=300L)
public class TestHeartbeatHandling {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeat() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).build();
        try {
            cluster.waitActive();
            FSNamesystem namesystem = cluster.getNamesystem();
            HeartbeatManager hm = namesystem.getBlockManager().getDatanodeManager().getHeartbeatManager();
            String poolId = namesystem.getBlockPoolId();
            DatanodeRegistration nodeReg = InternalDataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
            DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg);
            String storageID = DatanodeStorage.generateUuid();
            dd.updateStorage(new DatanodeStorage(storageID));
            boolean REMAINING_BLOCKS = true;
            int MAX_REPLICATE_LIMIT = conf.getInt("dfs.namenode.replication.max-streams", 2);
            int MAX_INVALIDATE_LIMIT = 1000;
            int MAX_INVALIDATE_BLOCKS = 2001;
            int MAX_REPLICATE_BLOCKS = 2 * MAX_REPLICATE_LIMIT + 1;
            DatanodeStorageInfo[] ONE_TARGET = new DatanodeStorageInfo[]{dd.getStorageInfo(storageID)};
            try {
                namesystem.writeLock(RwLockMode.BM);
                HeartbeatManager heartbeatManager = hm;
                synchronized (heartbeatManager) {
                    for (int i = 0; i < MAX_REPLICATE_BLOCKS; ++i) {
                        dd.addBlockToBeReplicated(new Block((long)i, 0L, 1000L), ONE_TARGET);
                    }
                    DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                    Assertions.assertEquals((int)1, (int)cmds.length);
                    Assertions.assertEquals((int)1, (int)cmds[0].getAction());
                    Assertions.assertEquals((int)MAX_REPLICATE_LIMIT, (int)((BlockCommand)cmds[0]).getBlocks().length);
                    ArrayList<Block> blockList = new ArrayList<Block>(2001);
                    for (int i = 0; i < 2001; ++i) {
                        blockList.add(new Block((long)i, 0L, 1000L));
                    }
                    dd.addBlocksToBeInvalidated(blockList);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                    Assertions.assertEquals((int)2, (int)cmds.length);
                    Assertions.assertEquals((int)1, (int)cmds[0].getAction());
                    Assertions.assertEquals((int)MAX_REPLICATE_LIMIT, (int)((BlockCommand)cmds[0]).getBlocks().length);
                    Assertions.assertEquals((int)2, (int)cmds[1].getAction());
                    Assertions.assertEquals((int)1000, (int)((BlockCommand)cmds[1]).getBlocks().length);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                    Assertions.assertEquals((int)2, (int)cmds.length);
                    Assertions.assertEquals((int)1, (int)cmds[0].getAction());
                    Assertions.assertEquals((int)1, (int)((BlockCommand)cmds[0]).getBlocks().length);
                    Assertions.assertEquals((int)2, (int)cmds[1].getAction());
                    Assertions.assertEquals((int)1000, (int)((BlockCommand)cmds[1]).getBlocks().length);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                    Assertions.assertEquals((int)1, (int)cmds.length);
                    Assertions.assertEquals((int)2, (int)cmds[0].getAction());
                    Assertions.assertEquals((int)1, (int)((BlockCommand)cmds[0]).getBlocks().length);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem).getCommands();
                    Assertions.assertEquals((int)0, (int)cmds.length);
                }
            }
            finally {
                namesystem.writeUnlock(RwLockMode.BM, "testHeartbeat");
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatBlockRecovery() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
        try {
            cluster.waitActive();
            FSNamesystem namesystem = cluster.getNamesystem();
            HeartbeatManager hm = namesystem.getBlockManager().getDatanodeManager().getHeartbeatManager();
            String poolId = namesystem.getBlockPoolId();
            DatanodeRegistration nodeReg1 = InternalDataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
            DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg1);
            dd1.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
            DatanodeRegistration nodeReg2 = InternalDataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
            DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg2);
            dd2.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
            DatanodeRegistration nodeReg3 = InternalDataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
            DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, (DatanodeID)nodeReg3);
            dd3.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
            try {
                namesystem.writeLock(RwLockMode.BM);
                HeartbeatManager heartbeatManager = hm;
                synchronized (heartbeatManager) {
                    NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
                    NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
                    NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd1, 0L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd2, 0L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd3, 0L);
                    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[]{dd1.getStorageInfos()[0], dd2.getStorageInfos()[0], dd3.getStorageInfos()[0]};
                    BlockInfoContiguous blockInfo = new BlockInfoContiguous(new Block(0L, 0L, 1000L), 3);
                    blockInfo.convertToBlockUnderConstruction(HdfsServerConstants.BlockUCState.UNDER_RECOVERY, storages);
                    dd1.addBlockToBeRecovered((BlockInfo)blockInfo);
                    DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                    Assertions.assertEquals((int)1, (int)cmds.length);
                    Assertions.assertEquals((int)6, (int)cmds[0].getAction());
                    BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
                    Assertions.assertEquals((int)1, (int)recoveryCommand.getRecoveringBlocks().size());
                    DatanodeInfoWithStorage[] recoveringNodes = recoveryCommand.getRecoveringBlocks().toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
                    Assertions.assertEquals((int)3, (int)recoveringNodes.length);
                    Assertions.assertEquals((Object)recoveringNodes[0], (Object)dd1);
                    Assertions.assertEquals((Object)recoveringNodes[1], (Object)dd2);
                    Assertions.assertEquals((Object)recoveringNodes[2], (Object)dd3);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd1, 0L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd2, -40000L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd3, 0L);
                    blockInfo = new BlockInfoContiguous(new Block(0L, 0L, 1000L), 3);
                    blockInfo.convertToBlockUnderConstruction(HdfsServerConstants.BlockUCState.UNDER_RECOVERY, storages);
                    dd1.addBlockToBeRecovered((BlockInfo)blockInfo);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                    Assertions.assertEquals((int)1, (int)cmds.length);
                    Assertions.assertEquals((int)6, (int)cmds[0].getAction());
                    recoveryCommand = (BlockRecoveryCommand)cmds[0];
                    Assertions.assertEquals((int)1, (int)recoveryCommand.getRecoveringBlocks().size());
                    recoveringNodes = recoveryCommand.getRecoveringBlocks().toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
                    Assertions.assertEquals((int)2, (int)recoveringNodes.length);
                    Assertions.assertEquals((Object)recoveringNodes[0], (Object)dd1);
                    Assertions.assertEquals((Object)recoveringNodes[1], (Object)dd3);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd1, -60000L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd2, -40000L);
                    DFSTestUtil.resetLastUpdatesWithOffset((DatanodeInfo)dd3, -80000L);
                    blockInfo = new BlockInfoContiguous(new Block(0L, 0L, 1000L), 3);
                    blockInfo.convertToBlockUnderConstruction(HdfsServerConstants.BlockUCState.UNDER_RECOVERY, storages);
                    dd1.addBlockToBeRecovered((BlockInfo)blockInfo);
                    cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
                    Assertions.assertEquals((int)1, (int)cmds.length);
                    Assertions.assertEquals((int)6, (int)cmds[0].getAction());
                    recoveryCommand = (BlockRecoveryCommand)cmds[0];
                    Assertions.assertEquals((int)1, (int)recoveryCommand.getRecoveringBlocks().size());
                    recoveringNodes = recoveryCommand.getRecoveringBlocks().toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
                    Assertions.assertEquals((int)3, (int)recoveringNodes.length);
                    Assertions.assertEquals((Object)recoveringNodes[0], (Object)dd1);
                    Assertions.assertEquals((Object)recoveringNodes[1], (Object)dd2);
                    Assertions.assertEquals((Object)recoveringNodes[2], (Object)dd3);
                }
            }
            finally {
                namesystem.writeUnlock(RwLockMode.BM, "testHeartbeat");
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    @Test
    public void testHeartbeatStopWatch() throws Exception {
        Namesystem ns = (Namesystem)Mockito.mock(Namesystem.class);
        BlockManager bm = (BlockManager)Mockito.mock(BlockManager.class);
        Configuration conf = new Configuration();
        long recheck = 2000L;
        conf.setLong("dfs.namenode.heartbeat.recheck-interval", recheck);
        HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
        monitor.restartHeartbeatStopWatch();
        Assertions.assertFalse((boolean)monitor.shouldAbortHeartbeatCheck(0L));
        Thread.sleep(100L);
        Assertions.assertFalse((boolean)monitor.shouldAbortHeartbeatCheck(0L));
        Thread.sleep(recheck);
        Assertions.assertTrue((boolean)monitor.shouldAbortHeartbeatCheck(0L));
        Assertions.assertFalse((boolean)monitor.shouldAbortHeartbeatCheck(-recheck * 3L));
        monitor.restartHeartbeatStopWatch();
        Assertions.assertFalse((boolean)monitor.shouldAbortHeartbeatCheck(0L));
    }
}

