/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestListOpenFiles {
    private static final int NUM_DATA_NODES = 3;
    private static final int BATCH_SIZE = 5;
    private static MiniDFSCluster cluster = null;
    private static DistributedFileSystem fs = null;
    private static NamenodeProtocols nnRpc = null;
    private static final Logger LOG = LoggerFactory.getLogger(TestListOpenFiles.class);

    @BeforeEach
    public void setUp() throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setLong("dfs.namenode.list.openfiles.num.responses", 5L);
        cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(3).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        nnRpc = cluster.getNameNodeRpc();
    }

    @AfterEach
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    @Test
    @Timeout(value=120L)
    public void testListOpenFilesViaNameNodeRPC() throws Exception {
        HashMap<Path, FSDataOutputStream> openFiles = new HashMap<Path, FSDataOutputStream>();
        this.createFiles((FileSystem)fs, "closed", 10);
        this.verifyOpenFiles(openFiles);
        BatchedRemoteIterator.BatchedEntries openFileEntryBatchedEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/");
        Assertions.assertTrue((openFileEntryBatchedEntries.size() == 0 ? 1 : 0) != 0, (String)"Open files list should be empty!");
        BatchedRemoteIterator.BatchedEntries openFilesBlockingDecomEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION), "/");
        Assertions.assertTrue((openFilesBlockingDecomEntries.size() == 0 ? 1 : 0) != 0, (String)"Open files list blocking decommission should be empty!");
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, "open-1", 1));
        this.verifyOpenFiles(openFiles);
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, "open-2", 12));
        this.verifyOpenFiles(openFiles);
        DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2);
        this.verifyOpenFiles(openFiles);
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, "open-3", 25));
        this.verifyOpenFiles(openFiles);
        while (openFiles.size() > 0) {
            DFSTestUtil.closeOpenFiles(openFiles, 1);
            this.verifyOpenFiles(openFiles);
        }
    }

    private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles, EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path) throws IOException {
        BatchedRemoteIterator.BatchedEntries batchedEntries;
        HashSet<Path> remainingFiles = new HashSet<Path>(openFiles.keySet());
        OpenFileEntry lastEntry = null;
        do {
            batchedEntries = lastEntry == null ? nnRpc.listOpenFiles(0L, openFilesTypes, path) : nnRpc.listOpenFiles(lastEntry.getId(), openFilesTypes, path);
            Assertions.assertTrue((batchedEntries.size() <= 5 ? 1 : 0) != 0, (String)"Incorrect open files list size!");
            for (int i = 0; i < batchedEntries.size(); ++i) {
                lastEntry = (OpenFileEntry)batchedEntries.get(i);
                String filePath = lastEntry.getFilePath();
                LOG.info("OpenFile: " + filePath);
                Assertions.assertTrue((boolean)remainingFiles.remove(new Path(filePath)), (String)("Unexpected open file: " + filePath));
            }
        } while (batchedEntries.hasMore());
        Assertions.assertTrue((remainingFiles.size() == 0 ? 1 : 0) != 0, (String)(remainingFiles.size() + " open files not listed!"));
    }

    private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles) throws IOException {
        this.verifyOpenFiles(openFiles, "/");
    }

    private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles, String path) throws IOException {
        this.verifyOpenFiles(openFiles, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), path);
        this.verifyOpenFiles(new HashMap<Path, FSDataOutputStream>(), EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION), path);
    }

    private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix, int numFilesToCreate) throws IOException {
        HashSet<Path> files = new HashSet<Path>();
        for (int i = 0; i < numFilesToCreate; ++i) {
            Path filePath = new Path(fileNamePrefix + "-" + i);
            DFSTestUtil.createFile(fileSystem, filePath, 1024L, (short)3, 1L);
        }
        return files;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=120L)
    public void testListOpenFilesInHA() throws Exception {
        fs.close();
        cluster.shutdown();
        HdfsConfiguration haConf = new HdfsConfiguration();
        haConf.setLong("dfs.namenode.list.openfiles.num.responses", 5L);
        MiniDFSCluster haCluster = new MiniDFSCluster.Builder((Configuration)haConf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0).build();
        try {
            HATestUtil.setFailoverConfigurations(haCluster, (Configuration)haConf);
            DistributedFileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, (Configuration)haConf);
            List namenodes = HAUtil.getProxiesForAllNameNodesInNameservice((Configuration)haConf, (String)HATestUtil.getLogicalHostname(haCluster));
            haCluster.transitionToActive(0);
            Assertions.assertTrue((boolean)HAUtil.isAtLeastOneActive((List)namenodes));
            byte[] data = new byte[1024];
            ThreadLocalRandom.current().nextBytes(data);
            DFSTestUtil.createOpenFiles((FileSystem)fileSystem, "ha-open-file", 22);
            final DFSAdmin dfsAdmin = new DFSAdmin((Configuration)haConf);
            final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
            final AtomicBoolean listOpenFilesError = new AtomicBoolean(false);
            int listingIntervalMsec = 250;
            Thread clientThread = new Thread(new Runnable(){

                @Override
                public void run() {
                    while (!failoverCompleted.get()) {
                        try {
                            Assertions.assertEquals((int)0, (int)ToolRunner.run((Tool)dfsAdmin, (String[])new String[]{"-listOpenFiles"}));
                            Assertions.assertEquals((int)0, (int)ToolRunner.run((Tool)dfsAdmin, (String[])new String[]{"-listOpenFiles", "-blockingDecommission"}));
                            Thread.sleep(250L);
                        }
                        catch (Exception e) {
                            listOpenFilesError.set(true);
                            LOG.info("Error listing open files: ", (Throwable)e);
                            break;
                        }
                    }
                }
            });
            clientThread.start();
            Thread.sleep(500L);
            LOG.info("Shutting down Active NN0!");
            haCluster.shutdownNameNode(0);
            LOG.info("Transitioning NN1 to Active!");
            haCluster.transitionToActive(1);
            failoverCompleted.set(true);
            Assertions.assertEquals((int)0, (int)ToolRunner.run((Tool)dfsAdmin, (String[])new String[]{"-listOpenFiles"}));
            Assertions.assertEquals((int)0, (int)ToolRunner.run((Tool)dfsAdmin, (String[])new String[]{"-listOpenFiles", "-blockingDecommission"}));
            Assertions.assertFalse((boolean)listOpenFilesError.get(), (String)"Client Error!");
            clientThread.join();
        }
        finally {
            if (haCluster != null) {
                haCluster.shutdown();
            }
        }
    }

    @Test
    @Timeout(value=120L)
    public void testListOpenFilesWithFilterPath() throws IOException {
        HashMap<Path, FSDataOutputStream> openFiles = new HashMap<Path, FSDataOutputStream>();
        this.createFiles((FileSystem)fs, "closed", 10);
        this.verifyOpenFiles(openFiles, "/");
        BatchedRemoteIterator.BatchedEntries openFileEntryBatchedEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/");
        Assertions.assertTrue((openFileEntryBatchedEntries.size() == 0 ? 1 : 0) != 0, (String)"Open files list should be empty!");
        BatchedRemoteIterator.BatchedEntries openFilesBlockingDecomEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION), "/");
        Assertions.assertTrue((openFilesBlockingDecomEntries.size() == 0 ? 1 : 0) != 0, (String)"Open files list blocking decommission should be empty!");
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, new Path("/base"), "open-1", 1));
        Map<Path, FSDataOutputStream> baseOpen = DFSTestUtil.createOpenFiles((FileSystem)fs, new Path("/base-open"), "open-1", 1);
        this.verifyOpenFiles(openFiles, "/base");
        this.verifyOpenFiles(openFiles, "/base/");
        openFiles.putAll(baseOpen);
        while (openFiles.size() > 0) {
            DFSTestUtil.closeOpenFiles(openFiles, 1);
            this.verifyOpenFiles(openFiles, "/");
        }
    }

    @Test
    public void testListOpenFilesWithInvalidPathServerSide() throws Exception {
        HashMap<Path, FSDataOutputStream> openFiles = new HashMap<Path, FSDataOutputStream>();
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, new Path("/base"), "open-1", 1));
        this.verifyOpenFiles(openFiles, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/base");
        LambdaTestUtils.intercept(AssertionError.class, (String)"Absolute path required", (String)"Expect InvalidPathException", () -> this.verifyOpenFiles(new HashMap<Path, FSDataOutputStream>(), EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "hdfs://cluster/base"));
        while (openFiles.size() > 0) {
            DFSTestUtil.closeOpenFiles(openFiles, 1);
            this.verifyOpenFiles(openFiles);
        }
    }

    @Test
    public void testListOpenFilesWithInvalidPathClientSide() throws Exception {
        LambdaTestUtils.intercept(IllegalArgumentException.class, (String)"Wrong FS", (String)"Expect IllegalArgumentException", () -> fs.listOpenFiles(EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "hdfs://non-cluster/"));
        fs.listOpenFiles(EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/path");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListOpenFilesWithDeletedPath() throws Exception {
        HashMap<Path, FSDataOutputStream> openFiles = new HashMap<Path, FSDataOutputStream>();
        openFiles.putAll(DFSTestUtil.createOpenFiles((FileSystem)fs, new Path("/"), "open-1", 1));
        BatchedRemoteIterator.BatchedEntries openFileEntryBatchedEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/");
        Assertions.assertEquals((int)1, (int)openFileEntryBatchedEntries.size());
        String path = ((OpenFileEntry)openFileEntryBatchedEntries.get(0)).getFilePath();
        FSNamesystem fsNamesystem = cluster.getNamesystem();
        FSDirectory dir = fsNamesystem.getFSDirectory();
        ChunkedArrayList removedINodes = new ChunkedArrayList();
        removedINodes.add(dir.getINode(path));
        fsNamesystem.writeLock(RwLockMode.FS);
        try {
            dir.removeFromInodeMap((List)removedINodes);
            openFileEntryBatchedEntries = nnRpc.listOpenFiles(0L, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), "/");
            Assertions.assertEquals((int)0, (int)openFileEntryBatchedEntries.size());
            fsNamesystem.leaseManager.removeLease(dir.getINode(path).getId());
        }
        catch (NullPointerException e) {
            Assertions.fail((String)"Should not throw NPE when the file is deleted but has lease!");
        }
        finally {
            fsNamesystem.writeUnlock(RwLockMode.FS, "testListOpenFilesWithDeletedPath");
        }
    }
}

