/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.org.apache.druid.java.util.common.guava;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import org.apache.hive.druid.com.google.common.collect.Ordering;
import org.apache.hive.druid.org.apache.druid.common.guava.CombiningSequence;
import org.apache.hive.druid.org.apache.druid.java.util.common.Pair;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.Sequence;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.Sequences;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.Yielder;
import org.apache.hive.druid.org.apache.druid.java.util.common.guava.Yielders;
import org.apache.hive.druid.org.apache.druid.java.util.common.logger.Logger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ParallelMergeCombiningSequenceTest {
    private static final int TEST_POOL_SIZE = 4;
    private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class);
    public static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> (Integer)p.lhs);
    public static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (lhs, rhs) -> {
        if (lhs == null) {
            return rhs;
        }
        if (rhs == null) {
            return lhs;
        }
        return new IntPair((Integer)lhs.lhs, (Integer)lhs.rhs + (Integer)rhs.rhs);
    };
    private ForkJoinPool pool;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Before
    public void setup() {
        this.pool = new ForkJoinPool(4, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", new Object[]{t}), true);
    }

    @After
    public void teardown() {
        this.pool.shutdown();
    }

    @Test
    public void testOrderedResultBatchFromSequence() throws IOException {
        Sequence<IntPair> rawSequence = ParallelMergeCombiningSequenceTest.nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(rawSequence, 128), INT_PAIR_ORDERING);
        cursor.initialize();
        Yielder rawYielder = Yielders.each(rawSequence);
        IntPair prev = null;
        while (!rawYielder.isDone() && !cursor.isDone()) {
            Assert.assertEquals((Object)rawYielder.get(), (Object)cursor.get());
            Assert.assertNotEquals((Object)cursor.get(), prev);
            prev = (IntPair)((Object)cursor.get());
            rawYielder = rawYielder.next(rawYielder.get());
            cursor.advance();
        }
        cursor.close();
        rawYielder.close();
    }

    @Test
    public void testOrderedResultBatchFromSequenceBackToYielderOnSequence() throws IOException {
        int batchSize = 128;
        int sequenceSize = 5000;
        Sequence<IntPair> rawSequence = ParallelMergeCombiningSequenceTest.nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(rawSequence, 128), INT_PAIR_ORDERING);
        cursor.initialize();
        Yielder rawYielder = Yielders.each(rawSequence);
        ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch> outputQueue = new ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch>((int)Math.ceil(41.0625));
        IntPair prev = null;
        ParallelMergeCombiningSequence.ResultBatch currentBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
        int batchCounter = 0;
        while (!rawYielder.isDone() && !cursor.isDone()) {
            Assert.assertEquals((Object)rawYielder.get(), (Object)cursor.get());
            Assert.assertNotEquals((Object)cursor.get(), prev);
            prev = (IntPair)((Object)cursor.get());
            currentBatch.add((Object)prev);
            if (++batchCounter >= 128) {
                outputQueue.offer(currentBatch);
                currentBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
                batchCounter = 0;
            }
            rawYielder = rawYielder.next(rawYielder.get());
            cursor.advance();
        }
        if (!currentBatch.isDrained()) {
            outputQueue.offer(currentBatch);
        }
        outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL);
        rawYielder.close();
        cursor.close();
        rawYielder = Yielders.each(rawSequence);
        Sequence queueAsSequence = ParallelMergeCombiningSequence.makeOutputSequenceForQueue(outputQueue, (boolean)true, (long)(System.nanoTime() + TimeUnit.NANOSECONDS.convert(10000L, TimeUnit.MILLISECONDS)), (ParallelMergeCombiningSequence.CancellationGizmo)new ParallelMergeCombiningSequence.CancellationGizmo());
        Yielder queueYielder = Yielders.each((Sequence)queueAsSequence);
        while (!rawYielder.isDone() && !queueYielder.isDone()) {
            Assert.assertEquals((Object)rawYielder.get(), (Object)queueYielder.get());
            Assert.assertNotEquals((Object)queueYielder.get(), (Object)((Object)prev));
            prev = (IntPair)((Object)queueYielder.get());
            rawYielder = rawYielder.next(rawYielder.get());
            queueYielder = queueYielder.next(queueYielder.get());
        }
        rawYielder.close();
        queueYielder.close();
    }

    @Test
    public void testOrderedResultBatchFromSequenceToBlockingQueueCursor() throws IOException {
        int batchSize = 128;
        int sequenceSize = 5000;
        Sequence<IntPair> rawSequence = ParallelMergeCombiningSequenceTest.nonBlockingSequence(5000);
        ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = new ParallelMergeCombiningSequence.YielderBatchedResultsCursor(new ParallelMergeCombiningSequence.SequenceBatcher(rawSequence, 128), INT_PAIR_ORDERING);
        cursor.initialize();
        Yielder rawYielder = Yielders.each(rawSequence);
        ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch> outputQueue = new ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch>((int)Math.ceil(41.0625));
        IntPair prev = null;
        ParallelMergeCombiningSequence.ResultBatch currentBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
        int batchCounter = 0;
        while (!rawYielder.isDone() && !cursor.isDone()) {
            Assert.assertEquals((Object)rawYielder.get(), (Object)cursor.get());
            Assert.assertNotEquals((Object)cursor.get(), prev);
            prev = (IntPair)((Object)cursor.get());
            currentBatch.add((Object)prev);
            if (++batchCounter >= 128) {
                outputQueue.offer(currentBatch);
                currentBatch = new ParallelMergeCombiningSequence.ResultBatch(128);
                batchCounter = 0;
            }
            rawYielder = rawYielder.next(rawYielder.get());
            cursor.advance();
        }
        if (!currentBatch.isDrained()) {
            outputQueue.offer(currentBatch);
        }
        outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL);
        rawYielder.close();
        cursor.close();
        rawYielder = Yielders.each(rawSequence);
        ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor queueCursor = new ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor(outputQueue, INT_PAIR_ORDERING, false, -1L);
        queueCursor.initialize();
        prev = null;
        while (!rawYielder.isDone() && !queueCursor.isDone()) {
            Assert.assertEquals((Object)rawYielder.get(), (Object)queueCursor.get());
            Assert.assertNotEquals((Object)queueCursor.get(), (Object)((Object)prev));
            prev = (IntPair)((Object)queueCursor.get());
            rawYielder = rawYielder.next(rawYielder.get());
            queueCursor.advance();
        }
        rawYielder.close();
        queueCursor.close();
    }

    @Test
    public void testNone() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        this.assertResult(input);
    }

    @Test
    public void testEmpties() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(Sequences.empty());
        input.add(Sequences.empty());
        this.assertResult(input);
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        this.assertResult(input);
    }

    @Test
    public void testEmptiesAndNonEmpty() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(Sequences.empty());
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        this.assertResult(input);
        input.clear();
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add((Sequence<IntPair>)Sequences.empty());
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        this.assertResult(input);
    }

    @Test
    public void testAllInSingleBatch() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        this.assertResult(input, 10, 20, reportMetrics -> {
            Assert.assertEquals((long)1L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)2L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)11L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)6.0f, (float)reportMetrics.getOutputRows(), (float)5.0f);
            Assert.assertEquals((long)4L, (long)reportMetrics.getTaskCount());
        });
        input.clear();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(4));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        this.assertResult(input, 10, 20, reportMetrics -> {
            Assert.assertEquals((long)2L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)6L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)34L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)16.0f, (float)reportMetrics.getOutputRows(), (float)15.0f);
            Assert.assertEquals((float)10.0f, (float)reportMetrics.getTaskCount(), (float)2.0f);
        });
    }

    @Test
    public void testAllInSingleYield() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        this.assertResult(input, 4, 20, reportMetrics -> {
            Assert.assertEquals((long)1L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)2L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)11L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)6.0f, (float)reportMetrics.getOutputRows(), (float)5.0f);
            Assert.assertEquals((long)4L, (long)reportMetrics.getTaskCount());
        });
        input.clear();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(4));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(6));
        this.assertResult(input, 4, 20, reportMetrics -> {
            Assert.assertEquals((long)2L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)6L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)34L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)16.0f, (float)reportMetrics.getOutputRows(), (float)15.0f);
            Assert.assertEquals((float)10.0f, (float)reportMetrics.getTaskCount(), (float)2.0f);
        });
    }

    @Test
    public void testMultiBatchMultiYield() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(15));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(26));
        this.assertResult(input, 5, 10, reportMetrics -> {
            Assert.assertEquals((long)1L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)2L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)41L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)21.0f, (float)reportMetrics.getOutputRows(), (float)20.0f);
            Assert.assertEquals((float)4.0f, (float)reportMetrics.getTaskCount(), (float)2.0f);
        });
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(15));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(33));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(17));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(14));
        this.assertResult(input, 5, 10, reportMetrics -> {
            Assert.assertEquals((long)2L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)6L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)120L, (long)reportMetrics.getInputRows());
            Assert.assertEquals((float)60.0f, (float)reportMetrics.getOutputRows(), (float)59.0f);
            Assert.assertEquals((float)10.0f, (float)reportMetrics.getTaskCount(), (float)5.0f);
        });
    }

    @Test
    public void testMixedSingleAndMultiYield() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(60));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8));
        this.assertResult(input, 5, 10);
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(1));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(32));
        this.assertResult(input, 5, 10);
    }

    @Test
    public void testLongerSequencesJustForFun() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(10000));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(9001));
        this.assertResult(input, 128, 1024, reportMetrics -> {
            Assert.assertEquals((long)1L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)2L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)19001L, (long)reportMetrics.getInputRows());
        });
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(7777));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8500));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5000));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(8888));
        this.assertResult(input, 128, 1024, reportMetrics -> {
            Assert.assertEquals((long)2L, (long)reportMetrics.getParallelism());
            Assert.assertEquals((long)6L, (long)reportMetrics.getInputSequences());
            Assert.assertEquals((long)49166L, (long)reportMetrics.getInputRows());
        });
    }

    @Test
    public void testExceptionOnInputSequenceRead() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(15));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(25));
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("exploded");
        this.assertException(input);
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(5));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(25));
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(11));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(12));
        this.assertException(input);
    }

    @Test
    public void testExceptionFirstResultFromSequence() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(0));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("exploded");
        this.assertException(input);
    }

    @Test
    public void testExceptionFirstResultFromMultipleSequence() throws Exception {
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(0));
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(0));
        input.add(ParallelMergeCombiningSequenceTest.explodingSequence(0));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2));
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("exploded");
        this.assertException(input);
    }

    @Test
    public void testTimeoutExceptionDueToStalledInput() throws Exception {
        int someSize = 2048;
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.blockingSequence(2048, 400, 500, 1, 500, true));
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
        this.expectedException.expectMessage("Sequence iterator timed out waiting for data");
        this.assertException(input, 4096, 16384, 1000L, 0);
    }

    @Test
    public void testTimeoutExceptionDueToStalledReader() throws Exception {
        int someSize = 2048;
        ArrayList<Sequence<IntPair>> input = new ArrayList<Sequence<IntPair>>();
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        input.add(ParallelMergeCombiningSequenceTest.nonBlockingSequence(2048));
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
        this.expectedException.expectMessage("Sequence iterator timed out");
        this.assertException(input, 8, 64, 1000L, 500);
    }

    private void assertResult(List<Sequence<IntPair>> sequences) throws InterruptedException, IOException {
        this.assertResult(sequences, 4096, 16384, null);
    }

    private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter) throws InterruptedException, IOException {
        this.assertResult(sequences, batchSize, yieldAfter, null);
    }

    private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter, Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter) throws InterruptedException, IOException {
        CombiningSequence combiningSequence = CombiningSequence.create((Sequence)new MergeSequence(INT_PAIR_ORDERING, Sequences.simple(sequences)), INT_PAIR_ORDERING, INT_PAIR_MERGE_FN);
        ParallelMergeCombiningSequence parallelMergeCombineSequence = new ParallelMergeCombiningSequence(this.pool, sequences, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, 5000L, 0, 4, yieldAfter, batchSize, 100, reporter);
        Yielder combiningYielder = Yielders.each((Sequence)combiningSequence);
        Yielder parallelMergeCombineYielder = Yielders.each((Sequence)parallelMergeCombineSequence);
        IntPair prev = null;
        while (!combiningYielder.isDone() && !parallelMergeCombineYielder.isDone()) {
            Assert.assertEquals((Object)combiningYielder.get(), (Object)parallelMergeCombineYielder.get());
            Assert.assertNotEquals((Object)parallelMergeCombineYielder.get(), prev);
            prev = (IntPair)((Object)parallelMergeCombineYielder.get());
            combiningYielder = combiningYielder.next(combiningYielder.get());
            parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
        }
        Assert.assertTrue((boolean)combiningYielder.isDone());
        Assert.assertTrue((boolean)parallelMergeCombineYielder.isDone());
        while (this.pool.getRunningThreadCount() > 0) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((long)0L, (long)this.pool.getRunningThreadCount());
        combiningYielder.close();
        parallelMergeCombineYielder.close();
    }

    private void assertException(List<Sequence<IntPair>> sequences) throws Exception {
        this.assertException(sequences, 4096, 16384, 5000L, 0);
    }

    private void assertException(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter, long timeout, int readDelayMillis) throws Exception {
        try {
            ParallelMergeCombiningSequence parallelMergeCombineSequence = new ParallelMergeCombiningSequence(this.pool, sequences, INT_PAIR_ORDERING, INT_PAIR_MERGE_FN, true, timeout, 0, 4, yieldAfter, batchSize, 100, null);
            Yielder parallelMergeCombineYielder = Yielders.each((Sequence)parallelMergeCombineSequence);
            IntPair prev = null;
            while (!parallelMergeCombineYielder.isDone()) {
                Assert.assertNotEquals((Object)parallelMergeCombineYielder.get(), prev);
                prev = (IntPair)((Object)parallelMergeCombineYielder.get());
                if (readDelayMillis > 0 && ThreadLocalRandom.current().nextBoolean()) {
                    Thread.sleep(readDelayMillis);
                }
                parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
            }
            parallelMergeCombineYielder.close();
        }
        catch (Exception ex) {
            LOG.warn((Throwable)ex, "exception:", new Object[0]);
            throw ex;
        }
    }

    public static Sequence<IntPair> nonBlockingSequence(final int size, final boolean lazyGenerate) {
        final List<IntPair> pairs = lazyGenerate ? null : ParallelMergeCombiningSequenceTest.generateOrderedPairs(size);
        return new BaseSequence((BaseSequence.IteratorMaker)new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>(){

            public Iterator<IntPair> make() {
                return new Iterator<IntPair>(){
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override
                    public boolean hasNext() {
                        return this.rowCounter < size;
                    }

                    @Override
                    public IntPair next() {
                        if (lazyGenerate) {
                            ++this.rowCounter;
                            this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                            return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                        }
                        return (IntPair)((Object)pairs.get(this.rowCounter++));
                    }
                };
            }

            public void cleanup(Iterator<IntPair> iterFromMake) {
            }
        });
    }

    public static Sequence<IntPair> blockingSequence(final int size, int startDelayStartMillis, int startDelayEndMillis, final int iterationDelayFrequency, final int maxIterationDelayMillis, final boolean lazyGenerate) {
        final List<IntPair> pairs = lazyGenerate ? null : ParallelMergeCombiningSequenceTest.generateOrderedPairs(size);
        long startDelayMillis = ThreadLocalRandom.current().nextLong(startDelayStartMillis, startDelayEndMillis);
        final long delayUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(startDelayMillis, TimeUnit.MILLISECONDS);
        return new BaseSequence((BaseSequence.IteratorMaker)new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>(){

            public Iterator<IntPair> make() {
                return new Iterator<IntPair>(){
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override
                    public boolean hasNext() {
                        return this.rowCounter < size;
                    }

                    @Override
                    public IntPair next() {
                        try {
                            long currentNano = System.nanoTime();
                            if (this.rowCounter == 0 && currentNano < delayUntil) {
                                long sleepMillis = Math.max(TimeUnit.MILLISECONDS.convert(delayUntil - currentNano, TimeUnit.NANOSECONDS), 1L);
                                Thread.sleep(sleepMillis);
                            } else if (maxIterationDelayMillis > 0 && this.rowCounter % iterationDelayFrequency == 0 && ThreadLocalRandom.current().nextBoolean()) {
                                int delayMillis = Math.max(ThreadLocalRandom.current().nextInt(maxIterationDelayMillis), 1);
                                Thread.sleep(delayMillis);
                            }
                        }
                        catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                        if (lazyGenerate) {
                            ++this.rowCounter;
                            this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                            return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                        }
                        return (IntPair)((Object)pairs.get(this.rowCounter++));
                    }
                };
            }

            public void cleanup(Iterator<IntPair> iterFromMake) {
            }
        });
    }

    private static Sequence<IntPair> nonBlockingSequence(int size) {
        return ParallelMergeCombiningSequenceTest.nonBlockingSequence(size, false);
    }

    private static Sequence<IntPair> explodingSequence(final int explodeAfter) {
        final int explodeAt = explodeAfter + 1;
        return new BaseSequence((BaseSequence.IteratorMaker)new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>(){

            public Iterator<IntPair> make() {
                return new Iterator<IntPair>(){
                    int mergeKey = 0;
                    int rowCounter = 0;

                    @Override
                    public boolean hasNext() {
                        return this.rowCounter < explodeAt;
                    }

                    @Override
                    public IntPair next() {
                        if (this.rowCounter == explodeAfter) {
                            throw new RuntimeException("exploded");
                        }
                        this.mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount();
                        ++this.rowCounter;
                        return ParallelMergeCombiningSequenceTest.makeIntPair(this.mergeKey);
                    }
                };
            }

            public void cleanup(Iterator<IntPair> iterFromMake) {
            }
        });
    }

    private static List<IntPair> generateOrderedPairs(int length) {
        int mergeKey = 0;
        ArrayList<IntPair> generatedSequence = new ArrayList<IntPair>(length);
        for (int rowCounter = 0; rowCounter < length; ++rowCounter) {
            generatedSequence.add(ParallelMergeCombiningSequenceTest.makeIntPair(mergeKey += ParallelMergeCombiningSequenceTest.incrementMergeKeyAmount()));
        }
        return generatedSequence;
    }

    private static int incrementMergeKeyAmount() {
        return ThreadLocalRandom.current().nextInt(1, 3);
    }

    private static IntPair makeIntPair(int mergeKey) {
        return new IntPair(mergeKey, ThreadLocalRandom.current().nextInt(1, 100));
    }

    public static class IntPair
    extends Pair<Integer, Integer> {
        private IntPair(Integer lhs, Integer rhs) {
            super((Object)lhs, (Object)rhs);
        }
    }
}

