/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises deferred ("handed-off") responses with {@link ProtobufRpcEngine2}.
 *
 * <p>The server implementation registers a deferred-response callback, returns
 * {@code null} from the RPC method and completes the call later from a separate
 * thread. The test issues two concurrent sleep calls against a server built with
 * a single handler and checks that both finish close together.
 */
public class TestProtoBufRpcServerHandoff {
    public static final Logger LOG = LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);

    /** Sleep duration, in milliseconds, requested by each client call. */
    private static final long SLEEP_MILLIS = 5000L;

    /** Maximum allowed difference, in milliseconds, between the two calls' end times. */
    private static final long MAX_END_TIME_SKEW_MILLIS = 2000L;

    /** Maximum allowed time, in milliseconds, from submission until both calls are done. */
    private static final long MAX_TOTAL_MILLIS = 7000L;

    /**
     * Starts a single-handler RPC server, submits two concurrent sleep calls and
     * asserts that they complete within {@link #MAX_END_TIME_SKEW_MILLIS} of each
     * other and within {@link #MAX_TOTAL_MILLIS} overall.
     *
     * <p>The server, the client proxy and the executor are always released, even
     * when an assertion or a client call fails.
     *
     * @throws Exception if server setup, an RPC call or waiting on the futures fails
     */
    @Test(timeout=20000L)
    public void test() throws Exception {
        Configuration conf = new Configuration();
        TestProtoBufRpcServerHandoffServer serverImpl = new TestProtoBufRpcServerHandoffServer();
        BlockingService blockingService =
                TestRpcServiceProtos.TestProtobufRpcHandoffProto.newReflectiveBlockingService(serverImpl);
        RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class, ProtobufRpcEngine2.class);

        // A single handler is deliberate: two 5 s calls can only finish inside the
        // 7 s budget if the handler is not held for the duration of each sleep.
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
                .setInstance(blockingService)
                .setVerbose(true)
                .setNumHandlers(1)
                .build();
        server.start();

        TestProtoBufRpcServerHandoffProtocol client = null;
        ExecutorService executorService = null;
        try {
            InetSocketAddress address = server.getListenerAddress();
            long serverStartTime = System.currentTimeMillis();
            LOG.info("Server started at: " + address + " at time: " + serverStartTime);

            client = RPC.getProxy(TestProtoBufRpcServerHandoffProtocol.class, 1L, address, conf);

            executorService = Executors.newFixedThreadPool(2);
            ExecutorCompletionService<ClientInvocationCallable> completionService =
                    new ExecutorCompletionService<ClientInvocationCallable>(executorService);
            completionService.submit(new ClientInvocationCallable(client, SLEEP_MILLIS));
            completionService.submit(new ClientInvocationCallable(client, SLEEP_MILLIS));
            long submitTime = System.currentTimeMillis();

            // take() blocks until a task completes; get() rethrows any client-side failure.
            Future<ClientInvocationCallable> future1 = completionService.take();
            Future<ClientInvocationCallable> future2 = completionService.take();
            ClientInvocationCallable callable1 = future1.get();
            ClientInvocationCallable callable2 = future2.get();
            LOG.info(callable1.toString());
            LOG.info(callable2.toString());

            long endTimeSkew = Math.abs(callable1.endTime - callable2.endTime);
            Assert.assertTrue("Calls finished " + endTimeSkew + " ms apart; expected less than "
                    + MAX_END_TIME_SKEW_MILLIS + " ms", endTimeSkew < MAX_END_TIME_SKEW_MILLIS);

            long elapsed = System.currentTimeMillis() - submitTime;
            Assert.assertTrue("Both calls took " + elapsed + " ms; expected less than "
                    + MAX_TOTAL_MILLIS + " ms", elapsed < MAX_TOTAL_MILLIS);
        } finally {
            // Release everything the test started so no threads or sockets outlive it.
            try {
                if (executorService != null) {
                    executorService.shutdownNow();
                }
                if (client != null) {
                    RPC.stopProxy(client);
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Server-side implementation whose {@code sleep} call is answered from a
     * separate thread through a deferred-response callback.
     */
    public static class TestProtoBufRpcServerHandoffServer
    implements TestProtoBufRpcServerHandoffProtocol {
        /**
         * Registers a deferred response, starts a thread that sleeps for the
         * requested time and then completes the call, and returns immediately.
         *
         * @param controller RPC controller; not used by this implementation
         * @param request carries the sleep time in milliseconds
         * @return always {@code null}; the real response is delivered via the callback
         * @throws ServiceException declared by the protocol interface; not thrown here
         */
        @Override
        public TestProtos.SleepResponseProto2 sleep(RpcController controller, TestProtos.SleepRequestProto2 request) throws ServiceException {
            final long startTime = System.currentTimeMillis();
            final ProtobufRpcEngineCallback2 callback = ProtobufRpcEngine2.Server.registerForDeferredResponse2();
            final long sleepTime = request.getSleepTime();
            new Thread(){

                @Override
                public void run() {
                    try {
                        Thread.sleep(sleepTime);
                    }
                    catch (InterruptedException e) {
                        // Keep the interrupt status and fail the deferred call, so
                        // the callback is always completed one way or the other.
                        Thread.currentThread().interrupt();
                        callback.error(e);
                        return;
                    }
                    callback.setResponse(TestProtos.SleepResponseProto2.newBuilder()
                            .setReceiveTime(startTime)
                            .setResponseTime(System.currentTimeMillis())
                            .build());
                }
            }.start();
            return null;
        }
    }

    /** RPC protocol used by this test; adds protocol metadata to the generated blocking interface. */
    @ProtocolInfo(protocolName="org.apache.hadoop.ipc.TestProtoBufRpcServerHandoff$TestProtoBufRpcServerHandoffProtocol", protocolVersion=1L)
    public static interface TestProtoBufRpcServerHandoffProtocol
    extends TestRpcServiceProtos.TestProtobufRpcHandoffProto.BlockingInterface {
    }

    /**
     * Client task that performs one {@code sleep} RPC and records its start time,
     * end time and response. It returns itself so the caller can read those
     * fields once the task's future has completed.
     */
    private static class ClientInvocationCallable
    implements Callable<ClientInvocationCallable> {
        final TestProtoBufRpcServerHandoffProtocol client;
        final long sleepTime;
        TestProtos.SleepResponseProto2 result;
        long startTime;
        long endTime;

        /**
         * @param client proxy used to issue the call
         * @param sleepTime sleep duration in milliseconds to request from the server
         */
        private ClientInvocationCallable(TestProtoBufRpcServerHandoffProtocol client, long sleepTime) {
            this.client = client;
            this.sleepTime = sleepTime;
        }

        /**
         * Issues the blocking {@code sleep} call and timestamps it on both sides.
         *
         * @return this instance, with {@code startTime}, {@code endTime} and {@code result} set
         * @throws Exception if the RPC call fails
         */
        @Override
        public ClientInvocationCallable call() throws Exception {
            this.startTime = System.currentTimeMillis();
            this.result = this.client.sleep(null,
                    TestProtos.SleepRequestProto2.newBuilder().setSleepTime(this.sleepTime).build());
            this.endTime = System.currentTimeMillis();
            return this;
        }

        /** Renders the recorded timings, plus the server-side timestamps when a result is present. */
        @Override
        public String toString() {
            String resultPart = this.result != null
                    ? ", result.receiveTime=" + this.result.getReceiveTime()
                            + ", result.responseTime=" + this.result.getResponseTime()
                    : "";
            return "startTime=" + this.startTime + ", endTime=" + this.endTime + resultPart;
        }
    }
}

