package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.shaded.org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.hdfs.AdminStatesBaseTest;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.class */
public class TestDecommissioningStatus {
    // Seed handed to DFSTestUtil.createFile by the tests in this class.
    private static final long seed = 3735928559L;
    // NOTE(review): presumably the HDFS block size, in bytes, for the test files — confirm
    // against the DFSTestUtil.createFile parameter order.
    private static final int blockSize = 8192;
    // NOTE(review): presumably the length, in bytes, of the test files — confirm as above.
    private static final int fileSize = 16384;
    // Number of datanodes; equal to the count the mini cluster is started with in setUp().
    private static final int numDatanodes = 2;
    // Mini cluster built in setUp() and shut down in tearDown().
    private static MiniDFSCluster cluster;
    // File system obtained from the cluster in setUp() and closed in tearDown().
    private static FileSystem fileSys;
    // Maintains the exclude-host list the tests rewrite before calling refreshNodes;
    // initialized in setUp() and cleaned up in tearDown().
    private static HostsFileWriter hostsFileWriter;
    // Configuration created in setUp(); used to build the cluster and for refreshNodes calls.
    private static Configuration conf;
    // Logger for this test class; assigned in setUp(), so it is not usable before that runs.
    private Logger LOG;
    // Addresses of nodes already excluded by the running test. decommissionNode(String)
    // copies this list and appends the new address; callers add to it themselves.
    final ArrayList<String> decommissionedNodes = new ArrayList<>(2);

    /**
     * Builds the per-test fixture: a configuration with short (1 second) heartbeat,
     * redundancy and decommission-monitor intervals, an initialized hosts file writer,
     * and a running mini cluster whose file system is stored in {@code fileSys}.
     *
     * @throws Exception if the hosts file writer or the mini cluster cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);

        // The hosts file writer is initialized against conf before the cluster is built.
        hostsFileWriter = new HostsFileWriter();
        hostsFileWriter.initialize(conf, "work-dir/decommission");

        // Timing-related settings.
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 1000);
        conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
        conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", 4);
        conf.setInt("dfs.namenode.redundancy.interval.seconds", 1);
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
        conf.setLong("dfs.datanode.balance.bandwidthPerSec", 1L);

        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
        cluster = builder.numDataNodes(numDatanodes).build();
        cluster.waitActive();
        fileSys = cluster.getFileSystem();

        DatanodeManager datanodeManager =
                cluster.getNamesystem().getBlockManager().getDatanodeManager();
        datanodeManager.setHeartbeatExpireInterval(3000L);

        Logger adminManagerLog = Logger.getLogger(DatanodeAdminManager.class);
        adminManagerLog.setLevel(Level.DEBUG);
        this.LOG = Logger.getLogger(TestDecommissioningStatus.class);
    }

    /**
     * Releases the per-test fixture: cleans up the hosts file writer, closes the file
     * system and shuts the mini cluster down.
     *
     * <p>The steps are chained with {@code finally} so that a failure in an earlier step
     * cannot leave the mini cluster running for the following tests.
     *
     * @throws Exception if any cleanup step fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (hostsFileWriter != null) {
                hostsFileWriter.cleanup();
            }
        } finally {
            try {
                if (fileSys != null) {
                    fileSys.close();
                }
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
        }
    }

    /**
     * Adds the {@code i}-th node of the client's LIVE datanode report to the exclude list.
     *
     * @param dFSClient client used to fetch the LIVE datanode report
     * @param i index into that report of the node to exclude
     * @return the transfer address of the node that was excluded
     * @throws IOException if the report cannot be fetched or the exclude list cannot be written
     */
    private String decommissionNode(DFSClient dFSClient, int i) throws IOException {
        final String nodeAddr =
                dFSClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)[i].getXferAddr();
        decommissionNode(nodeAddr);
        return nodeAddr;
    }

    /**
     * Rewrites the exclude list so that it holds every address in
     * {@code decommissionedNodes} plus {@code str}.
     *
     * <p>{@code decommissionedNodes} itself is not modified; callers that want the node
     * remembered for later calls add it themselves.
     *
     * @param str address of the node to add to the exclude list
     * @throws IOException if the exclude list cannot be written
     */
    private void decommissionNode(String str) throws IOException {
        System.out.println("Decommissioning node: " + str);
        // Work on a typed copy so the shared list stays untouched.
        List<String> excluded = new ArrayList<>(this.decommissionedNodes);
        excluded.add(str);
        hostsFileWriter.initExcludeHosts(excluded);
    }

    /**
     * Asserts the three counters exposed by a node's leaving-service status.
     *
     * @param datanodeDescriptor node whose leaving-service status is inspected
     * @param i expected number of under-replicated blocks
     * @param i2 expected number of out-of-service-only replicas
     * @param i3 expected number of under-replicated replicas in open files
     */
    private void checkDecommissionStatus(DatanodeDescriptor datanodeDescriptor, int i, int i2, int i3) {
        final DatanodeDescriptor.LeavingServiceStatus status =
                datanodeDescriptor.getLeavingServiceStatus();
        Assert.assertEquals("Unexpected num under-replicated blocks",
                i, status.getUnderReplicatedBlocks());
        Assert.assertEquals("Unexpected number of decom-only replicas",
                i2, status.getOutOfServiceOnlyReplicas());
        Assert.assertEquals("Unexpected number of replicas in under-replicated open files",
                i3, status.getUnderReplicatedInOpenFiles());
    }

    /**
     * Runs {@code DFSAdmin.report} with {@code -decommissioning}, captures what it prints
     * to {@code System.out}, and checks the output and the DECOMMISSIONING datanode report
     * against the expected nodes.
     *
     * <p>Three counts must equal {@code list.size()}: the number parsed from the
     * "Decommissioning datanodes" header line, the number of lines containing
     * "Decommission in progress", and the length of the DECOMMISSIONING datanode report.
     * Every expected node must also appear in that report.
     *
     * @param list nodes expected to be decommissioning
     * @param distributedFileSystem file system queried for the DECOMMISSIONING report
     * @param dFSAdmin admin tool whose report output is captured
     * @throws IOException if the admin report or the datanode report fails
     */
    private void checkDFSAdminDecommissionStatus(List<DatanodeDescriptor> list, DistributedFileSystem distributedFileSystem, DFSAdmin dFSAdmin) throws IOException {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream capturingOut = new PrintStream(captured);
        PrintStream originalOut = System.out;
        System.setOut(capturingOut);
        try {
            dFSAdmin.report(new String[]{"-decommissioning"}, 0);
            // Make sure everything printed has reached the buffer before it is read.
            capturingOut.flush();
        } finally {
            // Restore stdout before asserting so later output is not swallowed.
            System.setOut(originalOut);
        }

        Integer headerCount = null;
        int inProgressLines = 0;
        for (String line : captured.toString().split("\n")) {
            if (line.startsWith("Decommissioning datanodes")) {
                // Third space-separated token with its first character and last two
                // characters stripped; presumably of the form "(N):" — verify against
                // DFSAdmin's output format.
                String token = line.split(" ")[2];
                headerCount = Integer.valueOf(token.substring(1, token.length() - 2));
            }
            if (line.contains("Decommission in progress")) {
                inProgressLines++;
            }
        }

        Assert.assertNotNull("No decommissioning output", headerCount);
        Assert.assertEquals("Unexpected number of decomming DNs", list.size(), headerCount.intValue());
        Assert.assertEquals("Unexpected number of decomming DNs", list.size(), inProgressLines);

        List<?> reported = Arrays.asList(
                distributedFileSystem.getDataNodeStats(HdfsConstants.DatanodeReportType.DECOMMISSIONING));
        Assert.assertEquals("Unexpected number of decomming DNs", list.size(), reported.size());
        for (DatanodeDescriptor expected : list) {
            Assert.assertTrue("Did not find expected decomming DN " + expected, reported.contains(expected));
        }
    }

    /**
     * Polls every 100 ms, for at most 2000 ms, until the admin manager reports exactly
     * {@code i} tracked nodes.
     *
     * @param datanodeAdminManager manager whose tracked-node count is polled
     * @param i tracked-node count to wait for
     * @throws TimeoutException if the count is not reached within the wait limit
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void waitForDecommissionedNodes(DatanodeAdminManager datanodeAdminManager, int i) throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(
                () -> datanodeAdminManager.getNumTrackedNodes() == i,
                100L,
                2000L);
    }

    /**
     * Decommissions the datanodes one at a time and, after each step, checks the
     * leaving-service counters and the DFSAdmin {@code -decommissioning} report.
     *
     * <p>One complete file and one file left open for write are created first, and every
     * datanode is asked for a block report. After the loop the exclude list is cleared,
     * the open file is closed and both files are removed.
     *
     * @throws Exception on any cluster, file system or assertion-wait failure
     */
    @Test
    public void testDecommissionStatus() throws Exception {
        InetSocketAddress nameNodeAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
        // try-with-resources: the client used to be left open after the test.
        try (DFSClient dFSClient = new DFSClient(nameNodeAddr, conf)) {
            Assert.assertEquals("Number of Datanodes ", numDatanodes,
                    dFSClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE).length);
            DistributedFileSystem fileSystem = cluster.getFileSystem();
            DFSAdmin dFSAdmin = new DFSAdmin(cluster.getConfiguration(0));

            Path completeFile = new Path("decommission.dat");
            DFSTestUtil.createFile(fileSystem, completeFile, fileSize, fileSize, blockSize, (short) 2, seed);
            Path openFile = new Path("decommission1.dat");
            FSDataOutputStream openStream =
                    AdminStatesBaseTest.writeIncompleteFile(fileSystem, openFile, (short) 2, (short) 2);

            for (DataNode dataNode : cluster.getDataNodes()) {
                DataNodeTestUtils.triggerBlockReport(dataNode);
            }

            DatanodeManager datanodeManager = cluster.getNamesystem().getBlockManager().getDatanodeManager();
            for (int i = 0; i < numDatanodes; i++) {
                String excludedAddr = decommissionNode(dFSClient, i);
                datanodeManager.refreshNodes(conf);
                this.decommissionedNodes.add(excludedAddr);
                BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
                // After step i, i + 1 nodes should be tracked by the admin manager.
                waitForDecommissionedNodes(datanodeManager.getDatanodeAdminManager(), i + 1);
                List<DatanodeDescriptor> decommissioningNodes = datanodeManager.getDecommissioningNodes();
                if (i == 0) {
                    // Expected value first, so a failure message reads correctly.
                    Assert.assertEquals(1, decommissioningNodes.size());
                    checkDecommissionStatus(decommissioningNodes.get(0), 3, 0, 1);
                    checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1), fileSystem, dFSAdmin);
                } else {
                    Assert.assertEquals(2, decommissioningNodes.size());
                    DatanodeDescriptor firstNode = decommissioningNodes.get(0);
                    DatanodeDescriptor secondNode = decommissioningNodes.get(1);
                    checkDecommissionStatus(firstNode, 3, 3, 1);
                    checkDecommissionStatus(secondNode, 4, 4, 2);
                    checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2), fileSystem, dFSAdmin);
                }
            }

            // Clear the exclude list and release the files created above.
            hostsFileWriter.initExcludeHost("");
            datanodeManager.refreshNodes(conf);
            openStream.close();
            AdminStatesBaseTest.cleanupFile(fileSystem, completeFile);
            AdminStatesBaseTest.cleanupFile(fileSystem, openFile);
        }
    }

    /**
     * Excludes the datanode holding a single-replica file, stops it, and checks that it
     * stays decommission-in-progress until the file is deleted, after which it becomes
     * decommissioned. The node is then restarted and the exclude list cleared.
     *
     * @throws Exception on any cluster, file system or assertion-wait failure
     */
    @Test(timeout = 120000)
    public void testDecommissionStatusAfterDNRestart() throws Exception {
        DistributedFileSystem fileSystem = cluster.getFileSystem();
        Path path = new Path("decommission.dat");
        DFSTestUtil.createFile(fileSystem, path, fileSize, fileSize, fileSize, (short) 1, seed);

        // Name of the first location of the file's first block.
        String dnName = fileSystem.listLocatedStatus(path).next().getBlockLocations()[0].getNames()[0];
        FSNamesystem namesystem = cluster.getNamesystem();
        DatanodeManager datanodeManager = namesystem.getBlockManager().getDatanodeManager();

        decommissionNode(dnName);
        datanodeManager.refreshNodes(conf);
        MiniDFSCluster.DataNodeProperties stoppedNode = cluster.stopDataNode(dnName);

        // Typed list: element methods are called on it below, which a raw list does not allow.
        List<DatanodeDescriptor> dead = new ArrayList<>();
        datanodeManager.fetchDatanodes(null, dead, false);
        while (dead.size() != 1) {
            // Bounded by the test timeout.
            Thread.sleep(1000L);
            datanodeManager.fetchDatanodes(null, dead, false);
        }
        DatanodeDescriptor deadNode = dead.get(0);

        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
        BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
        waitForDecommissionedNodes(datanodeManager.getDatanodeAdminManager(), 1);
        Assert.assertTrue("the node should be DECOMMISSION_IN_PROGRESSS", deadNode.isDecommissionInProgress());
        Assert.assertEquals("The node should be be decommissioning", 1, datanodeManager.getDecommissioningNodes().size());

        // Delete the file, then re-check the decommission state.
        AdminStatesBaseTest.cleanupFile(fileSystem, path);
        BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
        waitForDecommissionedNodes(datanodeManager.getDatanodeAdminManager(), 0);
        Assert.assertTrue("the node should be decommissioned", deadNode.isDecommissioned());

        cluster.restartDataNode(stoppedNode, true);
        cluster.waitActive();
        hostsFileWriter.initExcludeHost("");
        datanodeManager.refreshNodes(conf);
    }

    /**
     * Stops the first datanode, waits for the cluster to see it as not alive, excludes
     * it, and asserts that it is reported as decommissioned with no nodes left tracked by
     * the admin manager. The node is restarted and the exclude list cleared afterwards.
     *
     * @throws Exception on any cluster or assertion-wait failure
     */
    @Test(timeout = 120000)
    public void testDecommissionDeadDN() throws Exception {
        Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);

        final DatanodeID targetId = cluster.getDataNodes().get(0).getDatanodeId();
        final String targetAddr = targetId.getXferAddr();
        final MiniDFSCluster.DataNodeProperties stoppedNode = cluster.stopDataNode(0);
        DFSTestUtil.waitForDatanodeState(cluster, targetId.getDatanodeUuid(), false, 30000);

        final DatanodeManager datanodeManager =
                cluster.getNamesystem().getBlockManager().getDatanodeManager();
        final DatanodeDescriptor targetDescriptor = datanodeManager.getDatanode(targetId);

        decommissionNode(targetAddr);
        datanodeManager.refreshNodes(conf);
        BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
        waitForDecommissionedNodes(datanodeManager.getDatanodeAdminManager(), 0);
        Assert.assertTrue(targetDescriptor.isDecommissioned());

        // Bring the node back and clear the exclude list.
        cluster.restartDataNode(stoppedNode, true);
        cluster.waitActive();
        hostsFileWriter.initExcludeHost("");
        datanodeManager.refreshNodes(conf);
    }

    /**
     * Decommissions both datanodes while they are stopped and checks block accounting
     * before and after they return.
     *
     * <p>Steps: create a file; stop datanode 1 and then datanode 0; exclude both and
     * assert that both are decommissioned and that the missing and low-redundancy block
     * counts are positive; restart both and assert the low-redundancy count is unchanged
     * after two seconds; start two more datanodes and wait (up to ten one-second retries)
     * for the low-redundancy and pending-reconstruction counts to reach zero; finally
     * stop the two added nodes, clear the exclude list and delete the file.
     *
     * @throws Exception on any cluster, file system or assertion-wait failure
     */
    @Test(timeout = 120000)
    public void testDecommissionLosingData() throws Exception {
        List<String> excludedAddrs = new ArrayList<>(numDatanodes);
        BlockManager blockManager = cluster.getNamesystem().getBlockManager();
        DatanodeManager datanodeManager = blockManager.getDatanodeManager();
        Path path = new Path("decommissionLosingData.dat");
        DFSTestUtil.createFile(fileSys, path, fileSize, fileSize, blockSize, (short) 2, seed);
        Thread.sleep(1000L);

        this.LOG.info("Shutdown dn1");
        DatanodeID dn1Id = cluster.getDataNodes().get(1).getDatanodeId();
        DatanodeDescriptor dn1Descriptor = datanodeManager.getDatanode(dn1Id);
        excludedAddrs.add(dn1Id.getXferAddr());
        MiniDFSCluster.DataNodeProperties stoppedDn1 = cluster.stopDataNode(1);
        DFSTestUtil.waitForDatanodeState(cluster, dn1Id.getDatanodeUuid(), false, 30000);

        this.LOG.info("Shutdown dn0");
        DatanodeID dn0Id = cluster.getDataNodes().get(0).getDatanodeId();
        DatanodeDescriptor dn0Descriptor = datanodeManager.getDatanode(dn0Id);
        excludedAddrs.add(dn0Id.getXferAddr());
        MiniDFSCluster.DataNodeProperties stoppedDn0 = cluster.stopDataNode(0);
        DFSTestUtil.waitForDatanodeState(cluster, dn0Id.getDatanodeUuid(), false, 30000);

        this.LOG.info("Decommissioning nodes");
        hostsFileWriter.initExcludeHosts(excludedAddrs);
        datanodeManager.refreshNodes(conf);
        BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
        waitForDecommissionedNodes(datanodeManager.getDatanodeAdminManager(), 0);
        Assert.assertTrue(dn0Descriptor.isDecommissioned());
        Assert.assertTrue(dn1Descriptor.isDecommissioned());

        long missingBlocksCount = blockManager.getMissingBlocksCount();
        long lowRedundancyBlocksCount = blockManager.getLowRedundancyBlocksCount();
        Assert.assertTrue(missingBlocksCount > 0);
        Assert.assertTrue(lowRedundancyBlocksCount > 0);

        this.LOG.info("Bring back dn0");
        cluster.restartDataNode(stoppedDn0, true);
        awaitBlocksOnRestartedDataNode(datanodeManager, 0);

        this.LOG.info("Bring back dn1");
        cluster.restartDataNode(stoppedDn1, true);
        awaitBlocksOnRestartedDataNode(datanodeManager, 1);

        // Give the namenode two seconds, then check the low-redundancy count has not moved.
        Thread.sleep(2000L);
        Assert.assertEquals(lowRedundancyBlocksCount, blockManager.getLowRedundancyBlocksCount());

        this.LOG.info("Starting two more nodes");
        cluster.startDataNodes(conf, 2, true, null, null);
        cluster.waitActive();

        // Up to ten one-second sleeps while any low-redundancy or pending block remains.
        int retries = 0;
        while ((blockManager.getLowRedundancyBlocksCount() > 0
                || blockManager.getPendingReconstructionBlocksCount() > 0)
                && retries < 10) {
            Thread.sleep(1000L);
            retries++;
        }
        Assert.assertEquals(0L, blockManager.getLowRedundancyBlocksCount());
        Assert.assertEquals(0L, blockManager.getPendingReconstructionBlocksCount());
        Assert.assertEquals(0L, blockManager.getMissingBlocksCount());

        // Stop the two added datanodes, highest index first.
        DatanodeID dn3Id = cluster.getDataNodes().get(3).getDatanodeId();
        cluster.stopDataNode(3);
        DFSTestUtil.waitForDatanodeState(cluster, dn3Id.getDatanodeUuid(), false, 30000);
        DatanodeID dn2Id = cluster.getDataNodes().get(2).getDatanodeId();
        cluster.stopDataNode(2);
        DFSTestUtil.waitForDatanodeState(cluster, dn2Id.getDatanodeUuid(), false, 30000);

        hostsFileWriter.initExcludeHost("");
        datanodeManager.refreshNodes(conf);
        fileSys.delete(path, false);
    }

    /**
     * Waits until the datanode at {@code dnIndex} in the cluster's datanode list has a
     * datanode id and its descriptor reports at least one block. Polls every 100 ms; the
     * wait is bounded only by the calling test's timeout.
     *
     * @param datanodeManager manager used to look up the node's descriptor
     * @param dnIndex index of the datanode in {@code cluster.getDataNodes()}
     * @throws Exception if the descriptor lookup fails or the thread is interrupted
     */
    private void awaitBlocksOnRestartedDataNode(DatanodeManager datanodeManager, int dnIndex) throws Exception {
        DatanodeID restartedId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
        while (restartedId == null) {
            // Sleep between polls instead of spinning.
            Thread.sleep(100L);
            restartedId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
        }
        DatanodeDescriptor restartedDescriptor = datanodeManager.getDatanode(restartedId);
        while (restartedDescriptor.numBlocks() == 0) {
            Thread.sleep(100L);
        }
    }
}
