package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.class */
public class AsyncWaitOperator<IN, OUT> extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    /** Name under which the in-flight stream elements are stored in operator list state. */
    private static final String STATE_NAME = "_async_wait_operator_state_";
    /** Capacity handed to the stream element queue; the constructor requires it to be positive. */
    private final int capacity;
    /** Selects which queue implementation {@code setup} creates (ordered or unordered). */
    private final AsyncDataStream.OutputMode outputMode;
    /**
     * Timeout added to the current processing time when a per-record timer is registered. A timer is only
     * registered when this value is greater than zero. NOTE(review): unit is presumably milliseconds - confirm
     * against ProcessingTimeService.
     */
    private final long timeout;
    /** Serializer for stream elements; created in {@code setup}, used for state descriptors and record copies. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;
    /** State handle obtained in {@code initializeState}; replayed and then set to null in {@code open}. */
    private transient ListState<StreamElement> recoveredStreamElements;
    /** Queue of elements that were accepted by the operator; its contents are written on every snapshot. */
    private transient StreamElementQueue<OUT> queue;
    /**
     * Executor used to run result processing as mails and to yield while waiting on the queue.
     * NOTE(review): the field is both final and transient, so it would presumably be null after Java
     * deserialization - confirm how instances are created at runtime.
     */
    private final transient MailboxExecutor mailboxExecutor;
    /** Collector wrapping the operator output; passed to the queue when emitting completed elements. */
    private transient TimestampedCollector<OUT> timestampedCollector;
    /** Cached execution-config flag, read in {@code open}; when true, incoming records are copied. */
    private transient boolean isObjectReuseEnabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperator$1.class */
    /**
     * Lookup table that maps {@link AsyncDataStream.OutputMode} ordinals to the case numbers used by the
     * {@code switch} in {@code setup}: {@code ORDERED} maps to 1 and {@code UNORDERED} maps to 2. Entries
     * that are never assigned keep the array default of 0, which matches no case and therefore reaches
     * the {@code default} branch of that switch.
     *
     * <p>NOTE(review): this looks like the compiler-generated enum switch map that the decompiler
     * surfaced as a named class - confirm before editing by hand.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by OutputMode.ordinal(); sized to the number of enum constants present at class-init time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode = new int[AsyncDataStream.OutputMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode[AsyncDataStream.OutputMode.ORDERED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: if the constant is missing, its slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode[AsyncDataStream.OutputMode.UNORDERED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: if the constant is missing, its slot simply stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperator$ResultHandler.class */
    /**
     * {@link ResultFuture} handed to the user's {@link AsyncFunction} for a single input record.
     *
     * <p>Only the first call to {@link #complete} or {@link #completeExceptionally} has an effect; it is
     * forwarded to the wrapped queue entry from a mail submitted to the operator's mailbox executor.
     */
    public class ResultHandler implements ResultFuture<OUT> {
        /** Timer created by {@link #registerTimeout}; remains {@code null} if no timeout was registered. */
        private ScheduledFuture<?> timeoutTimer;
        /** The record this handler belongs to; passed to the user timeout callback and used in error messages. */
        private final StreamRecord<IN> inputRecord;
        /** The queue entry that receives the final result. */
        private final ResultFuture<OUT> resultFuture;
        /** Flipped exactly once by whichever completion call arrives first. */
        private final AtomicBoolean completed = new AtomicBoolean(false);

        ResultHandler(StreamRecord<IN> streamRecord, ResultFuture<OUT> resultFuture) {
            this.inputRecord = streamRecord;
            this.resultFuture = resultFuture;
        }

        /**
         * Completes the record with the given results unless it has already been completed.
         *
         * @param collection the results; must not be {@code null} (use an empty collection to emit nothing)
         */
        @Override
        public void complete(Collection<OUT> collection) {
            Preconditions.checkNotNull(collection, "Results must not be null, use empty collection to emit nothing");
            if (!completed.compareAndSet(false, true)) {
                // A previous completion already won; ignore this one.
                return;
            }
            processInMailbox(collection);
        }

        /** Submits the result processing as a mail to the operator's mailbox executor. */
        private void processInMailbox(Collection<OUT> results) {
            AsyncWaitOperator.this.mailboxExecutor.execute(
                    () -> processResults(results),
                    "Result in AsyncWaitOperator of input %s",
                    results);
        }

        /** Cancels a pending timeout timer, completes the queue entry and triggers emission. */
        private void processResults(Collection<OUT> results) {
            final ScheduledFuture<?> pendingTimer = timeoutTimer;
            if (pendingTimer != null) {
                pendingTimer.cancel(true);
            }
            resultFuture.complete(results);
            AsyncWaitOperator.this.outputCompletedElement();
        }

        /**
         * Reports the failure to the containing task's environment and then completes the record with an
         * empty result, unless the record has already been completed.
         *
         * @param th the cause of the failure
         */
        @Override
        public void completeExceptionally(Throwable th) {
            if (!completed.compareAndSet(false, true)) {
                // A previous completion already won; ignore this one.
                return;
            }
            final Exception failure = new Exception("Could not complete the stream element: " + inputRecord + '.', th);
            AsyncWaitOperator.this.getContainingTask().getEnvironment().failExternally(failure);
            // Still run the normal completion path so the timer is cancelled and the queue entry is completed.
            processInMailbox(Collections.emptyList());
        }

        /**
         * Registers a timer that fires {@code j} after the service's current processing time.
         *
         * @param processingTimeService the service used to read the current time and register the timer
         * @param j the delay added to the current processing time
         */
        public void registerTimeout(ProcessingTimeService processingTimeService, long j) {
            final long fireAt = j + processingTimeService.getCurrentProcessingTime();
            timeoutTimer = processingTimeService.registerTimer(fireAt, timestamp -> timerTriggered());
        }

        /** Invokes the user function's timeout callback if the record has not been completed yet. */
        private void timerTriggered() throws Exception {
            if (!completed.get()) {
                AsyncWaitOperator.this.userFunction.timeout(inputRecord.getValue(), this);
            }
        }
    }

    /**
     * Creates the operator around the given asynchronous user function.
     *
     * @param asyncFunction the user function; handed to the superclass constructor
     * @param j the timeout; a timeout timer is only registered per record when this is greater than zero
     * @param i the queue capacity; must be greater than zero
     * @param outputMode selects the ordered or unordered queue implementation; must not be {@code null}
     * @param processingTimeService the time service stored on the operator; must not be {@code null}
     * @param mailboxExecutor the executor used for result mails and for yielding; must not be {@code null}
     * @throws IllegalArgumentException if {@code i} is not positive
     * @throws NullPointerException if {@code outputMode}, {@code processingTimeService} or
     *     {@code mailboxExecutor} is {@code null}
     */
    public AsyncWaitOperator(@Nonnull AsyncFunction<IN, OUT> asyncFunction, long j, int i, @Nonnull AsyncDataStream.OutputMode outputMode, @Nonnull ProcessingTimeService processingTimeService, @Nonnull MailboxExecutor mailboxExecutor) {
        super(asyncFunction);
        setChainingStrategy(ChainingStrategy.ALWAYS);
        Preconditions.checkArgument(i > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = i;
        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
        this.timeout = j;
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        // Fail fast here, like the other @Nonnull arguments, instead of with an NPE on first use of the executor.
        this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor, "mailboxExecutor");
    }

    /**
     * Sets up the operator: creates the stream element serializer from the configured input serializer,
     * instantiates the queue that matches the configured output mode, and wraps the operator output in a
     * {@link TimestampedCollector}.
     *
     * @param streamTask the task containing this operator; forwarded to the superclass
     * @param streamConfig the operator configuration; forwarded to the superclass
     * @param output the operator output; forwarded to the superclass
     * @throws IllegalStateException if the output mode is neither {@code ORDERED} nor {@code UNORDERED}
     */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
        // Switch on the enum itself rather than on an ordinal lookup table with case labels borrowed
        // from unrelated numeric constants.
        switch (this.outputMode) {
            case ORDERED:
                this.queue = new OrderedStreamElementQueue<>(this.capacity);
                break;
            case UNORDERED:
                this.queue = new UnorderedStreamElementQueue<>(this.capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + this.outputMode + '.');
        }
        this.timestampedCollector = new TimestampedCollector<>(this.output);
    }

    /**
     * Opens the operator, caches the object-reuse flag and, if a recovered state handle is present, feeds
     * every recovered stream element back through the matching {@code process*} method before dropping
     * the handle.
     *
     * @throws IllegalStateException if a recovered element is neither a record, a watermark nor a latency marker
     * @throws Exception if the superclass or the processing of a recovered element fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

        if (recoveredStreamElements == null) {
            // No state handle to replay.
            return;
        }

        for (StreamElement element : recoveredStreamElements.get()) {
            if (element.isRecord()) {
                processElement(element.<IN>asRecord());
            } else if (element.isWatermark()) {
                processWatermark(element.asWatermark());
            } else if (element.isLatencyMarker()) {
                processLatencyMarker(element.asLatencyMarker());
            } else {
                throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
            }
        }
        // The recovered elements have been re-processed; release the handle.
        recoveredStreamElements = null;
    }

    /**
     * Adds the record to the queue (yielding to the mailbox until the queue accepts it), optionally registers
     * a timeout timer, and then invokes the user function with a fresh {@link ResultHandler}.
     *
     * @param streamRecord the incoming record
     * @throws Exception if adding to the queue is interrupted or the user function throws
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final StreamRecord<IN> element;
        if (isObjectReuseEnabled) {
            // With object reuse enabled the operator works on its own copy of the record.
            @SuppressWarnings("unchecked")
            final StreamRecord<IN> copied = (StreamRecord<IN>) inStreamElementSerializer.copy(streamRecord);
            element = copied;
        } else {
            element = streamRecord;
        }

        // The element goes into the queue before the handler is created and the user function is invoked.
        final ResultFuture<OUT> queueEntry = addToWorkQueue(element);
        final ResultHandler handler = new ResultHandler(element, queueEntry);

        if (timeout > 0) {
            handler.registerTimeout(getProcessingTimeService(), timeout);
        }

        userFunction.asyncInvoke(element.getValue(), handler);
    }

    /**
     * Enqueues the watermark and then attempts to emit whatever the queue reports as completed.
     *
     * @param watermark the incoming watermark
     * @throws Exception if adding to the queue is interrupted
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        // The queue entry returned for a watermark is not needed here.
        addToWorkQueue(watermark);

        // Trigger emission right away instead of waiting for the next record completion.
        outputCompletedElement();
    }

    /**
     * Replaces the contents of the operator list state with the elements currently held by the queue.
     *
     * @param stateSnapshotContext the snapshot context; forwarded to the superclass
     * @throws Exception if the superclass fails or the queue contents cannot be added to the state; in the
     *     latter case the state is cleared again before the exception is thrown
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        final ListState<StreamElement> inFlightState =
                getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

        // Start from an empty state so that only the current queue contents are stored.
        inFlightState.clear();
        try {
            inFlightState.addAll(queue.values());
        } catch (Exception e) {
            // Do not leave a partially written state behind.
            inFlightState.clear();
            throw new Exception("Could not add stream element queue entries to operator state backend of operator " + getOperatorName() + '.', e);
        }
    }

    /**
     * Looks up the operator list state that holds previously snapshotted stream elements and keeps the
     * handle so that {@code open} can replay it.
     *
     * @param stateInitializationContext the context providing the operator state store
     * @throws Exception if the superclass or the state lookup fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        recoveredStreamElements = stateInitializationContext
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }

    /**
     * Called at the end of the input; returns only once the queue has been fully drained.
     *
     * @throws Exception if waiting for the queue to drain is interrupted
     */
    @Override
    public void endInput() throws Exception {
        // Keep running mailbox actions until no element is left in the queue.
        waitInFlightInputsFinished();
    }

    /**
     * Puts the element into the queue, yielding to the mailbox executor after every rejected attempt until
     * the queue accepts it.
     *
     * @param streamElement the element to enqueue
     * @return the queue entry through which the element's result is completed
     * @throws InterruptedException if yielding to the mailbox is interrupted
     */
    private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
        Optional<ResultFuture<OUT>> queueEntry;
        while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
            // Queue rejected the element; let a pending mail run and retry.
            mailboxExecutor.yield();
        }
        return queueEntry.get();
    }

    /**
     * Yields to the mailbox executor repeatedly until the queue reports that it is empty.
     *
     * @throws InterruptedException if yielding to the mailbox is interrupted
     */
    private void waitInFlightInputsFinished() throws InterruptedException {
        for (boolean drained = queue.isEmpty(); !drained; drained = queue.isEmpty()) {
            // Run one pending mail, then re-check the queue.
            mailboxExecutor.yield();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the queue to emit a completed element through the timestamped collector. If the queue still
     * reports completed elements afterwards, this method re-submits itself as a mail instead of looping.
     */
    public void outputCompletedElement() {
        if (!queue.hasCompletedElements()) {
            // Nothing is ready to be emitted.
            return;
        }

        queue.emitCompletedElement(timestampedCollector);

        if (queue.hasCompletedElements()) {
            // More output is pending: continue in a follow-up mail so other mails can run in between.
            mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
        }
    }
}
