/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.web;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestWebHdfsTimeouts {
    private static final Logger LOG = LoggerFactory.getLogger(TestWebHdfsTimeouts.class);
    private static final int CLIENTS_TO_CONSUME_BACKLOG = 129;
    private static final int CONNECTION_BACKLOG = 1;
    private static final int SHORT_SOCKET_TIMEOUT = 200;
    private List<SocketChannel> clients;
    private WebHdfsFileSystem fs;
    private InetSocketAddress nnHttpAddress;
    private ServerSocket serverSocket;
    private Thread serverThread;
    private final URLConnectionFactory connectionFactory = new URLConnectionFactory(new ConnectionConfigurator(){

        public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
            conn.setReadTimeout(200);
            conn.setConnectTimeout(200);
            return conn;
        }
    });
    private volatile boolean failedToConsumeBacklog;

    public static Collection<Object[]> data() {
        return Arrays.asList({TimeoutSource.ConnectionFactory}, {TimeoutSource.Configuration});
    }

    public void setUp(TimeoutSource timeoutSource) throws Exception {
        Configuration conf = WebHdfsTestUtil.createConf();
        this.serverSocket = new ServerSocket(0, 1);
        this.nnHttpAddress = new InetSocketAddress("localhost", this.serverSocket.getLocalPort());
        conf.set("dfs.namenode.http-address", "localhost:" + this.serverSocket.getLocalPort());
        if (timeoutSource == TimeoutSource.Configuration) {
            String v = Integer.toString(200) + "ms";
            conf.set("dfs.webhdfs.socket.connect-timeout", v);
            conf.set("dfs.webhdfs.socket.read-timeout", v);
        }
        this.fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, "webhdfs");
        if (timeoutSource == TimeoutSource.ConnectionFactory) {
            this.fs.connectionFactory = this.connectionFactory;
        }
        this.clients = new ArrayList<SocketChannel>();
        this.serverThread = null;
        this.failedToConsumeBacklog = false;
    }

    @AfterEach
    public void tearDown() throws Exception {
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])this.clients.toArray(new SocketChannel[this.clients.size()]));
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{this.fs});
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            }
            catch (IOException e) {
                LOG.debug("Exception in closing " + this.serverSocket, (Throwable)e);
            }
        }
        if (this.serverThread != null) {
            this.serverThread.join();
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testConnectTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.consumeConnectionBacklog();
        try {
            this.fs.listFiles(new Path("/"), false);
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": connect timed out"), (Throwable)e);
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testReadTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        try {
            this.fs.listFiles(new Path("/"), false);
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": Read timed out"), (Throwable)e);
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testAuthUrlConnectTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.consumeConnectionBacklog();
        try {
            this.fs.getDelegationToken("renewer");
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": connect timed out"), (Throwable)e);
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testAuthUrlReadTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        try {
            this.fs.getDelegationToken("renewer");
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": Read timed out"), (Throwable)e);
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testRedirectConnectTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.startSingleTemporaryRedirectResponseThread(true);
        try {
            this.fs.getFileChecksum(new Path("/file"));
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            this.assumeBacklogConsumed();
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": connect timed out"), (Throwable)e);
        }
    }

    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testRedirectReadTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.startSingleTemporaryRedirectResponseThread(false);
        try {
            this.fs.getFileChecksum(new Path("/file"));
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": Read timed out"), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testTwoStepWriteConnectTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.startSingleTemporaryRedirectResponseThread(true);
        FSDataOutputStream os = null;
        try {
            os = this.fs.create(new Path("/file"));
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            try {
                this.assumeBacklogConsumed();
                GenericTestUtils.assertExceptionContains((String)(this.fs.getUri().getAuthority() + ": connect timed out"), (Throwable)e);
            }
            catch (Throwable throwable) {
                IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
                throw throwable;
            }
            IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
        }
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    @EnumSource(value=TimeoutSource.class)
    @Timeout(value=100L)
    public void testTwoStepWriteReadTimeout(TimeoutSource src) throws Exception {
        this.setUp(src);
        this.startSingleTemporaryRedirectResponseThread(false);
        FSDataOutputStream os = null;
        try {
            os = this.fs.create(new Path("/file"));
            os.close();
            os = null;
            Assertions.fail((String)"expected timeout");
        }
        catch (SocketTimeoutException e) {
            try {
                GenericTestUtils.assertExceptionContains((String)"Read timed out", (Throwable)e);
            }
            catch (Throwable throwable) {
                IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
                throw throwable;
            }
            IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
        }
        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{os});
    }

    private void startSingleTemporaryRedirectResponseThread(final boolean consumeConnectionBacklog) {
        this.fs.connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
        this.serverThread = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Socket clientSocket = null;
                OutputStream out = null;
                InputStream in = null;
                InputStreamReader isr = null;
                BufferedReader br = null;
                try {
                    String line;
                    clientSocket = TestWebHdfsTimeouts.this.serverSocket.accept();
                    ((TestWebHdfsTimeouts)TestWebHdfsTimeouts.this).fs.connectionFactory = TestWebHdfsTimeouts.this.connectionFactory;
                    if (consumeConnectionBacklog) {
                        TestWebHdfsTimeouts.this.consumeConnectionBacklog();
                    }
                    in = clientSocket.getInputStream();
                    isr = new InputStreamReader(in);
                    br = new BufferedReader(isr);
                    while ((line = br.readLine()) != null && !line.isEmpty()) {
                    }
                    out = clientSocket.getOutputStream();
                    out.write(TestWebHdfsTimeouts.this.temporaryRedirect().getBytes(StandardCharsets.UTF_8));
                }
                catch (IOException e) {
                    try {
                        LOG.error("unexpected IOException in server thread", (Throwable)e);
                        Assertions.fail((String)("unexpected IOException in server thread: " + e));
                    }
                    catch (Throwable throwable) {
                        IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{br, isr, in, out});
                        IOUtils.closeSocket((Socket)clientSocket);
                        throw throwable;
                    }
                    IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{br, isr, in, out});
                    IOUtils.closeSocket((Socket)clientSocket);
                }
                IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{br, isr, in, out});
                IOUtils.closeSocket((Socket)clientSocket);
            }
        };
        this.serverThread.start();
    }

    private void consumeConnectionBacklog() throws IOException {
        for (int i = 0; i < 129; ++i) {
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(this.nnHttpAddress);
            this.clients.add(client);
        }
        try {
            GenericTestUtils.waitFor(() -> {
                try (SocketChannel c = SocketChannel.open();){
                    c.socket().connect(this.nnHttpAddress, 100);
                }
                catch (SocketTimeoutException e) {
                    return true;
                }
                catch (IOException e) {
                    LOG.debug("unexpected exception: " + e);
                }
                return false;
            }, (long)100L, (long)10000L);
        }
        catch (InterruptedException | TimeoutException e) {
            this.failedToConsumeBacklog = true;
            this.assumeBacklogConsumed();
        }
    }

    private void assumeBacklogConsumed() {
        if (this.failedToConsumeBacklog) {
            throw new TestAbortedException("failed to fill up connection backlog.");
        }
    }

    private String temporaryRedirect() {
        return "HTTP/1.1 307 Temporary Redirect\r\nLocation: http://" + NetUtils.getHostPortString((InetSocketAddress)this.nnHttpAddress) + "\r\n\r\n";
    }

    public static enum TimeoutSource {
        ConnectionFactory,
        Configuration;

    }
}

