/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.test.Kind;
import org.apache.avro.test.MD5;
import org.apache.avro.test.Simple;
import org.apache.avro.test.TestError;
import org.apache.avro.test.TestRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestNettyServerWithCallbacks {
    private static Server server;
    private static Transceiver transceiver;
    private static Simple.Callback simpleClient;
    private static final AtomicBoolean ackFlag;
    private static final AtomicReference<CountDownLatch> ackLatch;
    private static Simple simpleService;

    @BeforeAll
    public static void initializeConnections() throws Exception {
        SpecificResponder responder = new SpecificResponder(Simple.class, (Object)simpleService);
        server = new NettyServer((Responder)responder, new InetSocketAddress(0));
        server.start();
        int serverPort = server.getPort();
        System.out.println("server port : " + serverPort);
        transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000));
        simpleClient = (Simple.Callback)SpecificRequestor.getClient(Simple.Callback.class, (Transceiver)transceiver);
    }

    @AfterAll
    public static void tearDownConnections() throws Exception {
        if (transceiver != null) {
            transceiver.close();
        }
        if (server != null) {
            server.close();
        }
    }

    @Test
    void greeting() throws Exception {
        Assertions.assertEquals((Object)"Hello, how are you?", (Object)simpleClient.hello("how are you?"));
        CallFuture future1 = new CallFuture();
        simpleClient.hello("World!", (Callback)future1);
        Assertions.assertEquals((Object)"Hello, World!", (Object)future1.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future1.getError());
        final CallFuture future2 = new CallFuture();
        simpleClient.hello("what's up?", (Callback)new Callback<String>(){

            public void handleResult(String result) {
                future2.handleResult((Object)result);
            }

            public void handleError(Throwable error) {
                future2.handleError(error);
            }
        });
        Assertions.assertEquals((Object)"Hello, what's up?", (Object)future2.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future2.getError());
    }

    @Test
    void echo() throws Exception {
        TestRecord record = TestRecord.newBuilder().setHash(new MD5(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8})).setKind(Kind.FOO).setName("My Record").build();
        Assertions.assertEquals((Object)record, (Object)simpleClient.echo(record));
        CallFuture future1 = new CallFuture();
        simpleClient.echo(record, (Callback)future1);
        Assertions.assertEquals((Object)record, (Object)future1.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future1.getError());
        final CallFuture future2 = new CallFuture();
        simpleClient.echo(record, (Callback)new Callback<TestRecord>(){

            public void handleResult(TestRecord result) {
                future2.handleResult((Object)result);
            }

            public void handleError(Throwable error) {
                future2.handleError(error);
            }
        });
        Assertions.assertEquals((Object)record, (Object)future2.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future2.getError());
    }

    @Test
    void add() throws Exception {
        Assertions.assertEquals((int)8, (int)simpleClient.add(2, 6));
        CallFuture future1 = new CallFuture();
        simpleClient.add(8, 8, (Callback)future1);
        Assertions.assertEquals((Integer)16, (Integer)((Integer)future1.get(2L, TimeUnit.SECONDS)));
        Assertions.assertNull((Object)future1.getError());
        final CallFuture future2 = new CallFuture();
        simpleClient.add(512, 256, (Callback)new Callback<Integer>(){

            public void handleResult(Integer result) {
                future2.handleResult((Object)result);
            }

            public void handleError(Throwable error) {
                future2.handleError(error);
            }
        });
        Assertions.assertEquals((Integer)768, (Integer)((Integer)future2.get(2L, TimeUnit.SECONDS)));
        Assertions.assertNull((Object)future2.getError());
    }

    @Test
    void echoBytes() throws Exception {
        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
        Assertions.assertEquals((Object)byteBuffer, (Object)simpleClient.echoBytes(byteBuffer));
        CallFuture future1 = new CallFuture();
        simpleClient.echoBytes(byteBuffer, (Callback)future1);
        Assertions.assertEquals((Object)byteBuffer, (Object)future1.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future1.getError());
        final CallFuture future2 = new CallFuture();
        simpleClient.echoBytes(byteBuffer, (Callback)new Callback<ByteBuffer>(){

            public void handleResult(ByteBuffer result) {
                future2.handleResult((Object)result);
            }

            public void handleError(Throwable error) {
                future2.handleError(error);
            }
        });
        Assertions.assertEquals((Object)byteBuffer, (Object)future2.get(2L, TimeUnit.SECONDS));
        Assertions.assertNull((Object)future2.getError());
    }

    @Test
    void error() throws IOException, InterruptedException, TimeoutException {
        try {
            simpleClient.error();
            Assertions.fail((String)("Expected " + TestError.class.getCanonicalName()));
        }
        catch (TestError testError) {
            // empty catch block
        }
        CallFuture future = new CallFuture();
        simpleClient.error((Callback)future);
        try {
            future.get(2L, TimeUnit.SECONDS);
            Assertions.fail((String)("Expected " + TestError.class.getCanonicalName() + " to be thrown"));
        }
        catch (ExecutionException e) {
            Assertions.assertTrue((boolean)(e.getCause() instanceof TestError), (String)("Expected " + TestError.class.getCanonicalName()));
        }
        Assertions.assertNotNull((Object)future.getError());
        Assertions.assertTrue((boolean)(future.getError() instanceof TestError), (String)("Expected " + TestError.class.getCanonicalName()));
        Assertions.assertNull((Object)future.getResult());
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference errorRef = new AtomicReference();
        simpleClient.error((Callback)new Callback<Void>(){

            public void handleResult(Void result) {
                Assertions.fail((String)("Expected " + TestError.class.getCanonicalName()));
            }

            public void handleError(Throwable error) {
                errorRef.set(error);
                latch.countDown();
            }
        });
        Assertions.assertTrue((boolean)latch.await(2L, TimeUnit.SECONDS), (String)"Timed out waiting for error");
        Assertions.assertNotNull(errorRef.get());
        Assertions.assertTrue((boolean)(errorRef.get() instanceof TestError));
    }

    @Test
    void ack() throws Exception {
        simpleClient.ack();
        ackLatch.get().await(2L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)ackFlag.get(), (String)"Expected ack flag to be set");
        ackLatch.set(new CountDownLatch(1));
        simpleClient.ack();
        ackLatch.get().await(2L, TimeUnit.SECONDS);
        Assertions.assertFalse((boolean)ackFlag.get(), (String)"Expected ack flag to be cleared");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void sendAfterChannelClose() throws Exception {
        server2.start();
        try (NettyServer server2 = new NettyServer((Responder)new SpecificResponder(Simple.class, (Object)simpleService), new InetSocketAddress(0));){
            int serverPort = server2.getPort();
            System.out.println("server2 port : " + serverPort);
            try (NettyTransceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000));){
                Simple.Callback simpleClient2 = (Simple.Callback)SpecificRequestor.getClient(Simple.Callback.class, (Transceiver)transceiver2);
                Assertions.assertEquals((int)3, (int)simpleClient2.add(1, 2));
                CallFuture addFuture = new CallFuture();
                simpleClient2.add(1, 2, (Callback)addFuture);
                Assertions.assertEquals((Integer)3, (Integer)((Integer)addFuture.get()));
                server2.close();
                Thread.sleep(1000L);
                boolean ioeCaught = false;
                try {
                    simpleClient2.add(1, 2);
                    Assertions.fail((String)"Send after server close should have thrown Exception");
                }
                catch (AvroRuntimeException e) {
                    ioeCaught = e.getCause() instanceof IOException;
                    Assertions.assertTrue((boolean)ioeCaught, (String)"Expected IOException");
                }
                catch (Exception e) {
                    e.printStackTrace();
                    throw e;
                }
                Assertions.assertTrue((boolean)ioeCaught, (String)"Expected IOException");
                ioeCaught = false;
                try {
                    addFuture = new CallFuture();
                    simpleClient2.add(1, 2, (Callback)addFuture);
                    addFuture.get();
                    Assertions.fail((String)"Send after server close should have thrown Exception");
                }
                catch (IOException e) {
                    ioeCaught = true;
                }
                catch (Exception e) {
                    e.printStackTrace();
                    throw e;
                }
                Assertions.assertTrue((boolean)ioeCaught, (String)"Expected IOException");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void cancelPendingRequestsOnTransceiverClose() throws Exception {
        BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
        NettyServer server2 = new NettyServer((Responder)new SpecificResponder(Simple.class, (Object)blockingSimpleImpl), new InetSocketAddress(0));
        server2.start();
        try {
            int serverPort = server2.getPort();
            System.out.println("server2 port : " + serverPort);
            CallFuture addFuture = new CallFuture();
            try (NettyTransceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000));){
                Simple.Callback simpleClient2 = (Simple.Callback)SpecificRequestor.getClient(Simple.Callback.class, (Transceiver)transceiver2);
                Assertions.assertEquals((int)3, (int)simpleClient2.add(1, 2));
                blockingSimpleImpl.acquireRunPermit();
                simpleClient2.add(1, 2, (Callback)addFuture);
            }
            boolean ioeThrown = false;
            try {
                addFuture.get();
            }
            catch (ExecutionException e) {
                ioeThrown = e.getCause() instanceof IOException;
                Assertions.assertTrue((boolean)(e.getCause() instanceof IOException));
            }
            catch (Exception e) {
                e.printStackTrace();
                Assertions.fail((String)("Unexpected Exception: " + e.toString()));
            }
            Assertions.assertTrue((boolean)ioeThrown, (String)"Expected IOException to be thrown");
        }
        finally {
            blockingSimpleImpl.releaseRunPermit();
            server2.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=20000L)
    void cancelPendingRequestsAfterChannelCloseByServerShutdown() throws Throwable {
        BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
        NettyServer server2 = new NettyServer((Responder)new SpecificResponder(Simple.class, (Object)blockingSimpleImpl), new InetSocketAddress(0));
        server2.start();
        NettyTransceiver transceiver2 = null;
        try {
            int serverPort = server2.getPort();
            System.out.println("server2 port : " + serverPort);
            transceiver2 = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000));
            Simple.Callback simpleClient2 = (Simple.Callback)SpecificRequestor.getClient(Simple.Callback.class, (Transceiver)transceiver2);
            blockingSimpleImpl.acquireEnterPermit();
            blockingSimpleImpl.acquireRunPermit();
            Future<?> clientFuture = Executors.newSingleThreadExecutor().submit(() -> {
                try {
                    simpleClient2.add(3, 4);
                    Assertions.fail((String)"Expected an exception");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
            blockingSimpleImpl.acquireEnterPermit();
            new Thread(() -> ((Server)server2).close()).start();
            try {
                clientFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (ExecutionException e) {
                throw e.getCause();
            }
            catch (TimeoutException e) {
                Assertions.fail((String)"Client request should not be blocked on server shutdown");
            }
        }
        finally {
            blockingSimpleImpl.releaseRunPermit();
            server2.close();
            if (transceiver2 != null) {
                transceiver2.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void clientReconnectAfterServerRestart() throws Exception {
        BlockingSimpleImpl simpleImpl = new BlockingSimpleImpl();
        try (NettyServer server2 = new NettyServer((Responder)new SpecificResponder(Simple.class, (Object)simpleImpl), new InetSocketAddress(0));){
            server2.start();
            int serverPort = server2.getPort();
            System.out.println("server2 port : " + serverPort);
            NettyTransceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000));
            Simple.Callback simpleClient2 = (Simple.Callback)SpecificRequestor.getClient(Simple.Callback.class, (Transceiver)transceiver2);
            Assertions.assertEquals((int)3, (int)simpleClient2.add(1, 2));
            server2.close();
            try {
                simpleClient2.add(2, -1);
                Assertions.fail((String)"Client should not be able to invoke RPCs because server is no longer running");
            }
            catch (Exception exception) {
                // empty catch block
            }
            Thread.sleep(2000L);
            server2 = new NettyServer((Responder)new SpecificResponder(Simple.class, (Object)simpleImpl), new InetSocketAddress(serverPort));
            server2.start();
            Assertions.assertEquals((int)3, (int)simpleClient2.add(1, 2));
        }
    }

    @Disabled
    @Test
    void performanceTest() throws Exception {
        int threadCount = 8;
        long runTimeMillis = 10000L;
        ExecutorService threadPool = Executors.newFixedThreadPool(8);
        System.out.println("Running performance test for 10000ms...");
        AtomicLong rpcCount = new AtomicLong(0L);
        AtomicBoolean runFlag = new AtomicBoolean(true);
        CountDownLatch startLatch = new CountDownLatch(8);
        for (int ii = 0; ii < 8; ++ii) {
            threadPool.submit(() -> {
                try {
                    startLatch.countDown();
                    startLatch.await(2L, TimeUnit.SECONDS);
                    while (runFlag.get()) {
                        rpcCount.incrementAndGet();
                        Assertions.assertEquals((Object)"Hello, World!", (Object)simpleClient.hello("World!"));
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        startLatch.await(2L, TimeUnit.SECONDS);
        Thread.sleep(10000L);
        runFlag.set(false);
        threadPool.shutdown();
        Assertions.assertTrue((boolean)threadPool.awaitTermination(2L, TimeUnit.SECONDS), (String)"Timed out shutting down thread pool");
        System.out.println("Completed " + rpcCount.get() + " RPCs in 10000ms => " + (double)rpcCount.get() / 10000.0 * 1000.0 + " RPCs/sec, " + 10000.0 / (double)rpcCount.get() + " ms/RPC.");
    }

    static {
        ackFlag = new AtomicBoolean(false);
        ackLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
        simpleService = new SimpleImpl(ackFlag);
    }

    private static class BlockingSimpleImpl
    extends SimpleImpl {
        private final Semaphore enterSemaphore = new Semaphore(1);
        private final Semaphore runSemaphore = new Semaphore(1);

        public BlockingSimpleImpl() {
            super(new AtomicBoolean());
        }

        @Override
        public String hello(String greeting) {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                String string = super.hello(greeting);
                return string;
            }
            finally {
                this.releaseRunPermit();
            }
        }

        @Override
        public TestRecord echo(TestRecord record) {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                TestRecord testRecord = super.echo(record);
                return testRecord;
            }
            finally {
                this.releaseRunPermit();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int add(int arg1, int arg2) {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                int n = super.add(arg1, arg2);
                return n;
            }
            finally {
                this.releaseRunPermit();
            }
        }

        @Override
        public ByteBuffer echoBytes(ByteBuffer data) {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                ByteBuffer byteBuffer = super.echoBytes(data);
                return byteBuffer;
            }
            finally {
                this.releaseRunPermit();
            }
        }

        @Override
        public void error() throws TestError {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                super.error();
            }
            finally {
                this.releaseRunPermit();
            }
        }

        @Override
        public void ack() {
            this.releaseEnterPermit();
            this.acquireRunPermit();
            try {
                super.ack();
            }
            finally {
                this.releaseRunPermit();
            }
        }

        public void acquireRunPermit() {
            try {
                this.runSemaphore.acquire();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public void releaseRunPermit() {
            this.runSemaphore.release();
        }

        public void acquireEnterPermit() {
            try {
                this.enterSemaphore.acquire();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public void releaseEnterPermit() {
            this.enterSemaphore.release();
        }
    }

    private static class SimpleImpl
    implements Simple {
        private final AtomicBoolean ackFlag;

        public SimpleImpl(AtomicBoolean ackFlag) {
            this.ackFlag = ackFlag;
        }

        public String hello(String greeting) {
            return "Hello, " + greeting;
        }

        public TestRecord echo(TestRecord record) {
            return record;
        }

        public int add(int arg1, int arg2) {
            return arg1 + arg2;
        }

        public ByteBuffer echoBytes(ByteBuffer data) {
            return data;
        }

        public void error() throws TestError {
            throw TestError.newBuilder().setMessage$("Test Message").build();
        }

        public synchronized void ack() {
            this.ackFlag.set(!this.ackFlag.get());
            ackLatch.get().countDown();
        }
    }
}

