/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc.netty;

import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.test.Mail;
import org.apache.avro.test.Message;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestNettyServer {
    private static final Logger LOG = LoggerFactory.getLogger((String)TestNettyServer.class.getName());
    static final int CONNECT_TIMEOUT_MILLIS = 2000;
    protected static Server server;
    protected static Transceiver transceiver;
    protected static Mail proxy;
    protected static MailImpl mailService;
    protected static Consumer<SocketChannel> channelInitializer;

    public static void initializeConnections(Consumer<SocketChannel> initializer) throws Exception {
        TestNettyServer.initializeConnections(initializer, initializer);
    }

    public static void initializeConnections(Consumer<SocketChannel> serverInitializer, Consumer<SocketChannel> transceiverInitializer) throws Exception {
        LOG.info("starting server...");
        channelInitializer = transceiverInitializer;
        mailService = new MailImpl();
        SpecificResponder responder = new SpecificResponder(Mail.class, (Object)mailService);
        server = new NettyServer((Responder)responder, new InetSocketAddress(0), serverInitializer);
        server.start();
        int serverPort = server.getPort();
        LOG.info("server port : {}", (Object)serverPort);
        transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), Integer.valueOf(2000), transceiverInitializer, null);
        proxy = (Mail)SpecificRequestor.getClient(Mail.class, (Transceiver)transceiver);
    }

    @BeforeAll
    public static void initializeConnections() throws Exception {
        TestNettyServer.initializeConnections(null);
    }

    @AfterAll
    public static void tearDownConnections() throws Exception {
        transceiver.close();
        server.close();
    }

    @Test
    void requestResponse() throws Exception {
        for (int x = 0; x < 5; ++x) {
            this.verifyResponse(proxy.send(this.createMessage()));
        }
    }

    private void verifyResponse(String result) {
        Assertions.assertEquals((Object)"Sent message to [wife] from [husband] with body [I love you!]", (Object)result);
    }

    @Test
    void oneway() throws Exception {
        for (int x = 0; x < 5; ++x) {
            proxy.fireandforget(this.createMessage());
        }
        mailService.awaitMessages();
        mailService.assertAllMessagesReceived();
    }

    @Test
    void mixtureOfRequests() throws Exception {
        mailService.reset();
        for (int x = 0; x < 5; ++x) {
            Message createMessage = this.createMessage();
            proxy.fireandforget(createMessage);
            this.verifyResponse(proxy.send(createMessage));
        }
        mailService.awaitMessages();
        mailService.assertAllMessagesReceived();
    }

    @Test
    void connectionsCount() throws Exception {
        TestNettyServer.assertNumberOfConnectionsOnServer(1, 1000L);
        NettyTransceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(server.getPort()), Integer.valueOf(2000), channelInitializer);
        Mail proxy2 = (Mail)SpecificRequestor.getClient(Mail.class, (Transceiver)transceiver2);
        proxy.fireandforget(this.createMessage());
        proxy2.fireandforget(this.createMessage());
        TestNettyServer.assertNumberOfConnectionsOnServer(2, 0L);
        transceiver2.close();
        TestNettyServer.assertNumberOfConnectionsOnServer(1, 5000L);
    }

    private static void assertNumberOfConnectionsOnServer(int wantedNumberOfConnections, long maxWaitMs) throws InterruptedException {
        int numActiveConnections = ((NettyServer)server).getNumActiveConnections();
        if (numActiveConnections == wantedNumberOfConnections) {
            return;
        }
        long startMs = System.currentTimeMillis();
        long waited = 0L;
        if (maxWaitMs > 0L) {
            boolean timeOut = false;
            while (numActiveConnections != wantedNumberOfConnections && !timeOut) {
                LOG.info("Server still has {} active connections (want {}, waiting for {}ms); retrying...", new Object[]{numActiveConnections, wantedNumberOfConnections, waited});
                Thread.sleep(100L);
                numActiveConnections = ((NettyServer)server).getNumActiveConnections();
                waited = System.currentTimeMillis() - startMs;
                timeOut = waited > maxWaitMs;
            }
            LOG.info("Server has {} active connections", (Object)numActiveConnections);
        }
        Assertions.assertEquals((int)wantedNumberOfConnections, (int)numActiveConnections, (String)("Not the expected number of connections after a wait of " + waited + " ms"));
    }

    private Message createMessage() {
        Message msg = Message.newBuilder().setTo("wife").setFrom("husband").setBody("I love you!").build();
        return msg;
    }

    @Test
    void badRequest() throws IOException {
        int port = server.getPort();
        String msg = "GET /status HTTP/1.1\n\n";
        InetSocketAddress sockAddr = new InetSocketAddress("127.0.0.1", port);
        try (Socket sock = new Socket();){
            sock.connect(sockAddr);
            OutputStream out = sock.getOutputStream();
            out.write(msg.getBytes(StandardCharsets.UTF_8));
            out.flush();
            byte[] buf = new byte[2048];
            int bytesRead = sock.getInputStream().read(buf);
            Assertions.assertEquals((int)bytesRead, (int)-1);
        }
    }

    public static class MailImpl
    implements Mail {
        private CountDownLatch allMessages = new CountDownLatch(5);

        public String send(Message message) {
            return "Sent message to [" + message.getTo() + "] from [" + message.getFrom() + "] with body [" + message.getBody() + "]";
        }

        public void fireandforget(Message message) {
            this.allMessages.countDown();
        }

        private void awaitMessages() throws InterruptedException {
            this.allMessages.await(2L, TimeUnit.SECONDS);
        }

        private void assertAllMessagesReceived() {
            Assertions.assertEquals((long)0L, (long)this.allMessages.getCount());
        }

        public void reset() {
            this.allMessages = new CountDownLatch(5);
        }
    }
}

