/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestDataTransferKeepalive {
    final Configuration conf = new HdfsConfiguration();
    private MiniDFSCluster cluster;
    private DataNode dn;
    private static final Path TEST_FILE = new Path("/test");
    private static final int KEEPALIVE_TIMEOUT = 1000;
    private static final int WRITE_TIMEOUT = 3000;

    @BeforeEach
    public void setup() throws Exception {
        this.conf.setInt("dfs.datanode.socket.reuse.keepalive", 1000);
        this.conf.setInt("dfs.client.max.block.acquire.failures", 0);
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(1).build();
        this.dn = this.cluster.getDataNodes().get(0);
    }

    @AfterEach
    public void teardown() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    @Test
    @Timeout(value=30L)
    public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
        Configuration clientConf = new Configuration(this.conf);
        long CLIENT_EXPIRY_MS = 60000L;
        clientConf.setLong("dfs.client.socketcache.expiryMsec", 60000L);
        clientConf.set("dfs.client.context", "testDatanodeRespectsKeepAliveTimeout");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)this.cluster.getURI(), (Configuration)clientConf);
        PeerCache peerCache = ClientContext.getFromConf((Configuration)clientConf).getPeerCache();
        DFSTestUtil.createFile((FileSystem)fs, TEST_FILE, 1L, (short)1, 0L);
        Assertions.assertEquals((int)0, (int)peerCache.size());
        this.assertXceiverCount(0);
        DFSTestUtil.readFile((FileSystem)fs, TEST_FILE);
        Assertions.assertEquals((int)1, (int)peerCache.size());
        this.assertXceiverCount(1);
        Thread.sleep(4050L);
        this.assertXceiverCount(0);
        Assertions.assertEquals((int)1, (int)peerCache.size());
        Peer peer = peerCache.get(this.dn.getDatanodeId(), false);
        Assertions.assertNotNull((Object)peer);
        Assertions.assertEquals((int)-1, (int)peer.getInputStream().read());
    }

    @Test
    @Timeout(value=30L)
    public void testClientResponsesKeepAliveTimeout() throws Exception {
        Configuration clientConf = new Configuration(this.conf);
        long CLIENT_EXPIRY_MS = 10L;
        clientConf.setLong("dfs.client.socketcache.expiryMsec", 10L);
        clientConf.set("dfs.client.context", "testClientResponsesKeepAliveTimeout");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)this.cluster.getURI(), (Configuration)clientConf);
        PeerCache peerCache = ClientContext.getFromConf((Configuration)clientConf).getPeerCache();
        DFSTestUtil.createFile((FileSystem)fs, TEST_FILE, 1L, (short)1, 0L);
        Assertions.assertEquals((int)0, (int)peerCache.size());
        this.assertXceiverCount(0);
        DFSTestUtil.readFile((FileSystem)fs, TEST_FILE);
        Assertions.assertEquals((int)1, (int)peerCache.size());
        this.assertXceiverCount(1);
        Thread.sleep(60L);
        Peer peer = peerCache.get(this.dn.getDatanodeId(), false);
        Assertions.assertTrue((peer == null ? 1 : 0) != 0);
        Assertions.assertEquals((int)0, (int)peerCache.size());
    }

    @Test
    @Timeout(value=300L)
    public void testSlowReader() throws Exception {
        long CLIENT_EXPIRY_MS = 600000L;
        Configuration clientConf = new Configuration(this.conf);
        clientConf.setLong("dfs.client.socketcache.expiryMsec", 600000L);
        clientConf.set("dfs.client.context", "testSlowReader");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)this.cluster.getURI(), (Configuration)clientConf);
        MiniDFSCluster.DataNodeProperties props = this.cluster.stopDataNode(0);
        props.conf.setInt("dfs.datanode.socket.write.timeout", 3000);
        props.conf.setInt("dfs.datanode.socket.reuse.keepalive", 120000);
        Assertions.assertTrue((boolean)this.cluster.restartDataNode(props, true));
        this.dn = this.cluster.getDataNodes().get(0);
        this.cluster.triggerHeartbeats();
        DFSTestUtil.createFile((FileSystem)fs, TEST_FILE, 0x800000L, (short)1, 0L);
        FSDataInputStream stm = fs.open(TEST_FILE);
        stm.read();
        this.assertXceiverCount(1);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                return TestDataTransferKeepalive.this.getXceiverCountWithoutServer() == 0;
            }
        }, (long)500L, (long)50000L);
        IOUtils.closeStream((Closeable)stm);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=30L)
    public void testManyClosedSocketsInCache() throws Exception {
        Configuration clientConf = new Configuration(this.conf);
        clientConf.set("dfs.client.context", "testManyClosedSocketsInCache");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)this.cluster.getURI(), (Configuration)clientConf);
        PeerCache peerCache = ClientContext.getFromConf((Configuration)clientConf).getPeerCache();
        DFSTestUtil.createFile((FileSystem)fs, TEST_FILE, 1L, (short)1, 0L);
        Closeable[] stms = new InputStream[5];
        try {
            for (int i = 0; i < stms.length; ++i) {
                stms[i] = fs.open(TEST_FILE);
            }
            for (Closeable stm : stms) {
                IOUtils.copyBytes((InputStream)stm, (OutputStream)new IOUtils.NullOutputStream(), (int)1024);
            }
        }
        finally {
            IOUtils.cleanupWithLogger(null, (Closeable[])stms);
        }
        Assertions.assertEquals((int)5, (int)peerCache.size());
        Thread.sleep(1500L);
        this.assertXceiverCount(0);
        Assertions.assertEquals((int)5, (int)peerCache.size());
        DFSTestUtil.readFile((FileSystem)fs, TEST_FILE);
    }

    private void assertXceiverCount(int expected) {
        int count = this.getXceiverCountWithoutServer();
        if (count != expected) {
            ReflectionUtils.printThreadInfo((PrintStream)System.err, (String)"Thread dumps");
            Assertions.fail((String)("Expected " + expected + " xceivers, found " + count));
        }
    }

    private int getXceiverCountWithoutServer() {
        return this.dn.getXceiverCount();
    }
}

