/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.org.apache.druid.java.util.emitter.core;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.primitives.Ints;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.BaseHttpEmittingConfig;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.Batch;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.BatchingStrategy;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.EmitterTest;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.Event;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.GoHandler;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.GoHandlers;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.HttpEmitterConfig;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.HttpPostEmitter;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.IntEvent;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.core.MockHttpClient;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.junit.Assert;
import org.junit.Test;

/**
 * Stress tests for {@link HttpPostEmitter}, driven through a {@link MockHttpClient}
 * so that no real HTTP traffic is produced.
 *
 * <p>Worker threads never assert or fail silently: any throwable raised on a worker is
 * recorded and rethrown on the JUnit thread, and the completion latch is always released,
 * so a failing worker can neither hang the test nor be ignored.
 */
public class HttpPostEmitterStressTest {
    /** Number of distinct events emitted by {@link #eventCountBased()}. */
    private static final int N = 10000;
    /** Length of the dimension value used to build the "big" events. */
    private static final int BIG_STRING_LENGTH = 600000;
    /** Number of emit calls performed by each emitting loop of the queue-limit tests. */
    private static final int QUEUE_LIMIT_EMITS = 1000;

    /** Mapper that serializes an {@link IntEvent} as the 4 bytes of its {@code index}. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() {

        @Override
        public byte[] writeValueAsBytes(Object value) {
            return Ints.toByteArray((int) ((IntEvent) value).index);
        }
    };

    private final MockHttpClient httpClient = new MockHttpClient();

    /**
     * Emits {@link #N} distinct int events from {@code 2 * availableProcessors} threads and
     * verifies that every event index was seen by the HTTP handler after a final flush.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for workers
     * @throws IOException if flushing the emitter fails
     */
    @Test
    public void eventCountBased() throws InterruptedException, IOException {
        HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
                .setFlushMillis(100L)
                .setFlushCount(4)
                .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
                .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
                .setMaxBatchSize(0x100000)
                .setBatchQueueSizeLimit(1000)
                .build();
        final HttpPostEmitter emitter = new HttpPostEmitter(config, this.httpClient, OBJECT_MAPPER);

        final int nThreads = Runtime.getRuntime().availableProcessors() * 2;
        final List<IntList> eventsPerThread = new ArrayList<IntList>(nThreads);
        final List<List<Batch>> eventBatchesPerThread = new ArrayList<List<Batch>>(nThreads);
        for (int i = 0; i < nThreads; ++i) {
            eventsPerThread.add(new IntArrayList());
            eventBatchesPerThread.add(new ArrayList<Batch>());
        }
        // Spread the event indexes randomly over the worker threads.
        for (int i = 0; i < N; ++i) {
            eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
        }

        // NOTE(review): this BitSet is written by whichever thread invokes the handler and
        // read by the test thread after flush(); BitSet itself is unsynchronized — confirm
        // that flush() provides the required ordering.
        final BitSet emittedEvents = new BitSet(N);
        this.httpClient.setGoHandler(new GoHandler() {

            @Override
            protected ListenableFuture<Response> go(Request request) {
                // Each event was serialized as one int; mark every int found in the request body.
                ByteBuffer batch = request.getByteBufferData().slice();
                while (batch.remaining() > 0) {
                    emittedEvents.set(batch.getInt());
                }
                return GoHandlers.immediateFuture(EmitterTest.okResponse());
            }
        });
        emitter.start();

        final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
        final AtomicReference<Throwable> workerFailure = new AtomicReference<Throwable>();
        for (int i = 0; i < nThreads; ++i) {
            // A fresh final copy per iteration so the anonymous class may capture it.
            final int threadIndex = i;
            new Thread() {

                @Override
                public void run() {
                    try {
                        IntList events = eventsPerThread.get(threadIndex);
                        List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
                        IntEvent event = new IntEvent();
                        int eventsSize = events.size();
                        for (int j = 0; j < eventsSize; ++j) {
                            event.index = events.getInt(j);
                            eventBatches.add(emitter.emitAndReturnBatch(event));
                            // Pause on every 16th event of this thread.
                            if (j % 16 == 0) {
                                Thread.sleep(10L);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        workerFailure.compareAndSet(null, e);
                    } catch (Throwable t) {
                        workerFailure.compareAndSet(null, t);
                    } finally {
                        // Always release the latch so the test thread cannot block forever.
                        threadsCompleted.countDown();
                    }
                }
            }.start();
        }
        threadsCompleted.await();
        failIfWorkerFailed(workerFailure.get());

        emitter.flush();
        System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
        for (int eventIndex = 0; eventIndex < N; ++eventIndex) {
            if (emittedEvents.get(eventIndex)) {
                continue;
            }
            dumpBatchOfEvent(eventIndex, eventsPerThread, eventBatchesPerThread);
            throw new AssertionError("event " + eventIndex);
        }
    }

    /**
     * Emits a big event {@link #QUEUE_LIMIT_EMITS} times while the HTTP handler answers every
     * request with {@link EmitterTest#BAD_RESPONSE}, asserting after each emit that
     * {@code getLargeEventsToEmit()} does not exceed 11.
     *
     * @throws IOException if flushing the emitter fails
     */
    @Test
    public void testLargeEventsQueueLimit() throws IOException {
        HttpPostEmitter emitter = new HttpPostEmitter(queueLimitConfig(), this.httpClient, new ObjectMapper());
        emitter.start();
        this.httpClient.setGoHandler(badResponseHandler());

        Event bigEvent = newMetricEvent("bigEvents", bigString());
        for (int i = 0; i < QUEUE_LIMIT_EMITS; ++i) {
            emitter.emit(bigEvent);
            Assert.assertTrue("too many large events queued", emitter.getLargeEventsToEmit() <= 11L);
        }
        emitter.flush();
    }

    /**
     * Emits small and big events concurrently from two threads while the HTTP handler answers
     * every request with {@link EmitterTest#BAD_RESPONSE}, checking after each emit that
     * {@code getTotalFailedBuffers()} is at most 10 and {@code getBuffersToEmit()} at most 12.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for workers
     * @throws IOException if flushing the emitter fails
     */
    @Test
    public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOException {
        final HttpPostEmitter emitter = new HttpPostEmitter(queueLimitConfig(), this.httpClient, new ObjectMapper());
        emitter.start();
        this.httpClient.setGoHandler(badResponseHandler());

        Event smallEvent = newMetricEvent("smallEvents", "hi");
        Event bigEvent = newMetricEvent("bigEvents", bigString());

        CountDownLatch threadsCompleted = new CountDownLatch(2);
        AtomicReference<Throwable> workerFailure = new AtomicReference<Throwable>();
        startQueueLimitWorker(emitter, smallEvent, threadsCompleted, workerFailure);
        startQueueLimitWorker(emitter, bigEvent, threadsCompleted, workerFailure);
        threadsCompleted.await();
        failIfWorkerFailed(workerFailure.get());

        emitter.flush();
    }

    /** Builds the emitter configuration shared by the two queue-limit tests. */
    private static HttpEmitterConfig queueLimitConfig() {
        return new HttpEmitterConfig.Builder("http://foo.bar")
                .setFlushMillis(100L)
                .setFlushCount(4)
                .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
                .setMaxBatchSize(0x100000)
                .setBatchQueueSizeLimit(10)
                .build();
    }

    /** Returns a handler that answers every request with {@link EmitterTest#BAD_RESPONSE}. */
    private static GoHandler badResponseHandler() {
        return new GoHandler() {

            @Override
            protected <X extends Exception> ListenableFuture<Response> go(Request request) throws X {
                return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
            }
        };
    }

    /** Returns a string of {@link #BIG_STRING_LENGTH} {@code '*'} characters. */
    private static String bigString() {
        char[] chars = new char[BIG_STRING_LENGTH];
        Arrays.fill(chars, '*');
        return new String(chars);
    }

    /**
     * Builds a metric event on the given feed whose {@code "test"} dimension carries the given value.
     *
     * @param feed feed name of the event
     * @param dimensionValue value stored under the {@code "test"} dimension
     * @return the built event
     */
    private static Event newMetricEvent(String feed, String dimensionValue) {
        // The explicit casts are kept on purpose: they pin the same overloads the original calls used.
        return ServiceMetricEvent.builder()
                .setFeed(feed)
                .setDimension("test", (Object) dimensionValue)
                .build("metric", (Number) 10)
                .build("qwerty", "asdfgh");
    }

    /**
     * Starts a thread that emits {@code event} {@link #QUEUE_LIMIT_EMITS} times and checks the
     * emitter's buffer counters after each emit.
     *
     * @param emitter emitter under test
     * @param event event to emit repeatedly
     * @param done latch counted down exactly once when the worker ends, successfully or not
     * @param failure receives the first throwable raised by any worker
     */
    private static void startQueueLimitWorker(
            final HttpPostEmitter emitter,
            final Event event,
            final CountDownLatch done,
            final AtomicReference<Throwable> failure) {
        new Thread() {

            @Override
            public void run() {
                try {
                    for (int i = 0; i < QUEUE_LIMIT_EMITS; ++i) {
                        emitter.emit(event);
                        Assert.assertTrue("too many failed buffers", emitter.getTotalFailedBuffers() <= 10);
                        Assert.assertTrue("too many buffers to emit", emitter.getBuffersToEmit() <= 12);
                    }
                } catch (Throwable t) {
                    // Assertion errors raised here would otherwise die with this thread.
                    failure.compareAndSet(null, t);
                } finally {
                    done.countDown();
                }
            }
        }.start();
    }

    /**
     * Rethrows, on the calling thread, a throwable captured on a worker thread.
     *
     * @param failure throwable recorded by a worker, or {@code null} when all workers succeeded
     */
    private static void failIfWorkerFailed(Throwable failure) {
        if (failure == null) {
            return;
        }
        if (failure instanceof Error) {
            // Keeps an AssertionError (and its message) intact for the JUnit report.
            throw (Error) failure;
        }
        throw new AssertionError("worker thread failed: " + failure, failure);
    }

    /**
     * Prints to {@code System.err} the batch returned when the given event was emitted, followed
     * by the ints stored in that batch's buffer up to its sealed watermark. Diagnostic aid only.
     *
     * @param eventIndex index of the event that was not observed by the HTTP handler
     * @param eventsPerThread event indexes assigned to each worker thread
     * @param eventBatchesPerThread batches returned to each worker, parallel to {@code eventsPerThread}
     */
    private static void dumpBatchOfEvent(
            int eventIndex,
            List<IntList> eventsPerThread,
            List<List<Batch>> eventBatchesPerThread) {
        for (int threadIndex = 0; threadIndex < eventsPerThread.size(); ++threadIndex) {
            int indexOfEvent = eventsPerThread.get(threadIndex).indexOf(eventIndex);
            if (indexOfEvent < 0) {
                continue;
            }
            Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
            System.err.println(batch);
            // Guard so that a missing batch cannot replace the caller's assertion with an NPE.
            if (batch != null) {
                ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
                batchBuffer.limit(batch.getSealedBufferWatermark());
                while (batchBuffer.remaining() > 0) {
                    System.err.println(batchBuffer.getInt());
                }
            }
            return;
        }
    }
}

