/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.common.stream;

import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.stream.AbortedStreamException;
import io.opentelemetry.testing.internal.armeria.common.stream.AggregationSupport;
import io.opentelemetry.testing.internal.armeria.common.stream.CancelledSubscriptionException;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.armeria.common.util.CompositeException;
import io.opentelemetry.testing.internal.armeria.common.util.EventLoopCheckingFuture;
import io.opentelemetry.testing.internal.armeria.common.util.Exceptions;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.AbortingSubscriber;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.InternalStreamMessageUtil;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.NeverInvokedSubscriber;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.NoopSubscription;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.StreamMessageUtil;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.SubscriberUtil;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.base.MoreObjects;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class CancellableStreamMessage<T>
extends AggregationSupport
implements StreamMessage<T> {
    static final Logger logger = LoggerFactory.getLogger(CancellableStreamMessage.class);
    static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);
    static final CloseEvent CANCELLED_CLOSE = new CloseEvent(CancelledSubscriptionException.INSTANCE);
    static final CloseEvent ABORTED_CLOSE = new CloseEvent(AbortedStreamException.INSTANCE);
    private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture<Void>();

    CancellableStreamMessage() {
    }

    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
        this.subscribe(subscriber, executor, InternalStreamMessageUtil.EMPTY_OPTIONS);
    }

    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor, SubscriptionOption ... options) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");
        SubscriptionImpl subscription = new SubscriptionImpl(this, subscriber, executor, options);
        SubscriptionImpl actualSubscription = this.subscribe(subscription);
        if (actualSubscription != subscription) {
            CancellableStreamMessage.failLateSubscriber(actualSubscription, subscription);
        }
    }

    abstract SubscriptionImpl subscribe(SubscriptionImpl var1);

    @Override
    public final CompletableFuture<Void> whenComplete() {
        return this.completionFuture;
    }

    abstract void request(long var1);

    abstract void cancel();

    protected void onRemoval(T obj) {
    }

    static void failLateSubscriber(SubscriptionImpl actualSubscription, SubscriptionImpl lateSubscription) {
        Subscriber<Object> actualSubscriber = actualSubscription.subscriber();
        Subscriber<Object> lateSubscriber = lateSubscription.subscriber();
        Throwable cause = SubscriberUtil.abortedOrLate(actualSubscriber);
        if (lateSubscription.needsDirectInvocation()) {
            CancellableStreamMessage.handleLateSubscriber(lateSubscriber, cause);
        } else {
            lateSubscription.executor().execute(() -> CancellableStreamMessage.handleLateSubscriber(lateSubscriber, cause));
        }
    }

    private static void handleLateSubscriber(Subscriber<?> lateSubscriber, Throwable cause) {
        try {
            lateSubscriber.onSubscribe(NoopSubscription.get());
            lateSubscriber.onError(cause);
        }
        catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            logger.warn("Subscriber should not throw an exception. subscriber: {}", lateSubscriber, (Object)t);
        }
    }

    final T prepareObjectForNotification(T o, boolean withPooledObjects) {
        this.onRemoval(o);
        return StreamMessageUtil.touchOrCopyAndClose(o, withPooledObjects);
    }

    static CloseEvent newCloseEvent(Throwable cause) {
        if (cause == CancelledSubscriptionException.INSTANCE) {
            return CANCELLED_CLOSE;
        }
        if (cause == AbortedStreamException.INSTANCE) {
            return ABORTED_CLOSE;
        }
        return new CloseEvent(cause);
    }

    static final class SubscriptionImpl
    implements Subscription {
        private final CancellableStreamMessage<?> publisher;
        private Subscriber<Object> subscriber;
        private final EventExecutor executor;
        private final SubscriptionOption[] options;
        private final boolean withPooledObjects;
        private final boolean notifyCancellation;
        private volatile boolean cancelRequested;

        SubscriptionImpl(CancellableStreamMessage<?> publisher, Subscriber<?> subscriber, EventExecutor executor, SubscriptionOption[] options) {
            this.publisher = publisher;
            this.subscriber = subscriber;
            this.executor = executor;
            this.options = options;
            this.withPooledObjects = InternalStreamMessageUtil.containsWithPooledObjects(options);
            this.notifyCancellation = InternalStreamMessageUtil.containsNotifyCancellation(options);
        }

        Subscriber<Object> subscriber() {
            return this.subscriber;
        }

        void clearSubscriber() {
            if (!(this.subscriber instanceof AbortingSubscriber)) {
                this.subscriber = NeverInvokedSubscriber.get();
            }
        }

        EventExecutor executor() {
            return this.executor;
        }

        SubscriptionOption[] options() {
            return this.options;
        }

        boolean withPooledObjects() {
            return this.withPooledObjects;
        }

        boolean cancelRequested() {
            return this.cancelRequested;
        }

        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.publisher.abort(new IllegalArgumentException("n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
                return;
            }
            this.publisher.request(n);
        }

        @Override
        public void cancel() {
            this.cancelRequested = true;
            this.publisher.cancel();
        }

        boolean needsDirectInvocation() {
            return this.executor.inEventLoop();
        }

        public String toString() {
            return MoreObjects.toStringHelper(Subscription.class).add("publisher", this.publisher).add("demand", this.publisher.demand()).add("executor", this.executor).add("options", this.options).toString();
        }
    }

    static final class CloseEvent {
        @Nullable
        final Throwable cause;

        CloseEvent(@Nullable Throwable cause) {
            this.cause = cause;
        }

        void notifySubscriber(SubscriptionImpl subscription, CompletableFuture<?> completionFuture) {
            if (completionFuture.isDone()) {
                return;
            }
            Subscriber<Object> subscriber = subscription.subscriber();
            Throwable cause = this.cause;
            if (cause == null && subscription.cancelRequested()) {
                cause = CancelledSubscriptionException.get();
            }
            if (cause == null) {
                try {
                    subscriber.onComplete();
                    completionFuture.complete(null);
                }
                catch (Throwable t) {
                    completionFuture.completeExceptionally(t);
                    Exceptions.throwIfFatal(t);
                    logger.warn("Subscriber.onComplete() should not raise an exception. subscriber: {}", subscriber, (Object)t);
                }
            } else {
                try {
                    if (subscription.notifyCancellation || !(cause instanceof CancelledSubscriptionException)) {
                        subscriber.onError(cause);
                    }
                    completionFuture.completeExceptionally(cause);
                }
                catch (Throwable t) {
                    CompositeException composite = new CompositeException(t, cause);
                    completionFuture.completeExceptionally(composite);
                    Exceptions.throwIfFatal(t);
                    logger.warn("Subscriber.onError() should not raise an exception. subscriber: {}", subscriber, (Object)composite);
                }
            }
        }

        public String toString() {
            if (this.cause == null) {
                return "CloseEvent";
            }
            return "CloseEvent(" + this.cause + ')';
        }
    }
}

