/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestIPC;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestAsyncIPC {
    /** Configuration shared by the tests; reassigned by {@link #setupConf()} before each test. */
    private static Configuration conf;
    /** Class logger; a blank final that is assigned in the class's static initializer. */
    private static final Logger LOG;

    /**
     * Wraps the value returned by {@link Client#getAsyncRpcResponse()} in an
     * {@link AsyncGetFuture}.
     *
     * <p>The type arguments are spelled out explicitly so that no raw type (and
     * therefore no unchecked conversion) is involved in building the result.
     *
     * @param <T> the response type the caller expects from the future
     * @return a new future backed by the current asynchronous RPC response
     */
    static <T extends Writable> AsyncGetFuture<T, IOException> getAsyncRpcResponseFuture() {
        return new AsyncGetFuture<T, IOException>(Client.<T>getAsyncRpcResponse());
    }

    /**
     * Prepares the shared configuration before every test: a new
     * {@link Configuration} with {@code ipc.client.async.calls.max} set to 10000
     * and a ping interval of 1000, and switches the client into asynchronous mode.
     */
    @Before
    public void setupConf() {
        Configuration freshConf = new Configuration();
        freshConf.setInt("ipc.client.async.calls.max", 10000);
        Client.setPingInterval(freshConf, 1000);
        conf = freshConf;
        Client.setAsynchronousMode(true);
    }

    /**
     * Runs the basic asynchronous-call scenario twice with 3 handlers, 2 clients
     * and 5 callers: once with the {@code handlerSleep} flag off and 100 calls per
     * caller, and once with the flag on and 10 calls per caller.
     */
    @Test(timeout = 60000L)
    public void testAsyncCall() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 3;
        final int clientCount = 2;
        final int callerCount = 5;
        // First pass: handlerSleep = false, 100 calls per caller.
        internalTestAsyncCall(handlerCount, false, clientCount, callerCount, 100);
        // Second pass: handlerSleep = true, 10 calls per caller.
        internalTestAsyncCall(handlerCount, true, clientCount, callerCount, 10);
    }

    /**
     * Runs the async-call-limit scenario with 100 handlers, the
     * {@code handlerSleep} flag off, 5 clients, 10 callers and 500 calls per
     * caller.
     */
    @Test(timeout = 60000L)
    public void testAsyncCallLimit() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 100;
        final boolean handlerSleep = false;
        final int clientCount = 5;
        final int callerCount = 10;
        final int callCount = 500;
        internalTestAsyncCallLimit(handlerCount, handlerSleep, clientCount, callerCount, callCount);
    }

    /**
     * Starts a test server, spreads {@code callerCount} {@link AsyncCaller}
     * threads round-robin over {@code clientCount} clients, waits for every
     * caller to finish and verifies the values it received.
     *
     * <p>All clients and the server are stopped in a {@code finally} block so that
     * a failing assertion or an exception in a caller does not leave them running
     * for the remainder of the test run.
     *
     * @param handlerCount first argument passed to the {@code TestIPC.TestServer} constructor
     * @param handlerSleep second argument passed to the {@code TestIPC.TestServer} constructor
     * @param clientCount  number of {@link Client} instances to create
     * @param callerCount  number of caller threads to start
     * @param callCount    number of calls each caller issues
     * @throws IOException          if the server or a client cannot be set up
     * @throws InterruptedException if interrupted while joining a caller or waiting on a future
     * @throws ExecutionException   if retrieving a call's result fails
     */
    public void internalTestAsyncCall(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException {
        TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
        InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        Client[] clients = new Client[clientCount];
        try {
            for (int i = 0; i < clientCount; ++i) {
                clients[i] = new Client(LongWritable.class, conf);
            }
            AsyncCaller[] callers = new AsyncCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                callers[i].join();
                callers[i].assertReturnValues();
            }
        } finally {
            try {
                for (Client client : clients) {
                    // Slots stay null if client construction failed part-way through.
                    if (client != null) {
                        client.stop();
                    }
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Issues 100 asynchronous calls from the test thread, checks every returned
     * value three times in a row, and then asserts that the client's async call
     * count equals the value sampled before the calls were made.
     */
    @Test(timeout = 60000L)
    public void testCallGetReturnRpcResponseMultipleTimes() throws IOException, InterruptedException, ExecutionException {
        final int handlers = 10;
        final int calls = 100;
        final int verificationRounds = 3;
        TestIPC.TestServer server = new TestIPC.TestServer(handlers, false, conf);
        InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        Client client = new Client(LongWritable.class, conf);
        int countBefore = client.getAsyncCallCount();
        try {
            AsyncCaller caller = new AsyncCaller(client, addr, calls);
            // Invoked directly (not start()), so the calls are issued on this thread.
            caller.run();
            for (int round = 0; round < verificationRounds; ++round) {
                caller.assertReturnValues();
            }
            int countAfter = client.getAsyncCallCount();
            Assert.assertEquals(countBefore, countAfter);
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Issues 10 asynchronous calls against a server created with its boolean
     * constructor flag set to {@code true} and verifies the results through the
     * timed variant of {@code assertReturnValues}, using a 10 millisecond timeout
     * for each {@code Future.get}.
     */
    @Test(timeout = 60000L)
    public void testFutureGetWithTimeout() throws IOException, InterruptedException, ExecutionException {
        final int handlers = 10;
        final int calls = 10;
        final long getTimeoutMillis = 10L;
        TestIPC.TestServer server = new TestIPC.TestServer(handlers, true, conf);
        InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        Client client = new Client(LongWritable.class, conf);
        try {
            AsyncCaller caller = new AsyncCaller(client, addr, calls);
            // Invoked directly (not start()), so the calls are issued on this thread.
            caller.run();
            caller.assertReturnValues(getTimeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Starts a test server and clients configured with
     * {@code ipc.client.async.calls.max = 100}, runs {@code callerCount}
     * {@link AsyncLimitlCaller} threads spread round-robin over the clients, and
     * asserts that no caller recorded a failure.
     *
     * <p>The server and clients are built from a method-local configuration, not
     * the class-level {@code conf} field. All clients and the server are stopped
     * in a {@code finally} block so that a failing assertion or an exception does
     * not leave them running.
     *
     * @param handlerCount first argument passed to the {@code TestIPC.TestServer} constructor
     * @param handlerSleep second argument passed to the {@code TestIPC.TestServer} constructor
     * @param clientCount  number of {@link Client} instances to create
     * @param callerCount  number of caller threads to start
     * @param callCount    number of calls each caller issues
     * @throws IOException          if the server or a client cannot be set up
     * @throws InterruptedException if interrupted while joining a caller or waiting on a future
     * @throws ExecutionException   if retrieving a call's result fails
     */
    public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException {
        // Named distinctly so it is not mistaken for the static conf field it used to shadow.
        Configuration limitedConf = new Configuration();
        limitedConf.setInt("ipc.client.async.calls.max", 100);
        Client.setPingInterval(limitedConf, 1000);
        TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, handlerSleep, limitedConf);
        InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        Client[] clients = new Client[clientCount];
        try {
            for (int i = 0; i < clientCount; ++i) {
                clients[i] = new Client(LongWritable.class, limitedConf);
            }
            AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr, callCount);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                callers[i].join();
                // Drain whatever the caller had not yet waited for when its loop ended.
                callers[i].waitForReturnValues(callers[i].getStart(), callers[i].getCount());
                String msg = String.format("Expected not failed for caller-%d: %s.", i, callers[i]);
                Assert.assertFalse(msg, callers[i].failed);
            }
        } finally {
            try {
                for (Client client : clients) {
                    // Slots stay null if client construction failed part-way through.
                    if (client != null) {
                        client.stop();
                    }
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Verifies that the retry count chosen for each call is the one observed both
     * by the server's call listener and in the response header checked by the
     * client.
     *
     * <p>The client subclass assigns a random retry count (below 255) before each
     * call is created and records the resulting call id and retry count in
     * {@code infoMap}; the server listener and the overridden
     * {@code checkResponse} look the call up by id and compare retry counts.
     */
    @Test(timeout = 60000L)
    public void testCallIdAndRetry() throws IOException, InterruptedException, ExecutionException {
        // NOTE(review): plain HashMap; if the overrides and the listener run on
        // different threads this map is accessed without synchronization - confirm.
        final Map<Integer, TestIPC.CallInfo> infoMap = new HashMap<Integer, TestIPC.CallInfo>();
        Client client = new Client(LongWritable.class, conf) {

            @Override
            Client.Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
                // Set the id / retry count that the call created below will pick up.
                Client.setCallIdAndRetryCount(Client.nextCallId(), TestIPC.RANDOM.nextInt(255), null);
                Client.Call call = super.createCall(rpcKind, rpcRequest);
                TestIPC.CallInfo info = new TestIPC.CallInfo();
                info.id = call.id;
                info.retry = call.retry;
                infoMap.put(call.id, info);
                return call;
            }

            @Override
            void checkResponse(RpcHeaderProtos.RpcResponseHeaderProto header) throws IOException {
                super.checkResponse(header);
                TestIPC.CallInfo info = infoMap.get(header.getCallId());
                Assert.assertEquals(info.retry, header.getRetryCount());
            }
        };
        TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = new Runnable() {

            @Override
            public void run() {
                TestIPC.CallInfo info = infoMap.get(Server.getCallId());
                Assert.assertEquals(info.retry, Server.getCallRetryCount());
            }
        };
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller caller = new AsyncCaller(client, addr, 4);
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Sets a retry count of 255 on the client side and verifies, via the server's
     * call listener, that the server reports the same retry count for the calls
     * it receives.
     */
    @Test(timeout = 60000L)
    public void testCallRetryCount() throws IOException, InterruptedException, ExecutionException {
        // Single source of truth for the value set on the client and expected on the server.
        final int retryCount = 255;
        Client client = new Client(LongWritable.class, conf);
        Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
        TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = new Runnable() {

            @Override
            public void run() {
                Assert.assertEquals((long) retryCount, (long) Server.getCallRetryCount());
            }
        };
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller caller = new AsyncCaller(client, addr, 10);
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Without setting any retry count on the client, issues 10 asynchronous calls
     * and verifies, via the server's call listener, that the server reports a
     * retry count of 0 for each of them.
     */
    @Test(timeout = 60000L)
    public void testInitialCallRetryCount() throws IOException, InterruptedException, ExecutionException {
        final int calls = 10;
        Client client = new Client(LongWritable.class, conf);
        TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        Runnable expectZeroRetries = new Runnable() {

            @Override
            public void run() {
                long observedRetries = (long) Server.getCallRetryCount();
                Assert.assertEquals(0L, observedRetries);
            }
        };
        server.callListener = expectZeroRetries;
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller caller = new AsyncCaller(client, addr, calls);
            // Invoked directly (not start()), so the calls are issued on this thread.
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Runs 100 caller threads of 100 calls each through a single client and
     * verifies that the call ids recorded by the server's call listener are
     * unique and form one contiguous ascending sequence.
     */
    @Test(timeout = 60000L)
    public void testUniqueSequentialCallIds() throws IOException, InterruptedException, ExecutionException {
        final int serverThreads = 10;
        final int callerCount = 100;
        final int perCallerCallCount = 100;
        TestIPC.TestServer server = new TestIPC.TestServer(serverThreads, false, conf);
        // Synchronized wrapper: the listener below appends to this list.
        final List<Integer> callIds = Collections.synchronizedList(new ArrayList<Integer>());
        server.callListener = new Runnable() {

            @Override
            public void run() {
                callIds.add(Server.getCallId());
            }
        };
        Client client = new Client(LongWritable.class, conf);
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller[] callers = new AsyncCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                callers[i].join();
                callers[i].assertReturnValues();
            }
        } finally {
            client.stop();
            server.stop();
        }
        int expectedCallCount = callerCount * perCallerCallCount;
        Assert.assertEquals(expectedCallCount, callIds.size());
        // After sorting, ids must increase by exactly one from the smallest id.
        Collections.sort(callIds);
        int startID = callIds.get(0);
        for (int i = 0; i < expectedCallCount; ++i) {
            Assert.assertEquals(startID + i, callIds.get(i).intValue());
        }
    }

    // Static initializer: assigns the blank-final LOG field declared at the top of the class.
    static {
        LOG = LoggerFactory.getLogger(TestAsyncIPC.class);
    }

    /**
     * Thread that issues {@code count} asynchronous calls with random
     * {@code long} parameters through one {@link Client}, remembering the future
     * and the expected value of every call so they can be verified afterwards.
     *
     * <p>{@link #run()} may also be invoked directly on the calling thread. The
     * verification methods are meant to be called once issuing has finished (after
     * {@code join()} or after a direct {@code run()}).
     */
    static class AsyncCaller extends Thread {
        private final Client client;
        private final InetSocketAddress server;
        private final int count;
        /** Set when issuing a call threw; the issuing loop stops at that call. */
        private boolean failed;
        /** Future of each issued call, keyed by call index. */
        Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
        /** Parameter sent with each call, keyed by call index. */
        Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();

        /**
         * @param client client used to issue the calls
         * @param server address the calls are sent to
         * @param count  number of calls to issue
         */
        public AsyncCaller(Client client, InetSocketAddress server, int count) {
            this.client = client;
            this.server = server;
            this.count = count;
            Client.setAsynchronousMode(true);
        }

        /**
         * Issues the calls one after another, recording a future and the expected
         * value for each. On the first exception the caller is marked failed and
         * the exception is rethrown wrapped in a {@link RuntimeException}.
         */
        @Override
        public void run() {
            // Repeated here because run() may execute on a different thread than the constructor.
            Client.setAsynchronousMode(true);
            for (int i = 0; i < count; ++i) {
                try {
                    long param = TestIPC.RANDOM.nextLong();
                    TestIPC.call(client, param, server, conf);
                    // Explicit type witness: the response type cannot be inferred from a cast.
                    returnFutures.put(i, TestAsyncIPC.<LongWritable>getAsyncRpcResponseFuture());
                    expectedValues.put(i, param);
                } catch (Exception e) {
                    failed = true;
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Blocks on every recorded future and asserts that each returned value
         * equals the parameter that was sent.
         *
         * <p>The failure flag is checked first: if issuing stopped early, some
         * futures are missing, and an assertion failure is a clearer report than a
         * {@code NullPointerException} on the first missing entry.
         *
         * @throws InterruptedException if interrupted while waiting on a future
         * @throws ExecutionException   if a call's result could not be retrieved
         */
        void assertReturnValues() throws InterruptedException, ExecutionException {
            Assert.assertFalse("caller failed while issuing calls", failed);
            for (int i = 0; i < count; ++i) {
                LongWritable value = returnFutures.get(i).get();
                Assert.assertEquals("call" + i + " failed.", expectedValues.get(i).longValue(), value.get());
            }
        }

        /**
         * Like {@link #assertReturnValues()}, but waits at most {@code timeout}
         * per {@code get}; futures that time out are retried on later passes until
         * every call has been checked.
         *
         * @param timeout maximum time to wait in a single {@code get}
         * @param unit    unit of {@code timeout}
         * @throws InterruptedException if interrupted while waiting on a future
         * @throws ExecutionException   if a call's result could not be retrieved
         */
        void assertReturnValues(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            // Checked up front for the same reason as in the untimed variant.
            Assert.assertFalse("caller failed while issuing calls", failed);
            boolean[] checked = new boolean[count];
            boolean done = false;
            while (!done) {
                done = true;
                for (int i = 0; i < count; ++i) {
                    if (checked[i]) {
                        continue;
                    }
                    // At least one call was still pending at the start of this pass.
                    done = false;
                    final LongWritable value;
                    try {
                        value = returnFutures.get(i).get(timeout, unit);
                    } catch (TimeoutException e) {
                        LOG.info("call" + i + " caught ", e);
                        continue;
                    }
                    Assert.assertEquals("call" + i + " failed.", expectedValues.get(i).longValue(), value.get());
                    checked[i] = true;
                }
            }
        }
    }

    static class AsyncLimitlCaller
    extends Thread {
        private Client client;
        private InetSocketAddress server;
        private int count;
        private boolean failed;
        Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
        Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
        int start = 0;
        int end = 0;
        final int callerId;

        int getStart() {
            return this.start;
        }

        int getEnd() {
            return this.end;
        }

        int getCount() {
            return this.count;
        }

        public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
            this(0, client, server, count);
        }

        public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server, int count) {
            this.client = client;
            this.server = server;
            this.count = count;
            Client.setAsynchronousMode((boolean)true);
            this.callerId = callerId;
        }

        @Override
        public void run() {
            Client.setAsynchronousMode((boolean)true);
            for (int i = 0; i < this.count; ++i) {
                try {
                    long param = TestIPC.RANDOM.nextLong();
                    this.runCall(i, param);
                    continue;
                }
                catch (Exception e) {
                    LOG.error(String.format("Caller-%d Call-%d caught: %s", this.callerId, i, StringUtils.stringifyException((Throwable)e)));
                    this.failed = true;
                }
            }
        }

        private void runCall(int idx, long param) throws InterruptedException, ExecutionException, IOException {
            while (true) {
                try {
                    this.doCall(idx, param);
                    return;
                }
                catch (AsyncCallLimitExceededException e) {
                    this.start = this.end;
                    this.end = idx;
                    this.waitForReturnValues(this.start, this.end);
                    continue;
                }
                break;
            }
        }

        private void doCall(int idx, long param) throws IOException {
            TestIPC.call(this.client, param, this.server, conf);
            this.returnFutures.put(idx, (Future<LongWritable>)TestAsyncIPC.getAsyncRpcResponseFuture());
            this.expectedValues.put(idx, param);
        }

        private void waitForReturnValues(int start, int end) throws InterruptedException, ExecutionException {
            for (int i = start; i < end; ++i) {
                LongWritable value = this.returnFutures.get(i).get();
                if (this.expectedValues.get(i).longValue() == value.get()) continue;
                LOG.error(String.format("Caller-%d Call-%d failed!", this.callerId, i));
                this.failed = true;
                break;
            }
        }
    }
}

