/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestIPC;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestAsyncIPC {
    // Shared IPC configuration. setupConf() replaces it with a fresh instance before each test,
    // and the nested caller classes pass it to TestIPC.call(...).
    private static Configuration conf;
    // Class logger; assigned in this class's static initializer block.
    private static final Logger LOG;

    /**
     * Wraps the handle returned by {@link Client#getAsyncRpcResponse()} in an
     * {@link AsyncGetFuture}.
     *
     * <p>The diamond operator replaces the raw-type construction so the compiler
     * checks the type arguments instead of emitting an unchecked conversion.
     *
     * @param <T> the response type expected by the caller
     * @return a new {@link AsyncGetFuture} around the current async RPC response handle
     */
    static <T extends Writable> AsyncGetFuture<T, IOException> getAsyncRpcResponseFuture() {
        return new AsyncGetFuture<>(Client.getAsyncRpcResponse());
    }

    /**
     * Per-test setup: installs a fresh {@link Configuration} in the shared {@code conf}
     * field with {@code ipc.client.async.calls.max} set to 10000, sets the client ping
     * interval to 1000 on it, and switches the calling thread's client mode to asynchronous.
     */
    @BeforeEach
    public void setupConf() {
        final Configuration fresh = new Configuration();
        conf = fresh;
        fresh.setInt("ipc.client.async.calls.max", 10000);
        Client.setPingInterval(fresh, 1000);
        Client.setAsynchronousMode(true);
    }

    /**
     * Runs {@link #internalTestAsyncCall} with {@code checkAsyncCallEnabled == false}:
     * 3 handlers, handler sleep on, 2 clients, 5 callers and 10 calls per caller.
     */
    @Test
    @Timeout(value = 60L)
    public void testAsyncCallCheckDisabled() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 3;
        final boolean handlerSleep = true;
        final int clientCount = 2;
        final int callerCount = 5;
        final int callCount = 10;
        final boolean checkAsyncCallEnabled = false;
        internalTestAsyncCall(handlerCount, handlerSleep, clientCount, callerCount, callCount, checkAsyncCallEnabled);
    }

    /**
     * Runs {@link #internalTestAsyncCall} twice with the async call check enabled:
     * first without handler sleep and 100 calls per caller, then with handler sleep
     * and 10 calls per caller. Both runs use 3 handlers, 2 clients and 5 callers.
     */
    @Test
    @Timeout(value = 60L)
    public void testAsyncCall() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 3;
        final int clientCount = 2;
        final int callerCount = 5;
        // Handlers do not sleep: more calls per caller.
        internalTestAsyncCall(handlerCount, false, clientCount, callerCount, 100, true);
        // Handlers sleep: fewer calls per caller.
        internalTestAsyncCall(handlerCount, true, clientCount, callerCount, 10, true);
    }

    /**
     * Runs {@link #internalTestAsyncCallLimit} with 100 handlers, no handler sleep,
     * 5 clients, 10 callers and 500 calls per caller.
     */
    @Test
    @Timeout(value = 60L)
    public void testAsyncCallLimit() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 100;
        final boolean handlerSleep = false;
        final int clientCount = 5;
        final int callerCount = 10;
        final int callCount = 500;
        internalTestAsyncCallLimit(handlerCount, handlerSleep, clientCount, callerCount, callCount);
    }

    /**
     * Starts a {@link TestIPC.TestServer}, creates {@code clientCount} clients and
     * {@code callerCount} {@link AsyncCaller} threads (assigned to clients round-robin),
     * then joins every caller and verifies its return values.
     *
     * <p>The server and every client that was created are stopped in a {@code finally}
     * block, so a failed assertion or an interrupted join no longer leaves them running.
     *
     * @param handlerCount number of server handlers, passed to {@link TestIPC.TestServer}
     * @param handlerSleep handler-sleep flag, passed to {@link TestIPC.TestServer}
     * @param clientCount number of {@link Client} instances to create
     * @param callerCount number of {@link AsyncCaller} threads to start
     * @param callCount number of calls each caller issues
     * @param checkAsyncCallEnabled passed to each {@link AsyncCaller}; when {@code false}
     *        this method additionally asserts that each caller's client reports an
     *        async call counter of 0
     * @throws IOException if the server or a client cannot be created or started
     * @throws InterruptedException if joining a caller thread is interrupted
     * @throws ExecutionException if fetching a call result fails
     */
    public void internalTestAsyncCall(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount, boolean checkAsyncCallEnabled) throws IOException, InterruptedException, ExecutionException {
        TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
        Client[] clients = new Client[clientCount];
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            for (int i = 0; i < clientCount; ++i) {
                clients[i] = new Client(LongWritable.class, conf);
            }
            AsyncCaller[] callers = new AsyncCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount, checkAsyncCallEnabled);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                if (!checkAsyncCallEnabled) {
                    Assertions.assertEquals(0, clients[i % clientCount].getAsyncCallCounter());
                }
                callers[i].join();
                callers[i].assertReturnValues();
            }
        } finally {
            try {
                for (Client client : clients) {
                    // Slots stay null if client creation failed part-way through.
                    if (client != null) {
                        client.stop();
                    }
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Issues 100 async calls from the test thread (by invoking {@link AsyncCaller#run()}
     * directly), verifies the return values three times in a row, and then asserts that
     * {@code client.getAsyncCallCount()} equals the value read before the calls were made.
     */
    @Test
    @Timeout(value = 60L)
    public void testCallGetReturnRpcResponseMultipleTimes() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 10;
        final int callCount = 100;
        final int verificationRounds = 3;
        final TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, false, conf);
        final InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        final Client client = new Client(LongWritable.class, conf);
        final int countBefore = client.getAsyncCallCount();
        try {
            final AsyncCaller caller = new AsyncCaller(client, addr, callCount, true);
            caller.run();
            for (int round = 0; round < verificationRounds; ++round) {
                caller.assertReturnValues();
            }
            Assertions.assertEquals(countBefore, client.getAsyncCallCount());
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Issues 10 async calls against a server created with 10 handlers and the handler-sleep
     * flag set, then verifies the results through
     * {@link AsyncCaller#assertReturnValues(long, TimeUnit)} with a 10 ms timeout per
     * {@code Future.get} attempt.
     */
    @Test
    @Timeout(value = 60L)
    public void testFutureGetWithTimeout() throws IOException, InterruptedException, ExecutionException {
        final int handlerCount = 10;
        final int callCount = 10;
        final long getTimeoutMillis = 10L;
        final TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, true, conf);
        final InetSocketAddress addr = NetUtils.getConnectAddress(server);
        server.start();
        final Client client = new Client(LongWritable.class, conf);
        try {
            final AsyncCaller caller = new AsyncCaller(client, addr, callCount, true);
            caller.run();
            caller.assertReturnValues(getTimeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Starts a server and {@code clientCount} clients built from a dedicated configuration
     * whose {@code ipc.client.async.calls.max} is 100, runs {@code callerCount}
     * {@link AsyncLimitlCaller} threads against them (clients assigned round-robin), and
     * asserts that no caller recorded a failure.
     *
     * <p>The server and every client that was created are stopped in a {@code finally}
     * block, so a failed assertion or an interrupted join no longer leaves them running.
     *
     * <p>NOTE(review): only the server and clients use the dedicated configuration; the
     * callers themselves pass the class-level {@code conf} to {@code TestIPC.call}.
     *
     * @param handlerCount number of server handlers, passed to {@link TestIPC.TestServer}
     * @param handlerSleep handler-sleep flag, passed to {@link TestIPC.TestServer}
     * @param clientCount number of {@link Client} instances to create
     * @param callerCount number of caller threads to start
     * @param callCount number of calls each caller issues
     * @throws IOException if the server or a client cannot be created or started
     * @throws InterruptedException if joining a caller or waiting for a result is interrupted
     * @throws ExecutionException if fetching a call result fails
     */
    public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException {
        // Named differently from the static field so the two configurations are not confused.
        Configuration limitConf = new Configuration();
        limitConf.setInt("ipc.client.async.calls.max", 100);
        Client.setPingInterval(limitConf, 1000);
        TestIPC.TestServer server = new TestIPC.TestServer(handlerCount, handlerSleep, limitConf);
        Client[] clients = new Client[clientCount];
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            for (int i = 0; i < clientCount; ++i) {
                clients[i] = new Client(LongWritable.class, limitConf);
            }
            AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr, callCount);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                callers[i].join();
                // Check the results the caller did not already verify while running.
                callers[i].waitForReturnValues(callers[i].getStart(), callers[i].getCount());
                String msg = String.format("Expected not failed for caller-%d: %s.", i, callers[i]);
                Assertions.assertFalse(callers[i].failed, msg);
            }
        } finally {
            try {
                for (Client client : clients) {
                    // Slots stay null if client creation failed part-way through.
                    if (client != null) {
                        client.stop();
                    }
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Verifies that the retry count attached to each async call is what the server sees
     * and what comes back in the response header.
     *
     * <p>The client subclass assigns a fresh call id and a random retry count (0..254)
     * before each call is created and records the resulting id/retry pair. The server's
     * call listener and the client's {@code checkResponse} hook then look the pair up by
     * call id and compare retry counts.
     *
     * <p>The map is typed and wrapped with {@link Collections#synchronizedMap}: entries are
     * added by the thread issuing calls and read from the listener and response hooks,
     * which presumably run on server-handler / client-receiver threads (confirm against
     * {@code Server} and {@code Client}); a bare {@link HashMap} is not safe for that.
     */
    @Test
    @Timeout(value = 60L)
    public void testCallIdAndRetry() throws IOException, InterruptedException, ExecutionException {
        final Map<Integer, TestIPC.CallInfo> infoMap = Collections.synchronizedMap(new HashMap<>());
        Client client = new Client(LongWritable.class, conf) {

            @Override
            Client.Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
                // Give the upcoming call its own id and a random retry count.
                Client.setCallIdAndRetryCount(Client.nextCallId(), TestIPC.RANDOM.nextInt(255), null);
                Client.Call call = super.createCall(rpcKind, rpcRequest);
                TestIPC.CallInfo info = new TestIPC.CallInfo();
                info.id = call.id;
                info.retry = call.retry;
                infoMap.put(call.id, info);
                return call;
            }

            @Override
            void checkResponse(RpcHeaderProtos.RpcResponseHeaderProto header) throws IOException {
                super.checkResponse(header);
                int callId = header.getCallId();
                int expectedRetry = infoMap.get(callId).retry;
                Assertions.assertEquals(expectedRetry, header.getRetryCount());
            }
        };
        TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = () -> {
            int callId = Server.getCallId();
            int expectedRetry = infoMap.get(callId).retry;
            Assertions.assertEquals(expectedRetry, Server.getCallRetryCount());
        };
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller caller = new AsyncCaller(client, addr, 4, true);
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Sets a retry count of 255 through {@code Client.setCallIdAndRetryCount} on the test
     * thread, issues 10 async calls from that thread, and has the server's call listener
     * assert that {@code Server.getCallRetryCount()} reports 255.
     */
    @Test
    @Timeout(value = 60L)
    public void testCallRetryCount() throws IOException, InterruptedException, ExecutionException {
        final int retryCount = 255;
        final Client client = new Client(LongWritable.class, conf);
        Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
        final TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = () -> Assertions.assertEquals(retryCount, Server.getCallRetryCount());
        try {
            final InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            final AsyncCaller caller = new AsyncCaller(client, addr, 10, true);
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Sets a retry count of 0 through {@code Client.setCallIdAndRetryCount} on the test
     * thread, issues 10 async calls from that thread, and has the server's call listener
     * assert that {@code Server.getCallRetryCount()} reports 0.
     */
    @Test
    @Timeout(value = 60L)
    public void testInitialCallRetryCount() throws IOException, InterruptedException, ExecutionException {
        final int initialRetryCount = 0;
        final Client client = new Client(LongWritable.class, conf);
        Client.setCallIdAndRetryCount(Client.nextCallId(), initialRetryCount, null);
        final TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = () -> Assertions.assertEquals(initialRetryCount, Server.getCallRetryCount());
        try {
            final InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            final AsyncCaller caller = new AsyncCaller(client, addr, 10, true);
            caller.run();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    /**
     * Runs 100 caller threads that each issue 100 async calls through one shared client,
     * collects the call id the server observes for every call, and asserts that the ids,
     * once sorted, form a gap-free run of consecutive integers of the expected length.
     *
     * <p>The id list is declared as {@code List<Integer>} rather than a raw {@code List},
     * so sorting and element access are type-checked and need no casts.
     */
    @Test
    @Timeout(value = 60L)
    public void testUniqueSequentialCallIds() throws IOException, InterruptedException, ExecutionException {
        int serverThreads = 10;
        int callerCount = 100;
        int perCallerCallCount = 100;
        TestIPC.TestServer server = new TestIPC.TestServer(serverThreads, false, conf);
        // Synchronized because the listener appends to it while calls are in flight.
        final List<Integer> callIds = Collections.synchronizedList(new ArrayList<>());
        server.callListener = () -> callIds.add(Server.getCallId());
        Client client = new Client(LongWritable.class, conf);
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCaller[] callers = new AsyncCaller[callerCount];
            for (int i = 0; i < callerCount; ++i) {
                callers[i] = new AsyncCaller(client, addr, perCallerCallCount, true);
                callers[i].start();
            }
            for (int i = 0; i < callerCount; ++i) {
                callers[i].join();
                callers[i].assertReturnValues();
            }
        } finally {
            client.stop();
            server.stop();
        }
        int expectedCallCount = callerCount * perCallerCallCount;
        Assertions.assertEquals(expectedCallCount, callIds.size());
        Collections.sort(callIds);
        int startID = callIds.get(0);
        for (int i = 0; i < expectedCallCount; ++i) {
            int actualId = callIds.get(i);
            Assertions.assertEquals(startID + i, actualId);
        }
    }

    /**
     * Issues 10 async calls from an {@link AsyncCompletableFutureCaller} thread against a
     * single-handler server whose call listener sleeps 100 ms per call, then verifies the
     * values delivered through the collected {@code CompletableFuture}s.
     *
     * <p>If the listener's sleep is interrupted, the interrupt flag is restored before the
     * exception is wrapped, so code further up the handler thread can still observe it.
     */
    @Test
    @Timeout(value = 60L)
    public void testAsyncCallWithCompletableFuture() throws IOException, InterruptedException, ExecutionException {
        Client client = new Client(LongWritable.class, conf);
        TestIPC.TestServer server = new TestIPC.TestServer(1, false, conf);
        server.callListener = () -> {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; put it back before wrapping.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        };
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            server.start();
            AsyncCompletableFutureCaller caller = new AsyncCompletableFutureCaller(client, addr, 10);
            caller.start();
            caller.join();
            caller.assertReturnValues();
        } finally {
            client.stop();
            server.stop();
        }
    }

    // Static initializer: binds LOG to the SLF4J logger obtained for TestAsyncIPC.class.
    static {
        LOG = LoggerFactory.getLogger(TestAsyncIPC.class);
    }

    /**
     * Caller thread that issues {@code count} async calls with random {@code long}
     * parameters and, whenever a call is rejected with
     * {@link AsyncCallLimitExceededException}, waits for and checks a window of earlier
     * results before retrying the rejected call.
     *
     * <p>Failures are logged and recorded in {@code failed} rather than thrown, so the
     * owning test must inspect that flag after joining the thread.
     */
    static class AsyncLimitlCaller
    extends Thread {
        private final Client client;
        private final InetSocketAddress server;
        private final int count;
        // Set when a call throws or a returned value differs from the value sent.
        private boolean failed;
        // Call index -> future holding that call's response.
        Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
        // Call index -> parameter sent with that call; compared against the returned value.
        Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
        // [start, end) is the window of call indices verified on the latest limit hit.
        int start = 0;
        int end = 0;
        final int callerId;

        int getStart() {
            return this.start;
        }

        int getEnd() {
            return this.end;
        }

        int getCount() {
            return this.count;
        }

        /** Creates a caller with id 0. */
        public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
            this(0, client, server, count);
        }

        /**
         * @param callerId identifier used in this caller's log messages
         * @param client client used to issue the calls
         * @param server address the calls are sent to
         * @param count number of calls issued by {@link #run()}
         */
        public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server, int count) {
            this.client = client;
            this.server = server;
            this.count = count;
            Client.setAsynchronousMode(true);
            this.callerId = callerId;
        }

        /**
         * Issues the configured number of calls. An exception from one call is logged and
         * marks this caller as failed; the remaining calls are still attempted.
         */
        @Override
        public void run() {
            Client.setAsynchronousMode(true);
            for (int i = 0; i < this.count; ++i) {
                try {
                    long param = TestIPC.RANDOM.nextLong();
                    this.runCall(i, param);
                } catch (Exception e) {
                    LOG.error(String.format("Caller-%d Call-%d caught: %s", this.callerId, i, StringUtils.stringifyException((Throwable)e)));
                    this.failed = true;
                }
            }
        }

        /**
         * Issues call {@code idx}, retrying until it is accepted. Each time the call is
         * rejected with {@link AsyncCallLimitExceededException}, the window advances to
         * {@code [previous end, idx)} and those results are waited for before retrying.
         */
        private void runCall(int idx, long param) throws InterruptedException, ExecutionException, IOException {
            while (true) {
                try {
                    this.doCall(idx, param);
                    return;
                } catch (AsyncCallLimitExceededException e) {
                    this.start = this.end;
                    this.end = idx;
                    this.waitForReturnValues(this.start, this.end);
                }
            }
        }

        /** Sends one call and records its response future and expected value under {@code idx}. */
        private void doCall(int idx, long param) throws IOException {
            TestIPC.call(this.client, param, this.server, conf);
            this.returnFutures.put(idx, TestAsyncIPC.<LongWritable>getAsyncRpcResponseFuture());
            this.expectedValues.put(idx, param);
        }

        /**
         * Blocks on the futures for call indices {@code [start, end)} and compares each
         * result with the value that was sent; stops at the first mismatch, which is
         * logged and recorded in {@code failed}.
         */
        private void waitForReturnValues(int start, int end) throws InterruptedException, ExecutionException {
            for (int i = start; i < end; ++i) {
                LongWritable value = this.returnFutures.get(i).get();
                if (this.expectedValues.get(i).longValue() != value.get()) {
                    LOG.error(String.format("Caller-%d Call-%d failed!", this.callerId, i));
                    this.failed = true;
                    break;
                }
            }
        }
    }

    static class AsyncCompletableFutureCaller
    extends Thread {
        private final Client client;
        private final InetSocketAddress server;
        private final int count;
        private final List<CompletableFuture<Writable>> completableFutures;
        private final List<Long> expectedValues;

        AsyncCompletableFutureCaller(Client client, InetSocketAddress server, int count) {
            this.client = client;
            this.server = server;
            this.count = count;
            this.completableFutures = new ArrayList<CompletableFuture<Writable>>(count);
            this.expectedValues = new ArrayList<Long>(count);
            this.setName("Async CompletableFuture Caller");
        }

        @Override
        public void run() {
            Client.setAsynchronousMode((boolean)true);
            long startTime = Time.monotonicNow();
            try {
                for (int i = 0; i < this.count; ++i) {
                    long param = TestIPC.RANDOM.nextLong();
                    TestIPC.call(this.client, param, this.server, conf);
                    this.expectedValues.add(param);
                    this.completableFutures.add(Client.getResponseFuture());
                }
                long cost = Time.monotonicNow() - startTime;
                Assertions.assertTrue((cost < (long)this.count * 100L ? 1 : 0) != 0);
                LOG.info("[{}] run cost {}ms", (Object)Thread.currentThread().getName(), (Object)cost);
            }
            catch (Exception e) {
                Assertions.fail();
            }
        }

        public void assertReturnValues() throws InterruptedException, ExecutionException {
            for (int i = 0; i < this.count; ++i) {
                LongWritable value = (LongWritable)this.completableFutures.get(i).get();
                Assertions.assertEquals((long)this.expectedValues.get(i), (long)value.get(), (String)("call" + i + " failed."));
            }
        }
    }

    /**
     * Caller that issues {@code count} async calls with random {@code long} parameters
     * and keeps each call's response future together with the value that was sent. It can
     * be started as a thread or driven synchronously by invoking {@link #run()} directly.
     *
     * <p>If issuing a call fails, {@code run()} records the failure and throws. Both
     * {@code assertReturnValues} variants check that record <em>before</em> touching the
     * futures: once {@code run()} has failed, the futures for the remaining indices do not
     * exist, and dereferencing them first would surface as a {@link NullPointerException}
     * that hides the real cause.
     */
    static class AsyncCaller
    extends Thread {
        private final Client client;
        private final InetSocketAddress server;
        private final int count;
        private volatile boolean failed;
        // Exception that made run() fail, kept so the assertion can report it.
        private volatile Throwable failureCause;
        // Call index -> future holding that call's response.
        Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
        // Call index -> parameter sent with that call; compared against the returned value.
        Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();

        /**
         * @param client client used to issue the calls
         * @param server address the calls are sent to
         * @param count number of calls issued by {@link #run()}
         * @param checkAsyncCallEnabled when {@code false}, {@code setMaxAsyncCalls(-1)} is
         *        invoked on the client (presumably turning the async call limit check off;
         *        confirm against {@code Client})
         */
        AsyncCaller(Client client, InetSocketAddress server, int count, boolean checkAsyncCallEnabled) {
            this.client = client;
            if (!checkAsyncCallEnabled) {
                this.client.setMaxAsyncCalls(-1);
            }
            this.server = server;
            this.count = count;
            Client.setAsynchronousMode(true);
        }

        /**
         * Issues all calls. The first exception marks this caller as failed and is
         * rethrown wrapped in a {@link RuntimeException}; no further calls are issued.
         */
        @Override
        public void run() {
            Client.setAsynchronousMode(true);
            for (int i = 0; i < this.count; ++i) {
                try {
                    long param = TestIPC.RANDOM.nextLong();
                    TestIPC.call(this.client, param, this.server, conf);
                    this.returnFutures.put(i, TestAsyncIPC.<LongWritable>getAsyncRpcResponseFuture());
                    this.expectedValues.put(i, param);
                } catch (Exception e) {
                    this.failureCause = e;
                    this.failed = true;
                    throw new RuntimeException(e);
                }
            }
        }

        /** Fails the current test if {@link #run()} recorded a failure. */
        private void assertNotFailed() {
            if (this.failed) {
                Assertions.fail("Async caller failed while issuing calls: " + this.failureCause, this.failureCause);
            }
        }

        /**
         * Waits for every call's future and asserts that it returned the value sent.
         *
         * @throws InterruptedException if waiting on a future is interrupted
         * @throws ExecutionException if a future completed exceptionally
         */
        void assertReturnValues() throws InterruptedException, ExecutionException {
            this.assertNotFailed();
            for (int i = 0; i < this.count; ++i) {
                LongWritable value = this.returnFutures.get(i).get();
                long expected = this.expectedValues.get(i);
                Assertions.assertEquals(expected, value.get(), "call" + i + " failed.");
            }
            // Re-check in case the caller failed while results were being collected.
            this.assertNotFailed();
        }

        /**
         * Same verification as {@link #assertReturnValues()}, but polls each future with
         * the given timeout and keeps sweeping over the unchecked calls until every one
         * has produced a value. A timed-out attempt is logged and retried on the next sweep.
         *
         * @param timeout maximum wait per {@code Future.get} attempt
         * @param unit unit of {@code timeout}
         * @throws InterruptedException if waiting on a future is interrupted
         * @throws ExecutionException if a future completed exceptionally
         */
        void assertReturnValues(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            this.assertNotFailed();
            boolean[] checked = new boolean[this.count];
            boolean done = false;
            while (!done) {
                done = true;
                for (int i = 0; i < this.count; ++i) {
                    if (checked[i]) {
                        continue;
                    }
                    done = false;
                    LongWritable value;
                    try {
                        value = this.returnFutures.get(i).get(timeout, unit);
                    } catch (TimeoutException e) {
                        LOG.info("call" + i + " caught ", (Throwable)e);
                        continue;
                    }
                    long expected = this.expectedValues.get(i);
                    Assertions.assertEquals(expected, value.get(), "call" + i + " failed.");
                    checked[i] = true;
                }
            }
            this.assertNotFailed();
        }
    }
}

