/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.metrics2.impl;

import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.metrics2.impl.SinkQueue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSinkQueue {
    private static final Logger LOG = LoggerFactory.getLogger(TestSinkQueue.class);

    @Test
    public void testCommon() throws Exception {
        SinkQueue q = new SinkQueue(2);
        q.enqueue((Object)1);
        Assertions.assertEquals((int)1, (int)((Integer)q.front()), (String)"queue front");
        Assertions.assertEquals((int)1, (int)((Integer)q.back()), (String)"queue back");
        Assertions.assertEquals((int)1, (int)((Integer)q.dequeue()), (String)"element");
        Assertions.assertTrue((boolean)q.enqueue((Object)2), (String)"should enqueue");
        q.consume((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){

            public void consume(Integer e) {
                Assertions.assertEquals((int)2, (int)e, (String)"element");
            }
        });
        Assertions.assertTrue((boolean)q.enqueue((Object)3), (String)"should enqueue");
        Assertions.assertEquals((int)3, (int)((Integer)q.dequeue()), (String)"element");
        Assertions.assertEquals((int)0, (int)q.size(), (String)"queue size");
        Assertions.assertEquals(null, (Integer)((Integer)q.front()), (String)"queue front");
        Assertions.assertEquals(null, (Integer)((Integer)q.back()), (String)"queue back");
    }

    @Test
    public void testEmptyBlocking() throws Exception {
        this.testEmptyBlocking(0);
        this.testEmptyBlocking(100);
    }

    private void testEmptyBlocking(int awhile) throws Exception {
        final SinkQueue q = new SinkQueue(2);
        final Runnable trigger = (Runnable)Mockito.mock(Runnable.class);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    Assertions.assertEquals((int)1, (int)((Integer)q.dequeue()), (String)"element");
                    q.consume((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){

                        public void consume(Integer e) {
                            Assertions.assertEquals((int)2, (int)e, (String)"element");
                            trigger.run();
                        }
                    });
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted", (Throwable)e);
                }
            }
        };
        t.start();
        if (awhile > 0) {
            Thread.sleep(awhile);
        }
        q.enqueue((Object)1);
        q.enqueue((Object)2);
        t.join();
        ((Runnable)Mockito.verify((Object)trigger)).run();
    }

    @Test
    public void testFull() throws Exception {
        SinkQueue q = new SinkQueue(1);
        q.enqueue((Object)1);
        Assertions.assertTrue((!q.enqueue((Object)2) ? 1 : 0) != 0, (String)"should drop");
        Assertions.assertEquals((int)1, (int)((Integer)q.dequeue()), (String)"element");
        q.enqueue((Object)3);
        q.consume((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){

            public void consume(Integer e) {
                Assertions.assertEquals((int)3, (int)e, (String)"element");
            }
        });
        Assertions.assertEquals((int)0, (int)q.size(), (String)"queue size");
    }

    @Test
    public void testConsumeAll() throws Exception {
        int capacity = 64;
        SinkQueue q = new SinkQueue(64);
        for (int i = 0; i < 64; ++i) {
            Assertions.assertTrue((boolean)q.enqueue((Object)i), (String)"should enqueue");
        }
        Assertions.assertTrue((!q.enqueue((Object)64) ? 1 : 0) != 0, (String)"should not enqueue");
        final Runnable trigger = (Runnable)Mockito.mock(Runnable.class);
        q.consumeAll((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){
            private int expected = 0;

            public void consume(Integer e) {
                Assertions.assertEquals((int)this.expected++, (int)e, (String)"element");
                trigger.run();
            }
        });
        ((Runnable)Mockito.verify((Object)trigger, (VerificationMode)Mockito.times((int)64))).run();
    }

    @Test
    public void testConsumerException() throws Exception {
        SinkQueue q = new SinkQueue(1);
        final RuntimeException ex = new RuntimeException("expected");
        q.enqueue((Object)1);
        try {
            q.consume((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){

                public void consume(Integer e) {
                    throw ex;
                }
            });
        }
        catch (Exception expected) {
            Assertions.assertSame((Object)ex, (Object)expected, (String)"consumer exception");
        }
        Assertions.assertEquals((int)1, (int)q.size(), (String)"queue size");
        Assertions.assertEquals((int)1, (int)((Integer)q.dequeue()), (String)"element");
    }

    @Test
    public void testClear() {
        SinkQueue q = new SinkQueue(128);
        for (int i = 0; i < q.capacity() + 97; ++i) {
            q.enqueue((Object)i);
        }
        Assertions.assertEquals((int)q.capacity(), (int)q.size(), (String)"queue size");
        q.clear();
        Assertions.assertEquals((int)0, (int)q.size(), (String)"queue size");
    }

    @Test
    public void testHangingConsumer() throws Exception {
        SinkQueue<Integer> q = this.newSleepingConsumerQueue(2, 1, 2);
        Assertions.assertEquals((int)2, (int)((Integer)q.back()), (String)"queue back");
        Assertions.assertTrue((!q.enqueue((Object)3) ? 1 : 0) != 0, (String)"should drop");
        Assertions.assertEquals((int)2, (int)q.size(), (String)"queue size");
        Assertions.assertEquals((int)1, (int)((Integer)q.front()), (String)"queue head");
        Assertions.assertEquals((int)2, (int)((Integer)q.back()), (String)"queue back");
    }

    @Test
    public void testConcurrentConsumers() throws Exception {
        final SinkQueue<Integer> q = this.newSleepingConsumerQueue(2, 1);
        Assertions.assertTrue((boolean)q.enqueue((Object)2), (String)"should enqueue");
        Assertions.assertEquals((int)2, (int)((Integer)q.back()), (String)"queue back");
        Assertions.assertTrue((!q.enqueue((Object)3) ? 1 : 0) != 0, (String)"should drop");
        this.shouldThrowCME(new Fun(){

            @Override
            public void run() {
                q.clear();
            }
        });
        this.shouldThrowCME(new Fun(){

            @Override
            public void run() throws Exception {
                q.consume(null);
            }
        });
        this.shouldThrowCME(new Fun(){

            @Override
            public void run() throws Exception {
                q.consumeAll(null);
            }
        });
        this.shouldThrowCME(new Fun(){

            @Override
            public void run() throws Exception {
                q.dequeue();
            }
        });
        Assertions.assertEquals((int)2, (int)q.size(), (String)"queue size");
        Assertions.assertEquals((int)1, (int)((Integer)q.front()), (String)"queue front");
        Assertions.assertEquals((int)2, (int)((Integer)q.back()), (String)"queue back");
    }

    private void shouldThrowCME(Fun callback) throws Exception {
        try {
            callback.run();
        }
        catch (ConcurrentModificationException e) {
            LOG.info(e.toString());
            return;
        }
        LOG.error("should've thrown CME");
        Assertions.fail((String)"should've thrown CME");
    }

    private SinkQueue<Integer> newSleepingConsumerQueue(int capacity, int ... values) throws Exception {
        final SinkQueue q = new SinkQueue(capacity);
        for (int i : values) {
            q.enqueue((Object)i);
        }
        final CountDownLatch barrier = new CountDownLatch(1);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    Thread.sleep(10L);
                    q.consume((SinkQueue.Consumer)new SinkQueue.Consumer<Integer>(){

                        public void consume(Integer e) throws InterruptedException {
                            LOG.info("sleeping");
                            barrier.countDown();
                            Thread.sleep(86400000L);
                        }
                    });
                }
                catch (InterruptedException ex) {
                    LOG.warn("Interrupted", (Throwable)ex);
                }
            }
        };
        t.setName("Sleeping consumer");
        t.setDaemon(true);
        t.start();
        barrier.await();
        LOG.debug("Returning new sleeping consumer queue");
        return q;
    }

    static interface Fun {
        public void run() throws Exception;
    }
}

