/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.util;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests for {@link ProducerConsumer}: single and multiple workers, a worker
 * that reports a failure, and shutdown of the worker threads.
 *
 * <p>Every test releases its {@link ProducerConsumer} in a {@code finally}
 * block. The shutdown tests wait until no thread matching
 * {@link #WORKER_THREAD_PATTERN} is alive, so a pool left running by a failed
 * test could make an unrelated test fail.
 */
public class TestProducerConsumer {
    /**
     * Thread-name pattern the shutdown tests wait on; presumably the names of
     * the threads created by {@link ProducerConsumer}'s internal pool.
     */
    private static final String WORKER_THREAD_PATTERN = "pool-.*-thread.*";

    /**
     * Submits one request to a single {@link CopyProcessor} and expects the
     * report to carry the submitted item back.
     */
    @Test
    public void testSimpleProducerConsumer() {
        ProducerConsumer<Integer, Integer> worker = new ProducerConsumer<>(1);
        try {
            worker.addWorker(new CopyProcessor());
            worker.put(new WorkRequest<>(42));
            WorkReport<Integer> report = worker.take();
            Assertions.assertEquals(42, report.getItem().intValue());
        } catch (InterruptedException ie) {
            // Fail with the cause attached instead of an anonymous "false".
            Assertions.fail("Interrupted while waiting for the work report", ie);
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Submits 2000 requests to ten {@link CopyProcessor}s and checks that
     * exactly one report per request comes back and that the reported items
     * add up to the submitted ones.
     */
    @Test
    public void testMultipleProducerConsumer() {
        final int numWorkers = 10;
        final int numRequests = 2000;
        ProducerConsumer<Integer, Integer> workers = new ProducerConsumer<>(numWorkers);
        try {
            for (int i = 0; i < numWorkers; ++i) {
                workers.addWorker(new CopyProcessor());
            }

            // Track the sum of everything submitted; each report subtracts its
            // item again, so the balance is zero iff nothing was lost or altered.
            int sum = 0;
            for (int i = 0; i < numRequests; ++i) {
                int item = i + 42;
                workers.put(new WorkRequest<>(item));
                sum += item;
            }

            int numReports = 0;
            while (workers.getWorkCnt() > 0) {
                WorkReport<Integer> report = workers.blockingTake();
                sum -= report.getItem().intValue();
                ++numReports;
            }

            Assertions.assertEquals(0, sum);
            Assertions.assertEquals(numRequests, numReports);
        } finally {
            workers.shutdown();
        }
    }

    /**
     * Submits one request to an {@link ExceptionProcessor} and expects a
     * report that carries the item, is flagged unsuccessful and holds the
     * exception.
     */
    @Test
    public void testExceptionProducerConsumer() {
        ProducerConsumer<Integer, Integer> worker = new ProducerConsumer<>(1);
        try {
            worker.addWorker(new ExceptionProcessor());
            worker.put(new WorkRequest<>(42));
            WorkReport<Integer> report = worker.take();
            Assertions.assertEquals(42, report.getItem().intValue());
            Assertions.assertFalse(report.getSuccess());
            Assertions.assertNotNull(report.getException());
        } catch (InterruptedException ie) {
            // Fail with the cause attached instead of an anonymous "false".
            Assertions.fail("Interrupted while waiting for the work report", ie);
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Shuts down an idle single-worker instance and waits (up to 10 s, polling
     * every 100 ms) until no thread matching {@link #WORKER_THREAD_PATTERN}
     * remains.
     *
     * @throws InterruptedException if the wait is interrupted
     * @throws TimeoutException if matching threads are still alive after the wait
     */
    @Test
    public void testSimpleProducerConsumerShutdown() throws InterruptedException, TimeoutException {
        ProducerConsumer<Integer, Integer> worker = new ProducerConsumer<>(1);
        worker.addWorker(new CopyProcessor());
        worker.shutdown();
        GenericTestUtils.waitForThreadTermination(WORKER_THREAD_PATTERN, 100, 10000);
    }

    /**
     * Runs a producer thread and a consumer thread against ten workers for one
     * second, stops the producer, drains the outstanding work, shuts down and
     * waits until no thread matching {@link #WORKER_THREAD_PATTERN} remains.
     *
     * <p>The timeout is 10 seconds (seconds are JUnit Jupiter's default unit).
     *
     * @throws InterruptedException if the test thread is interrupted
     * @throws TimeoutException if matching threads are still alive after the wait
     */
    @Test
    @Timeout(value = 10L)
    public void testMultipleProducerConsumerShutdown() throws InterruptedException, TimeoutException {
        final int numWorkers = 10;
        final ProducerConsumer<Integer, Integer> worker = new ProducerConsumer<>(numWorkers);
        for (int i = 0; i < numWorkers; ++i) {
            worker.addWorker(new CopyProcessor());
        }

        // An assertion error thrown on the sink thread would otherwise only
        // kill that thread; capture it so the test itself can fail with it.
        final AtomicReference<Throwable> sinkFailure = new AtomicReference<>();

        // Threads keep their default names on purpose so they can never match
        // WORKER_THREAD_PATTERN.
        Thread source = new Thread(() -> {
            try {
                while (true) {
                    worker.put(new WorkRequest<>(42));
                    Thread.sleep(1L);
                }
            } catch (InterruptedException ie) {
                // Interrupt is the stop signal for the producer.
            }
        });
        Thread sink = new Thread(() -> {
            try {
                while (true) {
                    WorkReport<Integer> report = worker.take();
                    Assertions.assertEquals(42, report.getItem().intValue());
                }
            } catch (InterruptedException ie) {
                // Interrupt is the stop signal for the consumer.
            } catch (Throwable t) {
                sinkFailure.compareAndSet(null, t);
            }
        });

        source.start();
        sink.start();
        try {
            try {
                Thread.sleep(1000L);

                // Stop producing and wait for the producer to exit so that no
                // new request can arrive while the remaining work drains.
                source.interrupt();
                source.join();
                while (worker.hasWork() && sinkFailure.get() == null) {
                    Thread.sleep(1L);
                }
            } finally {
                // Also reached on failure: never leave the producer or the
                // worker pool running behind this test.
                source.interrupt();
                worker.shutdown();
            }
            GenericTestUtils.waitForThreadTermination(WORKER_THREAD_PATTERN, 100, 10000);
        } finally {
            sink.interrupt();
            source.join();
            sink.join();
        }

        Throwable failure = sinkFailure.get();
        if (failure != null) {
            Assertions.fail("Sink thread failed while consuming work reports", failure);
        }
    }

    /** Processor that reports each request's item back as a successful result. */
    public class CopyProcessor
    implements WorkRequestProcessor<Integer, Integer> {
        /**
         * @param workRequest request whose item is echoed
         * @return a report with the same integer value, retry value 0 and success {@code true}
         */
        @Override
        public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
            Integer item = Integer.valueOf(workRequest.getItem());
            return new WorkReport<>(item, 0, true);
        }
    }

    /**
     * Processor that always fails internally and reports the failure instead
     * of propagating it.
     */
    public class ExceptionProcessor
    implements WorkRequestProcessor<Integer, Integer> {
        /**
         * @param workRequest request whose item is attached to the failure report
         * @return a report with the request's item, retry value 1, success {@code false}
         *     and the caught exception
         */
        @Override
        @SuppressWarnings("null")
        public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
            try {
                Integer missing = null;
                // Deliberately dereference null to raise a NullPointerException.
                missing.intValue();
                // Never reached: the line above always throws.
                return new WorkReport<>(missing, 0, true);
            } catch (Exception e) {
                Integer item = Integer.valueOf(workRequest.getItem());
                return new WorkReport<>(item, 1, false, e);
            }
        }
    }
}

