/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.test.HadoopTestBase;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.functional.TaskPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestTaskPool
extends HadoopTestBase {
    /** Logger shared by the tests and the counter helpers. */
    private static final Logger LOG = LoggerFactory.getLogger(TestTaskPool.class);

    /** Number of items created for every test case. */
    public static final int ITEM_COUNT = 16;

    /** Invocation count at which {@link #failingTask} throws. */
    private static final int FAILPOINT = 8;

    /** Thread count of the current parameterized run. */
    private int numThreads;

    /** Executor backing the submitter; only created when {@link #numThreads} is positive. */
    private ExecutorService threadPool;

    /** Submitter handed to the builder; null when no thread pool was created. */
    private TaskPool.Submitter submitter;

    /** Commits items and throws on its {@link #FAILPOINT}-th invocation. */
    private final CounterTask failingTask = new CounterTask("failing committer", FAILPOINT, Item::commit);

    /** Counts failure callbacks; has no action of its own. */
    private final FailureCounter failures = new FailureCounter("failures", 0, null);

    /** Marks items as reverted. */
    private final CounterTask reverter = new CounterTask("reverter", 0, Item::revert);

    /** Marks items as aborted. */
    private final CounterTask aborter = new CounterTask("aborter", 0, Item::abort);

    /** Items processed by the test; rebuilt in {@code setup()}. */
    private List<Item> items;

    public static Collection<Object[]> params() {
        return Arrays.asList({0}, {1}, {3}, {8}, {16});
    }

    /**
     * Records the thread count for the current parameterized run and rebuilds
     * the fixture so that it matches that count.
     *
     * <p>The {@code @BeforeEach} {@link #setup()} callback runs before the test
     * method body, i.e. before this method is called, so at that point
     * {@code numThreads} still holds its default of 0 and no thread pool or
     * submitter has been created. Re-running teardown and setup here makes the
     * items, pool and submitter reflect the requested thread count.
     *
     * @param pNumThreads number of threads to run the test with; 0 means no
     *                    thread pool is created.
     */
    public void initTestTaskPool(int pNumThreads) {
        this.numThreads = pNumThreads;
        // Release anything the earlier setup created, then build the fixture
        // again now that the real thread count is known.
        this.teardown();
        this.setup();
    }

    /**
     * Reports whether the current run is configured with more than one thread.
     *
     * @return true when {@code numThreads} is at least 2.
     */
    public boolean isParallel() {
        final boolean multipleWorkers = 1 < this.numThreads;
        return multipleWorkers;
    }

    /**
     * Builds the fixture: a fresh list of {@link #ITEM_COUNT} items and, when
     * {@code numThreads} is positive, a fixed-size daemon thread pool with a
     * submitter that feeds it. With no threads the submitter is null.
     */
    @BeforeEach
    public void setup() {
        final String label = String.format("With %d threads", this.numThreads);
        this.items = IntStream.rangeClosed(1, ITEM_COUNT)
            .mapToObj(id -> new Item(id, label))
            .collect(Collectors.toList());

        if (this.numThreads <= 0) {
            // no pool: the builder is given a null submitter
            this.submitter = null;
            return;
        }

        this.threadPool = Executors.newFixedThreadPool(
            this.numThreads,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(this.getMethodName() + "-pool-%d")
                .build());
        this.submitter = new PoolSubmitter();
    }

    /**
     * Shuts down the thread pool, if one was created, and clears the reference.
     */
    @AfterEach
    public void teardown() {
        final ExecutorService pool = this.threadPool;
        if (pool == null) {
            return;
        }
        pool.shutdown();
        this.threadPool = null;
    }

    /**
     * Creates a builder over the current items, executing with the current
     * submitter (which may be null).
     *
     * @return a new builder.
     */
    private TaskPool.Builder<Item> builder() {
        final List<Item> source = this.items;
        final TaskPool.Submitter executor = this.submitter;
        return TaskPool.foreach(source).executeWith(executor);
    }

    /**
     * Runs the task through the builder and asserts that the run reports success.
     *
     * @param builder configured builder.
     * @param task task to run.
     * @throws IOException if the run raises one.
     */
    private void assertRun(TaskPool.Builder<Item> builder, CounterTask task) throws IOException {
        final boolean succeeded = builder.run(task);
        assertTrue(succeeded, "Run of " + task + " failed");
    }

    /**
     * Runs the task through the builder and asserts that the run reports failure.
     *
     * @param builder configured builder.
     * @param task task to run.
     * @throws IOException if the run raises one.
     */
    private void assertFailed(TaskPool.Builder<Item> builder, CounterTask task) throws IOException {
        final boolean succeeded = builder.run(task);
        assertFalse(succeeded, "Run of " + task + " unexpectedly succeeded");
    }

    /**
     * Renders every item on its own line, wrapped in square brackets.
     *
     * @return the items as a string, for use in assertion messages.
     */
    private String itemsToString() {
        return this.items.stream()
            .map(Item::toString)
            .collect(Collectors.joining("\n", "[", "]"));
    }

    /**
     * A task that never fails must succeed and be invoked once per item.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testSimpleInvocation(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final CounterTask committer = new CounterTask("simple", 0, Item::commit);
        assertRun(builder(), committer);
        committer.assertInvoked("", ITEM_COUNT);
    }

    /**
     * With exceptions suppressed and no stop-on-failure, the run reports
     * failure but still visits every item; each item ends up committed or failed.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailNoStoppingSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final TaskPool.Builder<Item> lenient = builder().suppressExceptions();
        assertFailed(lenient, failingTask);
        failingTask.assertInvoked("Continued through operations", ITEM_COUNT);
        for (Item item : items) {
            item.assertCommittedOrFailed();
        }
    }

    /**
     * With exceptions suppressed and stop-on-failure set, the run reports
     * failure. Outside parallel mode the task is invoked exactly
     * {@link #FAILPOINT} times; in parallel mode, at least that many.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailFastSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final TaskPool.Builder<Item> failFast = builder().suppressExceptions().stopOnFailure();
        assertFailed(failFast, failingTask);
        if (!isParallel()) {
            failingTask.assertInvoked("stop fast", FAILPOINT);
        } else {
            failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
        }
    }

    /**
     * Fail fast with exceptions suppressed and an abort task registered.
     * The run must report failure after at least {@link #FAILPOINT}
     * invocations. Outside parallel mode the aborter must have run, every item
     * that was neither committed nor failed must be aborted, and no committed
     * item may be aborted.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailedCallAbortSuppressed(int pNumThreads) throws Throwable {
        this.initTestTaskPool(pNumThreads);
        this.assertFailed(
            this.builder().stopOnFailure().suppressExceptions().abortWith(this.aborter),
            this.failingTask);
        this.failingTask.assertInvokedAtLeast("success", FAILPOINT);
        if (!this.isParallel()) {
            this.aborter.assertInvokedAtLeast("abort", 1);
            // A terminal forEach is required here: a stream ending in map()
            // is never evaluated, so the abort assertion would silently not run.
            // The item whose task threw is marked failed rather than committed;
            // it is excluded in the same way as in
            // testFailFastCallRevertSuppressed.
            this.items.stream()
                .filter(i -> !i.committed)
                .filter(i -> !i.failed)
                .forEach(Item::assertAborted);
            this.items.stream()
                .filter(i -> i.committed)
                .forEach(i -> assertFalse(i.aborted, i.toString()));
        }
    }

    /**
     * With exceptions suppressed and a failure callback registered, the run
     * reports failure and the callback is invoked exactly once.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailedCalledWhenNotStoppingSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final TaskPool.Builder<Item> withCallback = builder()
            .suppressExceptions()
            .onFailure(failures);
        assertFailed(withCallback, failingTask);
        failingTask.assertInvokedAtLeast("success", FAILPOINT);
        failures.assertInvoked("failure event", 1);
    }

    /**
     * Fail fast with revert, abort and failure callbacks registered and
     * exceptions suppressed. Checks that committed, non-failed items were
     * reverted, that reverted items had been committed, and that the failure
     * callback fired once. Outside parallel mode it also checks that items
     * neither committed nor failed were aborted.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailFastCallRevertSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final TaskPool.Builder<Item> pool = builder()
            .stopOnFailure()
            .revertWith(reverter)
            .abortWith(aborter)
            .suppressExceptions()
            .onFailure(failures);
        assertFailed(pool, failingTask);
        failingTask.assertInvokedAtLeast("success", FAILPOINT);

        if (!isParallel()) {
            aborter.assertInvokedAtLeast("abort", 1);
            for (Item item : items) {
                if (!item.committed && !item.failed) {
                    item.assertAborted();
                }
            }
        }

        // everything committed without failing must have been rolled back
        for (Item item : items) {
            if (item.committed && !item.failed) {
                item.assertReverted();
            }
        }
        // and only committed items may have been rolled back
        for (Item item : items) {
            if (item.reverted) {
                item.assertCommitted();
            }
        }
        failures.assertInvoked("failure event", 1);
    }

    /**
     * Without stop-on-failure, with revert and failure callbacks registered
     * and exceptions suppressed: every committed item other than the one
     * reported to the failure callback must be reverted, reverted items must
     * have been committed, and the failure callback fires once.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailSlowCallRevertSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final TaskPool.Builder<Item> pool = builder()
            .suppressExceptions()
            .revertWith(reverter)
            .onFailure(failures);
        assertFailed(pool, failingTask);
        failingTask.assertInvokedAtLeast("success", FAILPOINT);

        final int failedId = failures.getItem().id;
        for (Item item : items) {
            if (item.id != failedId && item.committed) {
                item.assertReverted();
            }
        }
        for (Item item : items) {
            if (item.reverted) {
                item.assertCommitted();
            }
        }
        failures.assertInvoked("failure event", 1);
    }

    /**
     * With stop-on-failure and no suppression, the run must raise an
     * IOException. Outside parallel mode the task is invoked exactly
     * {@link #FAILPOINT} times; in parallel mode, at least that many.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailFastExceptions(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        LambdaTestUtils.intercept(IOException.class,
            () -> builder().stopOnFailure().run(failingTask));
        if (!isParallel()) {
            failingTask.assertInvoked("stop fast", FAILPOINT);
        } else {
            failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
        }
    }

    /**
     * Without stop-on-failure or suppression, the run must raise an
     * IOException after visiting every item; each item ends up committed or failed.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailSlowExceptions(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        LambdaTestUtils.intercept(IOException.class,
            () -> builder().run(failingTask));
        failingTask.assertInvoked("continued through operations", ITEM_COUNT);
        for (Item item : items) {
            item.assertCommittedOrFailed();
        }
    }

    /**
     * Both the task and the aborter throw on their first invocation. The run
     * must raise an IOException; outside parallel mode the aborter must still
     * have been invoked at least {@code ITEM_COUNT - 1} times.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailFastExceptionsWithAbortFailure(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final CounterTask failFirst = new CounterTask("task", 1, Item::commit);
        final CounterTask failingAborter = new CounterTask("aborter", 1, Item::abort);
        LambdaTestUtils.intercept(IOException.class,
            () -> builder()
                .stopOnFailure()
                .abortWith(failingAborter)
                .run(failFirst));
        if (isParallel()) {
            return;
        }
        failingAborter.assertInvokedAtLeast("abort", ITEM_COUNT - 1);
    }

    /**
     * Both the task and the aborter throw on their first invocation, and
     * aborts are configured to stop on failure. The run must raise an
     * IOException; outside parallel mode the aborter is invoked exactly once.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testFailFastExceptionsWithAbortFailureStopped(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final CounterTask failFirst = new CounterTask("task", 1, Item::commit);
        final CounterTask failingAborter = new CounterTask("aborter", 1, Item::abort);
        LambdaTestUtils.intercept(IOException.class,
            () -> builder()
                .stopOnFailure()
                .stopAbortsOnFailure()
                .abortWith(failingAborter)
                .run(failFirst));
        if (isParallel()) {
            return;
        }
        failingAborter.assertInvoked("abort", 1);
    }

    /**
     * The task throws on its last ({@link #ITEM_COUNT}-th) invocation. With
     * stop-on-failure, revert, abort and failure callbacks and exceptions
     * suppressed, the failed item plus the aborted and reverted invocations
     * must add up to the item count, and each other item must be reverted if
     * it was committed or aborted if it was not.
     */
    @ParameterizedTest(name="threads={0}")
    @MethodSource(value={"params"})
    public void testRevertAllSuppressed(int pNumThreads) throws Throwable {
        initTestTaskPool(pNumThreads);
        final CounterTask failLast = new CounterTask("task", ITEM_COUNT, Item::commit);
        final TaskPool.Builder<Item> pool = builder()
            .suppressExceptions()
            .stopOnFailure()
            .revertWith(reverter)
            .abortWith(aborter)
            .onFailure(failures);
        assertFailed(pool, failLast);
        failLast.assertInvoked("success", ITEM_COUNT);

        final int aborts = aborter.getCount();
        final int reverts = reverter.getCount();
        assertEquals(ITEM_COUNT, 1 + aborts + reverts);

        final int failedId = failures.getItem().id;
        for (Item item : items) {
            if (item.id != failedId && item.committed) {
                item.assertReverted();
            }
        }
        for (Item item : items) {
            if (item.id != failedId && !item.committed) {
                item.assertAborted();
            }
        }
        for (Item item : items) {
            if (item.reverted) {
                item.assertCommitted();
            }
        }
        failures.assertInvoked("failure event", 1);
    }

    private final class CounterTask
    extends BaseCounter
    implements TaskPool.Task<Item, IOException> {
        private CounterTask(String name, int limit, Function<Item, Boolean> action) {
            super(name, limit, action);
        }

        public void run(Item item) throws IOException {
            this.process(item);
        }
    }

    private final class FailureCounter
    extends BaseCounter
    implements TaskPool.FailureTask<Item, IOException> {
        private Exception exception;

        private FailureCounter(String name, int limit, Function<Item, Boolean> action) {
            super(name, limit, action);
        }

        public void run(Item item, Exception ex) throws IOException {
            this.process(item);
            this.exception = ex;
        }

        private Exception getException() {
            return this.exception;
        }
    }

    private class PoolSubmitter
    implements TaskPool.Submitter {
        private PoolSubmitter() {
        }

        public Future<?> submit(Runnable task) {
            return TestTaskPool.this.threadPool.submit(task);
        }
    }

    /**
     * The unit of work in the tests: an id, a descriptive text and four
     * flags recording which operations have been applied to it.
     */
    private final class Item {
        private final int id;
        private final String text;
        private volatile boolean committed;
        private volatile boolean aborted;
        private volatile boolean reverted;
        private volatile boolean failed;

        /**
         * @param item id of the item.
         * @param text text included in the string form.
         */
        private Item(int item, String text) {
            this.text = text;
            this.id = item;
        }

        /** Marks the item committed. @return always true. */
        boolean commit() {
            committed = true;
            return true;
        }

        /** Marks the item aborted. @return always true. */
        boolean abort() {
            aborted = true;
            return true;
        }

        /** Marks the item reverted. @return always true. */
        boolean revert() {
            reverted = true;
            return true;
        }

        /** Marks the item failed. @return always true. */
        boolean fail() {
            failed = true;
            return true;
        }

        /** Asserts the committed flag is set. @return this item. */
        public Item assertCommitted() {
            Assertions.assertTrue(committed, explain(" was not committed in\n"));
            return this;
        }

        /** Asserts the committed or the failed flag is set. @return this item. */
        public Item assertCommittedOrFailed() {
            Assertions.assertTrue(committed || failed, explain(" was not committed nor failed in\n"));
            return this;
        }

        /** Asserts the aborted flag is set. @return this item. */
        public Item assertAborted() {
            Assertions.assertTrue(aborted, explain(" was not aborted in\n"));
            return this;
        }

        /** Asserts the reverted flag is set. @return this item. */
        public Item assertReverted() {
            Assertions.assertTrue(reverted, explain(" was not reverted in\n"));
            return this;
        }

        /**
         * Builds an assertion message: this item, the reason, then all items.
         */
        private String explain(String reason) {
            return this.toString() + reason + TestTaskPool.this.itemsToString();
        }

        @Override
        public String toString() {
            return String.format(
                "Item{[%02d], committed=%s, aborted=%s, reverted=%s, failed=%s, text=%s}",
                id, committed, aborted, reverted, failed, text);
        }
    }

    /**
     * Shared logic of the counting tasks: counts invocations, throws an
     * IOException when the count equals the configured limit, and otherwise
     * applies an optional action to the item.
     */
    private class BaseCounter {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final int limit;
        private final String name;
        /** Item passed to the most recent {@link #process(Item)} call. */
        private Item item;
        private final Optional<Function<Item, Boolean>> action;

        /**
         * @param name name used in the string form.
         * @param limit invocation count at which {@link #process(Item)} throws.
         * @param action action applied to each item; may be null.
         */
        BaseCounter(String name, int limit, Function<Item, Boolean> action) {
            this.action = Optional.ofNullable(action);
            this.limit = limit;
            this.name = name;
        }

        /**
         * Records the item and increments the counter. When the new count
         * equals the limit the item is marked failed and an IOException is
         * thrown; otherwise the action, if any, is applied.
         *
         * @param i item to process.
         * @throws IOException when the invocation count reaches the limit.
         */
        void process(Item i) throws IOException {
            item = i;
            final int invocation = counter.incrementAndGet();
            if (invocation != limit) {
                // capture the state before the action so the log shows the change
                final String before = i.toString();
                action.ifPresent(a -> a.apply(i));
                LOG.info("{}: {} -> {}", this, before, i);
                return;
            }
            i.fail();
            LOG.info("{}: Failed {}", this, i);
            throw new IOException(String.format("%s: Limit %d reached for %s", this, limit, i));
        }

        /** @return the number of invocations so far. */
        int getCount() {
            return counter.get();
        }

        /** @return the item most recently passed to {@link #process(Item)}. */
        Item getItem() {
            return item;
        }

        /** Asserts the invocation count equals the expected value. */
        void assertInvoked(String text, int expected) {
            final int actual = getCount();
            Assertions.assertEquals(expected, actual, this.toString() + ": " + text);
        }

        /** Asserts the invocation count is at least the expected value. */
        void assertInvokedAtLeast(String text, int expected) {
            final int actual = getCount();
            final String message = this.toString() + ": " + text
                + "-expected " + expected
                + " invocations, but got " + actual
                + " in " + TestTaskPool.this.itemsToString();
            Assertions.assertTrue(actual >= expected, message);
        }

        @Override
        public String toString() {
            return "BaseCounter{"
                + "name='" + name + '\''
                + ", count=" + counter.get()
                + ", limit=" + limit
                + ", item=" + item
                + '}';
        }
    }
}

