/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.jetty.websocket.common.message;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.common.message.EmptySession;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={WorkDirExtension.class})
public class MessageInputStreamTest {
    public WorkDir testdir;
    public ByteBufferPool bufferPool = new MappedByteBufferPool();

    /**
     * Queues one short text payload as the final frame, pushes it into the
     * stream, and checks that a single bulk read yields the expected text.
     *
     * @throws IOException if reading from the stream fails
     */
    @Test
    public void testBasicAppendRead() throws IOException {
        StreamTestSession testSession = new StreamTestSession();
        MessageInputStream input = new MessageInputStream(testSession);
        testSession.setMessageInputStream(input);

        // Queue the complete message as one final frame, then deliver it to the stream.
        testSession.addContent(BufferUtil.toBuffer("Hello World!", StandardCharsets.UTF_8), true);
        testSession.provideContent();

        // Read into a buffer that is larger than the message itself.
        byte[] readBuffer = new byte[32];
        int bytesRead = input.read(readBuffer);
        String received = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8);

        MatcherAssert.assertThat("Message", received, Matchers.is("Hello World!"));
    }

    /**
     * Queues a message as several fragments, 200 ms apart, from a background
     * thread while another thread delivers queued frames to the stream; the
     * stream is drained under a 5 second preemptive timeout and the
     * reassembled text is verified.
     *
     * @throws Exception if the timed section fails unexpectedly
     */
    @Test
    public void testBlockOnRead() throws Exception {
        StreamTestSession testSession = new StreamTestSession();
        MessageInputStream input = new MessageInputStream(testSession);
        testSession.setMessageInputStream(input);

        // Deliver queued frames to the stream on a dedicated thread.
        Thread provider = new Thread(testSession::provideContent);
        provider.start();

        AtomicBoolean appendFailed = new AtomicBoolean(false);
        CountDownLatch appenderStarted = new CountDownLatch(1);

        // Queue the fragments with pauses in between; the empty final fragment ends the message.
        Runnable appender = () -> {
            try {
                appenderStarted.countDown();
                TimeUnit.MILLISECONDS.sleep(200L);
                testSession.addContent("Saved", false);
                TimeUnit.MILLISECONDS.sleep(200L);
                testSession.addContent(" by ", false);
                TimeUnit.MILLISECONDS.sleep(200L);
                testSession.addContent("Zero", false);
                TimeUnit.MILLISECONDS.sleep(200L);
                testSession.addContent("", true);
            }
            catch (Throwable failure) {
                appendFailed.set(true);
                failure.printStackTrace(System.err);
            }
        };
        new Thread(appender).start();

        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(5L), () -> {
            // Do not start draining before the appender thread is running.
            appenderStarted.await();

            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            IO.copy(input, sink);
            String received = new String(sink.toByteArray(), StandardCharsets.UTF_8);

            MatcherAssert.assertThat("Error when appending", appendFailed.get(), Matchers.is(false));
            MatcherAssert.assertThat("Message", received, Matchers.is("Saved by Zero"));
        });
    }

    /**
     * Starts reading before any frame has been delivered to the stream: the
     * message is queued up front, but a background thread only calls
     * {@code provideContent()} after a one second delay. The first byte read
     * (under a 10 second preemptive timeout) must be {@code 'I'}.
     *
     * @throws IOException declared for the stream operations used by the test
     */
    @Test
    public void testBlockOnReadInitial() throws IOException {
        StreamTestSession session = new StreamTestSession();
        MessageInputStream stream = new MessageInputStream(session);
        session.setMessageInputStream(stream);
        session.addContent("I will conquer", true);

        // Typed holder for any failure raised on the provider thread.
        AtomicReference<Throwable> error = new AtomicReference<>();
        new Thread(() -> {
            try {
                // Delay delivery so the read below is issued before any content is provided.
                TimeUnit.MILLISECONDS.sleep(1000L);
                session.provideContent();
            }
            catch (Throwable t) {
                error.set(t);
                t.printStackTrace(System.err);
            }
        }).start();

        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            int b = stream.read();
            MatcherAssert.assertThat("Initial byte", b, Matchers.is((int)'I'));
            Assertions.assertNull(error.get(), "Error when providing content");
        });
    }

    /**
     * Reads a single byte from a stream that never receives a payload: a
     * background thread waits one second, appends a {@code null} final frame
     * and completes the message. The read (under a 10 second preemptive
     * timeout) is expected to return {@code -1}.
     *
     * @throws IOException if closing the stream fails
     */
    @Test
    public void testReadByteNoBuffersClosed() throws IOException {
        try (MessageInputStream input = new MessageInputStream(new EmptySession())) {
            AtomicBoolean appendFailed = new AtomicBoolean(false);

            // Finish the message, without any content, after a delay.
            Runnable finisher = () -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000L);
                    input.appendFrame(null, true);
                    input.messageComplete();
                }
                catch (IOException | InterruptedException failure) {
                    appendFailed.set(true);
                    failure.printStackTrace(System.err);
                }
            };
            new Thread(finisher).start();

            Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
                int firstByte = input.read();
                MatcherAssert.assertThat("Initial byte", firstByte, Matchers.is(-1));
                MatcherAssert.assertThat("Error when appending", appendFailed.get(), Matchers.is(false));
            });
        }
    }

    /**
     * Queues a message split across several fragments, some of them empty,
     * delivers them to the stream and verifies that draining the stream
     * yields the concatenated text.
     *
     * @throws IOException if copying from the stream fails
     */
    @Test
    public void testSplitMessageWithEmptyPayloads() throws IOException {
        StreamTestSession testSession = new StreamTestSession();
        MessageInputStream input = new MessageInputStream(testSession);
        testSession.setMessageInputStream(input);

        // Non-final fragments, in order; empty ones are interleaved on purpose.
        String[] fragments = {"", "Hello", "", " World", "!"};
        for (String fragment : fragments) {
            testSession.addContent(fragment, false);
        }
        // An empty final fragment terminates the message.
        testSession.addContent("", true);
        testSession.provideContent();

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        IO.copy(input, sink);
        String received = new String(sink.toByteArray(), StandardCharsets.UTF_8);

        MatcherAssert.assertThat("Message", received, Matchers.is("Hello World!"));
    }

    /**
     * Issues a bulk read before any frame has been delivered to the stream:
     * an empty non-final frame and a final text frame are queued, and a
     * background thread calls {@code provideContent()} only after two
     * seconds. The read must return the text of the second frame.
     *
     * <p>The read runs under a 10 second preemptive timeout, like the other
     * delayed-delivery tests in this class, so that a failure on the provider
     * thread fails the test instead of leaving the read waiting indefinitely.
     *
     * @throws IOException declared for the stream operations used by the test
     */
    @Test
    public void testReadBeforeFirstAppend() throws IOException {
        StreamTestSession session = new StreamTestSession();
        MessageInputStream stream = new MessageInputStream(session);
        session.setMessageInputStream(stream);
        session.addContent(BufferUtil.EMPTY_BUFFER, false);
        session.addContent("Hello World", true);

        // Record provider-thread failures so the test thread can report them;
        // an exception thrown on the background thread would otherwise go unnoticed.
        AtomicReference<Throwable> error = new AtomicReference<>();
        new Thread(() -> {
            try {
                // Delay delivery so the read below is issued first.
                Thread.sleep(2000L);
                session.provideContent();
            }
            catch (Throwable t) {
                error.set(t);
                t.printStackTrace(System.err);
            }
        }).start();

        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            byte[] buf = new byte[32];
            int len = stream.read(buf);
            String message = new String(buf, 0, len, StandardCharsets.UTF_8);
            MatcherAssert.assertThat("Message", message, Matchers.is("Hello World"));
            Assertions.assertNull(error.get(), "Error when providing content");
        });
    }

    /**
     * Test session that keeps frame payloads in a queue and feeds them to a
     * {@link MessageInputStream} on request, stopping delivery while the
     * session is marked as suspended.
     */
    public static class StreamTestSession
    extends EmptySession {
        /** Sentinel queued after the last payload; it is recognized by identity, not by content. */
        private static final ByteBuffer EOF = BufferUtil.allocate(0);
        /** {@code true} between a call to {@link #suspend()} and the matching {@link #resume()}. */
        private final AtomicBoolean suspended = new AtomicBoolean(false);
        /** Payloads waiting to be appended to the stream, in delivery order. */
        private final BlockingArrayQueue<ByteBuffer> contentQueue = new BlockingArrayQueue<>();
        /** Stream that receives the queued frames; must be set before content is provided. */
        private MessageInputStream stream;

        /**
         * Sets the stream that queued frames are appended to.
         *
         * @param stream the target stream
         */
        public void setMessageInputStream(MessageInputStream stream) {
            this.stream = stream;
        }

        /**
         * Queues a text payload, encoded as UTF-8.
         *
         * @param content the text to queue
         * @param last whether this is the final payload of the message
         */
        public void addContent(String content, boolean last) {
            this.addContent(BufferUtil.toBuffer(content, StandardCharsets.UTF_8), last);
        }

        /**
         * Queues a payload; when {@code last} is set, the {@link #EOF} sentinel
         * is queued right after it.
         *
         * @param content the payload to queue
         * @param last whether this is the final payload of the message
         */
        public void addContent(ByteBuffer content, boolean last) {
            this.contentQueue.add(content);
            if (last) {
                this.contentQueue.add(EOF);
            }
        }

        /**
         * Starts delivering queued payloads to the stream.
         */
        public void provideContent() {
            this.pollAndAppendFrame();
        }

        /**
         * Clears the suspended flag and continues delivering queued payloads.
         *
         * @throws IllegalStateException if the session is not currently suspended
         */
        @Override
        public void resume() {
            if (!this.suspended.compareAndSet(true, false)) {
                throw new IllegalStateException();
            }
            this.pollAndAppendFrame();
        }

        /**
         * Marks the session as suspended and delegates to the superclass.
         *
         * @return the token returned by the superclass
         * @throws IllegalStateException if the session is already suspended
         */
        @Override
        public SuspendToken suspend() {
            if (!this.suspended.compareAndSet(false, true)) {
                throw new IllegalStateException();
            }
            return super.suspend();
        }

        /**
         * Takes payloads from the queue and appends them to the stream until
         * either the {@link #EOF} sentinel has been delivered (the message is
         * then completed) or the session has been suspended. Each poll waits
         * at most 10 seconds; a missing payload fails the non-null assertion.
         */
        private void pollAndAppendFrame() {
            try {
                while (true) {
                    ByteBuffer content = this.contentQueue.poll(10L, TimeUnit.SECONDS);
                    Assertions.assertNotNull(content);

                    // Identity comparison: only the sentinel instance marks the end of the message.
                    boolean eof = content == EOF;
                    this.stream.appendFrame(content, eof);
                    if (eof) {
                        this.stream.messageComplete();
                        return;
                    }
                    // Stop once suspended; resume() restarts delivery.
                    if (this.suspended.get()) {
                        return;
                    }
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

