/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.shufflehandler;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.IPStackUtils;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hive.common.util.Retry;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class TestShuffleHandler {
    @Rule
    public Retry retry = new Retry(2);
    private static final File TEST_DIR = new File(System.getProperty("test.build.data"), TestShuffleHandler.class.getName()).getAbsoluteFile();
    private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";

    @Test(timeout=10000L)
    public void testKeepAlive() throws Exception {
        final ArrayList failures = new ArrayList(1);
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("llap.shuffle.port", 0);
        conf.setBoolean("llap.shuffle.connection-keep-alive.enable", true);
        conf.setInt("llap.shuffle.connection-keep-alive.timeout", -100);
        final LastSocketAddress lastSocketAddress = new LastSocketAddress();
        ShuffleHandler shuffleHandler = new ShuffleHandler(conf){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String jobId, int dagId, String mapId, int reduce, String user) throws IOException {
                        return null;
                    }

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }

                    protected void populateHeaders(List<String> mapIds, String jobId, int dagId, String user, int reduce, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> mapOutputInfoMap) throws IOException {
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        long contentLength = dob.getLength();
                        super.setResponseHeaders(response, keepAliveParam, contentLength);
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, int reduce, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        lastSocketAddress.setAddress(ch.remoteAddress());
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    }

                    protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            ctx.channel().close();
                        }
                    }

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.start();
        String shuffleBaseURL = String.format("http://%s", IPStackUtils.concatLoopbackAddressPort((int)conf.getInt("llap.shuffle.port", 15551)));
        URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        DataInputStream input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((Object)"keep-alive", (Object)conn.getHeaderField("Connection"));
        Assert.assertEquals((Object)"timeout=1", (Object)conn.getHeaderField("keep-alive"));
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        ShuffleHeader header = new ShuffleHeader();
        header.readFields((DataInput)input);
        byte[] buffer = new byte[1024];
        while (input.read(buffer) != -1) {
        }
        SocketAddress firstAddress = lastSocketAddress.getSocketAddress();
        input.close();
        url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0&keepAlive=true");
        conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((Object)"keep-alive", (Object)conn.getHeaderField("Connection"));
        Assert.assertEquals((Object)"timeout=1", (Object)conn.getHeaderField("keep-alive"));
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        header = new ShuffleHeader();
        header.readFields((DataInput)input);
        input.close();
        SocketAddress secondAddress = lastSocketAddress.getSocketAddress();
        Assert.assertNotNull((String)"Initial shuffle address should not be null", (Object)firstAddress);
        Assert.assertNotNull((String)"Keep-Alive shuffle address should not be null", (Object)secondAddress);
        Assert.assertEquals((String)"Initial shuffle address and keep-alive shuffle address should be the same", (Object)firstAddress, (Object)secondAddress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSocketKeepAlive() throws Exception {
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("llap.shuffle.port", 0);
        conf.setBoolean("llap.shuffle.connection-keep-alive.enable", true);
        conf.setInt("llap.shuffle.connection-keep-alive.timeout", -100);
        HttpURLConnection conn = null;
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(conf);
        try {
            shuffleHandler.start();
            String shuffleBaseURL = String.format("http://%s", IPStackUtils.concatLoopbackAddressPort((int)conf.getInt("llap.shuffle.port", 15551)));
            URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
            conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            conn.getInputStream();
            Assert.assertTrue((String)"socket should be set KEEP_ALIVE", (boolean)shuffleHandler.isSocketKeepAlive());
        }
        finally {
            if (conn != null) {
                conn.disconnect();
            }
            shuffleHandler.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConfigPortStatic() throws Exception {
        Random rand = new Random();
        int port = rand.nextInt(10) + 50000;
        Configuration conf = new Configuration();
        conf.setInt("llap.shuffle.port", port);
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(conf);
        try {
            shuffleHandler.start();
            Assert.assertEquals((long)port, (long)shuffleHandler.getPort());
        }
        finally {
            shuffleHandler.stop();
        }
    }

    @Test
    public void testConfigPortDynamic() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("llap.shuffle.port", 0);
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(conf);
        try {
            shuffleHandler.start();
            Assert.assertTrue((String)"ShuffleHandler should use a random chosen port", (shuffleHandler.getPort() > 0 ? 1 : 0) != 0);
        }
        finally {
            shuffleHandler.stop();
        }
    }

    static class LastSocketAddress {
        SocketAddress lastAddress;

        LastSocketAddress() {
        }

        void setAddress(SocketAddress lastAddress) {
            this.lastAddress = lastAddress;
        }

        SocketAddress getSocketAddress() {
            return this.lastAddress;
        }
    }

    private static class MockShuffleHandler2
    extends ShuffleHandler {
        boolean socketKeepAlive = false;

        MockShuffleHandler2(Configuration conf) {
            super(conf);
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
            return new ShuffleHandler.Shuffle(conf){

                protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    SocketChannel channel = (SocketChannel)ctx.channel();
                    socketKeepAlive = channel.config().isKeepAlive();
                }
            };
        }

        protected boolean isSocketKeepAlive() {
            return this.socketKeepAlive;
        }
    }
}

