/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorChainTest;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TestSpyWrapperStateBackend;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.StreamTaskUtil;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.TestingUncaughtExceptionHandler;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class StreamTaskTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    @Test
    void testSavepointSuspendCompleted() throws Exception {
        this.testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, (SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), false);
    }

    @Test
    void testSavepointTerminateCompleted() throws Exception {
        this.testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, (SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), true);
    }

    @Test
    void testSavepointSuspendedAborted() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.testSyncSavepointWithEndInput((task, id) -> task.abortCheckpointOnBarrier(id.longValue(), new CheckpointException(CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)), (SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), false)).isInstanceOf(FlinkRuntimeException.class)).hasMessage("Stop-with-savepoint failed.");
    }

    @Test
    void testSavepointTerminateAborted() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.testSyncSavepointWithEndInput((task, id) -> task.abortCheckpointOnBarrier(id.longValue(), new CheckpointException(CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)), (SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), true)).isInstanceOf(FlinkRuntimeException.class)).hasMessage("Stop-with-savepoint failed.");
    }

    @Test
    void testSavepointSuspendAbortedAsync() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.testSyncSavepointWithEndInput((streamTask, abortCheckpointId) -> streamTask.notifyCheckpointAbortAsync(abortCheckpointId.longValue(), 0L), (SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), false)).isInstanceOf(FlinkRuntimeException.class)).hasMessage("Stop-with-savepoint failed.");
    }

    @Test
    void testSavepointTerminateAbortedAsync() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.testSyncSavepointWithEndInput((streamTask, abortCheckpointId) -> streamTask.notifyCheckpointAbortAsync(abortCheckpointId.longValue(), 0L), (SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), true)).isInstanceOf(FlinkRuntimeException.class)).hasMessage("Stop-with-savepoint failed.");
    }

    private void testSyncSavepointWithEndInput(BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult, SnapshotType checkpointType, boolean expectEndInput) throws Exception {
        StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator()).build();
        long checkpointId = 1L;
        CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
        CountDownLatch inputEndedLatch = new CountDownLatch(1);
        MailboxExecutor executor = harness.streamTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE);
        executor.execute(() -> {
            try {
                harness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(1L, 1L), new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder());
            }
            catch (IOException e) {
                Fail.fail((String)e.getMessage());
            }
        }, "triggerCheckpointOnBarrier");
        new Thread(() -> {
            try {
                savepointTriggeredLatch.await();
                harness.endInput(expectEndInput);
                inputEndedLatch.countDown();
            }
            catch (InterruptedException e) {
                Fail.fail((String)e.getMessage());
            }
        }).start();
        executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
        executor.execute(() -> {
            inputEndedLatch.await();
            savepointResult.accept(harness.streamTask, (Object)1L);
        }, "savepointResult");
        harness.processAll();
        Assertions.assertThat((boolean)TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
    }

    @Test
    void testCleanUpExceptionSuppressing() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>)new FailingTwiceOperator()).build();){
            Assertions.assertThatThrownBy(() -> testHarness.processElement(new StreamRecord((Object)"Doesn't matter", 0L))).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ExpectedTestException.class)});
            Assertions.assertThatThrownBy(testHarness::finishProcessing).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(FailingTwiceOperator.CloseException.class)});
        }
    }

    @Test
    void testHandleAsyncExceptionDuringRestoring() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        final RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
        mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
        String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
        StreamTaskITCase.NoOpStreamTask initThrowExceptionTask = new StreamTaskITCase.NoOpStreamTask((Environment)mockEnvironment){

            @Override
            protected void init() throws Exception {
                super.init();
                CompletableFuture.runAsync(() -> this.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException)).get();
            }
        };
        initThrowExceptionTask.restore();
        Optional actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
        Throwable actualException = (Throwable)actualExternalFailureCause.orElseThrow(() -> new AssertionError((Object)"Expected exceptional completion"));
        ((AbstractThrowableAssert)Assertions.assertThat((Throwable)actualException).isInstanceOf(AsynchronousException.class)).hasMessage("EXPECTED_ERROR MESSAGE").hasCause((Throwable)expectedException);
    }

    @Test
    void testAsyncExceptionHandlerHandleExceptionForwardsMessageProperly() {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
        StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler((Environment)mockEnvironment);
        mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
        String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
        asyncExceptionHandler.handleAsyncException("EXPECTED_ERROR MESSAGE", (Throwable)expectedException);
        Optional actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
        Throwable actualException = (Throwable)actualExternalFailureCause.orElseThrow(() -> new AssertionError((Object)"Expected exceptional completion"));
        ((AbstractThrowableAssert)Assertions.assertThat((Throwable)actualException).isInstanceOf(AsynchronousException.class)).hasMessage("EXPECTED_ERROR MESSAGE").hasCause((Throwable)expectedException);
    }

    @Test
    void testEarlyCanceling() throws Exception {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new SlowlyDeserializingOperator());
        cfg.serializeAllConfigs();
        TaskManagerActions taskManagerActions = (TaskManagerActions)Mockito.spy((Object)new NoOpTaskManagerActions());
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = new TestTaskBuilder((ShuffleEnvironment)shuffleEnvironment).setInvokable(SourceStreamTask.class).setTaskConfig(cfg.getConfiguration()).setTaskManagerActions(taskManagerActions).build((Executor)EXECUTOR_EXTENSION.getExecutor());
            TaskExecutionState state = new TaskExecutionState(task.getExecutionId(), ExecutionState.RUNNING);
            task.startTaskThread();
            ((TaskManagerActions)Mockito.verify((Object)taskManagerActions, (VerificationMode)Mockito.timeout((long)2000L))).updateTaskExecutionState((TaskExecutionState)Mockito.eq((Object)state));
            task.cancelExecution();
            task.getExecutingThread().join();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)task.getExecutingThread().isAlive()).as("Task did not cancel", new Object[0])).isFalse();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.CANCELED);
        }
    }

    @Test
    void testStateBackendLoadingAndClosing() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.set(StateBackendOptions.STATE_BACKEND, (Object)TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = StreamTaskTest.createTask(StateBackendTestSource.class, (ShuffleEnvironment)shuffleEnvironment, cfg, taskManagerConfig, EXECUTOR_EXTENSION.getExecutor());
            StateBackendTestSource.fail = false;
            task.startTaskThread();
            task.getExecutingThread().join();
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.FINISHED);
        }
    }

    @Test
    void testStateBackendClosingOnFailure() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.set(StateBackendOptions.STATE_BACKEND, (Object)TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = StreamTaskTest.createTask(StateBackendTestSource.class, (ShuffleEnvironment)shuffleEnvironment, cfg, taskManagerConfig, EXECUTOR_EXTENSION.getExecutor());
            StateBackendTestSource.fail = true;
            task.startTaskThread();
            task.getExecutingThread().join();
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.FAILED);
        }
    }

    @Test
    void testDecliningCheckpointStreamOperator() throws Exception {
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        ExpectedTestException testException = new ExpectedTestException();
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.lambda$testDecliningCheckpointStreamOperator$14(dummyEnvironment, (Exception)testException, operatorSnapshotResult1, operatorSnapshotResult2));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning((StreamTask)streamTask, task.invocationFuture);
        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
        Assertions.assertThatThrownBy(() -> task.waitForTaskCompletion(false)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ExpectedTestException.class)});
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
    }

    @Test
    void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
        RuntimeException failingCause = new RuntimeException("Test exception");
        FailingDummyEnvironment failingDummyEnvironment = new FailingDummyEnvironment(failingCause);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)ExceptionallyDoneFuture.of((Throwable)failingCause), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        RunningTask task = StreamTaskTest.runTask(() -> new MockStreamTask((Environment)failingDummyEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult)), (Thread.UncaughtExceptionHandler)uncaughtExceptionHandler));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning((StreamTask)streamTask, task.invocationFuture);
        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
        Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
        Assertions.assertThat((Throwable)uncaughtException).isSameAs((Object)failingCause);
        streamTask.finishInput();
        task.waitForTaskCompletion(false);
    }

    @Test
    void testForceFullSnapshotOnIncompatibleStateBackend() throws Exception {
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).modifyStreamConfig(config -> config.setStateBackend((StateBackend)new OnlyIncrementalStateBackend())).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamMap((MapFunction & Serializable)value -> null)).build();){
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> harness.streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forConfig((SnapshotType)CheckpointType.FULL_CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (boolean)true, (boolean)false, (long)0L))).isInstanceOf(IllegalStateException.class)).hasMessage("Configured state backend (OnlyIncrementalStateBackend) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing CLAIM mode.");
        }
    }

    @Test
    void testFailingAsyncCheckpointRunnable() throws Exception {
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult3 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        RunnableFuture failingFuture = (RunnableFuture)Mockito.mock(RunnableFuture.class);
        Mockito.when((Object)((SnapshotResult)failingFuture.get())).thenThrow(new Throwable[]{new ExecutionException(new Exception("Test exception"))});
        Mockito.when((Object)operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn((Object)failingFuture);
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult1), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult2), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult3))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning((StreamTask)streamTask, task.invocationFuture);
            mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
            streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation()).get();
            ExecutorService executor = streamTask.getAsyncOperationsThreadPool();
            executor.shutdown();
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                Fail.fail((String)"Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
            }
            Assertions.assertThat((Optional)mockEnvironment.getActualExternalFailureCause()).isPresent();
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult3)).cancel();
            streamTask.finishInput();
            task.waitForTaskCompletion(false);
        }
    }

    @Test
    void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
        final OneShotLatch completeAcknowledge = new OneShotLatch();
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                acknowledgeCheckpointLatch.trigger();
                while (true) {
                    try {
                        completeAcknowledge.await();
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Mockito.any(JobID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.anyLong(), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class), (TaskStateSnapshot)Mockito.any(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), ExecutionGraphTestUtils.createExecutionAttemptId(), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, (StateChangelogStorage)new InMemoryStateChangelogStorage(), new TaskExecutorStateChangelogStoragesManager(), null, checkpointResponder);
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("mock-task").setTaskStateManager((TaskStateManager)taskStateManager).build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning((StreamTask)streamTask, task.invocationFuture);
            long checkpointId = 42L;
            streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
            acknowledgeCheckpointLatch.await();
            ArgumentCaptor subtaskStateCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
            ((CheckpointResponder)Mockito.verify((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Mockito.any(JobID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.eq((long)42L), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class), (TaskStateSnapshot)subtaskStateCaptor.capture());
            TaskStateSnapshot subtaskStates = (TaskStateSnapshot)subtaskStateCaptor.getValue();
            OperatorSubtaskState subtaskState = (OperatorSubtaskState)((Map.Entry)subtaskStates.getSubtaskStateMappings().iterator().next()).getValue();
            Assertions.assertThat((Collection)subtaskState.getManagedKeyedState()).isEqualTo((Object)StateObjectCollection.singleton((StateObject)managedKeyedStateHandle));
            Assertions.assertThat((Collection)subtaskState.getRawKeyedState()).isEqualTo((Object)StateObjectCollection.singleton((StateObject)rawKeyedStateHandle));
            Assertions.assertThat((Collection)subtaskState.getManagedOperatorState()).isEqualTo((Object)StateObjectCollection.singleton((StateObject)managedOperatorStateHandle));
            Assertions.assertThat((Collection)subtaskState.getRawOperatorState()).isEqualTo((Object)StateObjectCollection.singleton((StateObject)rawOperatorStateHandle));
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            streamTask.cancel();
            completeAcknowledge.trigger();
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            task.waitForTaskCompletion(true);
        }
    }

    @Test
    void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
        TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
        TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
        TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();
        BlockingRunnableFuture<SnapshotResult> rawKeyedStateHandleFuture = new BlockingRunnableFuture<SnapshotResult>(2, SnapshotResult.of((StateObject)rawKeyedStateHandle));
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), rawKeyedStateHandleFuture, (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        OneInputStreamOperator streamOperator = StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult);
        AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(streamOperator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        long checkpointId = 42L;
        ((MockStreamTask)((Object)task.streamTask)).triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
        rawKeyedStateHandleFuture.awaitRun();
        ((MockStreamTask)((Object)task.streamTask)).cancel();
        FutureUtils.ConjunctFuture discardFuture = FutureUtils.waitForAll(Arrays.asList(managedKeyedStateHandle.getDiscardFuture(), rawKeyedStateHandle.getDiscardFuture(), managedOperatorStateHandle.getDiscardFuture(), rawOperatorStateHandle.getDiscardFuture()));
        discardFuture.get();
        Assertions.assertThatThrownBy(() -> mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
        task.waitForTaskCompletion(true);
    }

    @Test
    void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
        final ArrayList checkpointResult = new ArrayList(1);
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                SubtaskState subtaskState = (SubtaskState)invocation.getArgument(4);
                checkpointResult.add(subtaskState);
                checkpointCompletedLatch.trigger();
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Mockito.any(JobID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.anyLong(), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class), (TaskStateSnapshot)ArgumentMatchers.nullable(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), ExecutionGraphTestUtils.createExecutionAttemptId(), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, (StateChangelogStorage)new InMemoryStateChangelogStorage(), new TaskExecutorStateChangelogStoragesManager(), null, checkpointResponder);
        OneInputStreamOperator statelessOperator = StreamTaskTest.streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager((TaskStateManager)taskStateManager).build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(statelessOperator)));
            StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
            ((MockStreamTask)((Object)task.streamTask)).triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
            checkpointCompletedLatch.await(30L, TimeUnit.SECONDS);
            Assertions.assertThat((Object)((SubtaskState)checkpointResult.get(0))).isNull();
            ((MockStreamTask)((Object)task.streamTask)).cancel();
            task.waitForTaskCompletion(true);
        }
    }

    @Test
    void testNotifyCheckpointOnClosedOperator() throws Throwable {
        ClosingOperator operator = new ClosingOperator();
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain((StreamOperator<?>)operator).build();
        harness.setAutoProcess(false);
        harness.processElement(new StreamRecord((Object)1));
        harness.streamTask.runMailboxStep();
        harness.streamTask.notifyCheckpointCompleteAsync(1L);
        harness.streamTask.runMailboxStep();
        Assertions.assertThat((AtomicInteger)ClosingOperator.notified).hasValue(1);
        Assertions.assertThat((AtomicBoolean)ClosingOperator.closed).isFalse();
        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor(), StopMode.DRAIN);
        harness.streamTask.operatorChain.closeAllOperators();
        harness.streamTask.notifyCheckpointCompleteAsync(2L);
        harness.streamTask.runMailboxStep();
        Assertions.assertThat((AtomicInteger)ClosingOperator.notified).hasValue(1);
        Assertions.assertThat((AtomicBoolean)ClosingOperator.closed).isTrue();
    }

    @Test
    void testFailToConfirmCheckpointCompleted() throws Exception {
        this.testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointCompleteAsync(1L));
    }

    @Test
    void testFailToConfirmCheckpointAborted() throws Exception {
        this.testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointAbortAsync(1L, 0L));
    }

    private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
        StreamMap streamMap = new StreamMap(new FailOnNotifyCheckpointMapper());
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain((StreamOperator<?>)streamMap).build();
        Assertions.assertThatThrownBy(() -> {
            consumer.accept(harness.streamTask);
            harness.streamTask.runMailboxLoop();
        }).isInstanceOf(ExpectedTestException.class);
    }

    @Test
    void testCheckpointFailueOnClosedOperator() throws Exception {
        ClosingOperator operator = new ClosingOperator();
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain((StreamOperator<?>)operator).build();){
            harness.setAutoProcess(false);
            harness.processElement(new StreamRecord((Object)1));
            harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor(), StopMode.DRAIN);
            harness.streamTask.operatorChain.closeAllOperators();
            Assertions.assertThat((boolean)ClosingOperator.closed.get()).isTrue();
            Assertions.assertThatThrownBy(() -> harness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(1L, 0L), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder())).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)"OperatorChain and Task should never be closed at this point")});
        }
    }

    @Test
    void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception {
        final OneShotLatch latch = new OneShotLatch();
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();){
            RunningTask task = StreamTaskTest.runTask(() -> new StreamTask<Object, StreamOperator<Object>>((Environment)mockEnvironment){

                protected void init() throws Exception {
                }

                protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
                    this.mailboxProcessor.getMailboxExecutor(0).execute(() -> ((OneShotLatch)latch).trigger(), "trigger");
                    controller.suspendDefaultAction();
                    this.mailboxProcessor.suspend();
                }
            });
            latch.await();
            task.waitForTaskCompletion(false);
        }
    }

    @Test
    void testThreadInvariants() throws Throwable {
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator((StreamOperator)new StreamMap((MapFunction & Serializable)value -> value));
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.serializeAllConfigs();
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskConfiguration(taskConfiguration).build();){
            TestUserCodeClassLoader taskClassLoader = new TestUserCodeClassLoader();
            RunningTask runningTask = StreamTaskTest.runTask(() -> {
                Thread.currentThread().setContextClassLoader(taskClassLoader);
                return new ThreadInspectingTask((Environment)mockEnvironment);
            });
            runningTask.invocationFuture.get();
            Assertions.assertThat((Object)((ThreadInspectingTask)((Object)runningTask.streamTask)).getTaskClassLoader()).isSameAs((Object)taskClassLoader);
        }
    }

    @Test
    void testProcessWithAvailableOutput() throws Exception {
        try (MockEnvironment environment = this.setupEnvironment(true, true);){
            int numberOfProcessCalls = 10;
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(10);
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor((StreamInputProcessor)inputProcessor).build();
            task.invoke();
            Assertions.assertThat((int)inputProcessor.currentNumProcessCalls).isEqualTo(10);
        }
    }

    @Test
    void testProcessWithRaceInDataAvailability() throws Exception {
        try (MockEnvironment environment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)TestTaskStateManager.builder().setStateChangelogStorage(null).build()).build();){
            environment.addOutputs(Collections.singletonList(new AvailabilityTestResultPartitionWriter(true)));
            RacyTestInputProcessor inputProcessor = new RacyTestInputProcessor();
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor((StreamInputProcessor)inputProcessor).build();
            task.invoke();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testProcessWithUnAvailableOutput() throws Exception {
        long sleepTimeOutsideMail = 42L;
        long sleepTimeInsideMail = 44L;
        Thread waitingThread = null;
        try (MockEnvironment environment = this.setupEnvironment(true, false);){
            int numberOfProcessCalls = 10;
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(10);
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor((StreamInputProcessor)inputProcessor).build();
            MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
            TaskIOMetricGroup ioMetricGroup = task.getEnvironment().getMetricGroup().getIOMetricGroup();
            RunnableWithException completeFutureTask = () -> StreamTaskTest.lambda$testProcessWithUnAvailableOutput$30(inputProcessor, (StreamTask)task, environment);
            waitingThread = new WaitingThread(executor, completeFutureTask, 44L, 42L, ioMetricGroup.getSoftBackPressuredTimePerSecond());
            executor.execute(((WaitingThread)waitingThread)::start, "This task will submit another task to execute after processing input once.");
            long startTs = System.currentTimeMillis();
            task.invoke();
            long totalDuration = System.currentTimeMillis() - startTs;
            Assertions.assertThat((long)ioMetricGroup.getSoftBackPressuredTimePerSecond().getCount()).isGreaterThanOrEqualTo(42L);
            Assertions.assertThat((long)ioMetricGroup.getSoftBackPressuredTimePerSecond().getCount()).isLessThanOrEqualTo(totalDuration - 44L);
            Assertions.assertThat((long)ioMetricGroup.getIdleTimeMsPerSecond().getCount()).isZero();
            Assertions.assertThat((int)inputProcessor.currentNumProcessCalls).isEqualTo(10);
        }
        finally {
            if (waitingThread != null) {
                waitingThread.join();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testProcessWithUnAvailableInput() throws Exception {
        long sleepTimeOutsideMail = 42L;
        long sleepTimeInsideMail = 44L;
        Thread waitingThread = null;
        try (MockEnvironment environment = this.setupEnvironment(true, true);){
            UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor((StreamInputProcessor)inputProcessor).build();
            TaskIOMetricGroup ioMetricGroup = task.getEnvironment().getMetricGroup().getIOMetricGroup();
            MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
            RunnableWithException completeFutureTask = () -> inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);
            waitingThread = new WaitingThread(executor, completeFutureTask, 44L, 42L, ioMetricGroup.getIdleTimeMsPerSecond());
            executor.execute(((WaitingThread)waitingThread)::start, "Start WaitingThread after Task starts processing input.");
            SystemClock clock = SystemClock.getInstance();
            long startTs = clock.absoluteTimeMillis();
            task.invoke();
            long totalDuration = clock.absoluteTimeMillis() - startTs;
            Assertions.assertThat((long)ioMetricGroup.getIdleTimeMsPerSecond().getCount()).isGreaterThanOrEqualTo(42L);
            Assertions.assertThat((long)ioMetricGroup.getIdleTimeMsPerSecond().getCount()).isLessThanOrEqualTo(totalDuration - 44L);
            Assertions.assertThat((long)ioMetricGroup.getSoftBackPressuredTimePerSecond().getCount()).isZero();
            Assertions.assertThat((long)ioMetricGroup.getHardBackPressuredTimePerSecond().getCount()).isZero();
        }
        finally {
            if (waitingThread != null) {
                waitingThread.join();
            }
        }
    }

    @Test
    void testRestorePerformedOnlyOnce() throws Exception {
        OneInputStreamOperator statelessOperator = StreamTaskTest.streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        RunningTask task = StreamTaskTest.runTask(() -> {
            MockStreamTask mockStreamTask = StreamTaskTest.createMockStreamTask((Environment)dummyEnvironment, StreamTaskTest.operatorChain(statelessOperator));
            mockStreamTask.restore();
            return mockStreamTask;
        });
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        ((MockStreamTask)((Object)task.streamTask)).cancel();
        Assertions.assertThat((int)((MockStreamTask)((Object)task.streamTask)).restoreInvocationCount).isOne();
    }

    @Test
    void testRestorePerformedFromInvoke() throws Exception {
        OneInputStreamOperator statelessOperator = StreamTaskTest.streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)dummyEnvironment, StreamTaskTest.operatorChain(statelessOperator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        ((MockStreamTask)((Object)task.streamTask)).cancel();
        Assertions.assertThat((int)((MockStreamTask)((Object)task.streamTask)).restoreInvocationCount).isOne();
    }

    @Test
    void testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws Exception {
        AtomicBoolean submitThroughputFail = new AtomicBoolean();
        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
        UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
        RunningTask task = StreamTaskTest.runTask(() -> new MockStreamTaskBuilder((Environment)mockEnvironment).setHandleAsyncException((str, t) -> submitThroughputFail.set(true)).setStreamInputProcessor((StreamInputProcessor)inputProcessor).build());
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        TimerService timerService = ((StreamTask)task.streamTask).systemTimerService;
        MailboxExecutor mainMailboxExecutor = ((StreamTask)task.streamTask).mailboxProcessor.getMainMailboxExecutor();
        CountDownLatch stoppingMailboxLatch = new CountDownLatch(1);
        timerService.registerTimer(timerService.getCurrentProcessingTime(), time -> {
            stoppingMailboxLatch.await();
            Thread.sleep(5L);
            mainMailboxExecutor.execute(() -> {}, "test");
        });
        mainMailboxExecutor.submit(() -> {
            stoppingMailboxLatch.countDown();
            task.streamTask.afterInvoke();
        }, "test").get();
        Assertions.assertThat((AtomicBoolean)submitThroughputFail).isFalse();
        inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);
    }

    @Test
    void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.set(StateBackendOptions.STATE_BACKEND, (Object)TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4712L, 43L));
        FailedSource failedSource = new FailedSource();
        cfg.setStreamOperator(new TestStreamSource(failedSource));
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = StreamTaskTest.createTask(SourceStreamTask.class, (ShuffleEnvironment)shuffleEnvironment, cfg, taskManagerConfig, EXECUTOR_EXTENSION.getExecutor());
            task.startTaskThread();
            failedSource.awaitRunning();
            task.triggerCheckpointBarrier(42L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
            task.getExecutingThread().join();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.FAILED);
        }
    }

    @Test
    void testSkipRepeatCheckpointComplete() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3).modifyStreamConfig(config -> config.setCheckpointingEnabled(true)).setupOutputForSingletonOperatorChain((StreamOperator<?>)new CheckpointCompleteRecordOperator()).build();){
            testHarness.streamTask.notifyCheckpointCompleteAsync(3L);
            testHarness.streamTask.notifyCheckpointAbortAsync(5L, 3L);
            testHarness.streamTask.notifyCheckpointAbortAsync(10L, 8L);
            testHarness.streamTask.notifyCheckpointCompleteAsync(8L);
            testHarness.processAll();
            CheckpointCompleteRecordOperator operator = (CheckpointCompleteRecordOperator)((AbstractStreamOperator)testHarness.streamTask.getMainOperator());
            Assertions.assertThat(operator.getNotifiedCheckpoint()).isEqualTo(Arrays.asList(3L, 8L));
        }
    }

    @Test
    void testIgnoreCompleteCheckpointBeforeStartup() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3).setTaskStateSnapshot(3L, new TaskStateSnapshot()).modifyStreamConfig(config -> config.setCheckpointingEnabled(true)).setupOutputForSingletonOperatorChain((StreamOperator<?>)new CheckpointCompleteRecordOperator()).build();){
            testHarness.streamTask.notifyCheckpointCompleteAsync(2L);
            testHarness.streamTask.notifyCheckpointAbortAsync(4L, 3L);
            testHarness.streamTask.notifyCheckpointCompleteAsync(5L);
            testHarness.streamTask.notifyCheckpointAbortAsync(7L, 6L);
            testHarness.processAll();
            CheckpointCompleteRecordOperator operator = (CheckpointCompleteRecordOperator)((AbstractStreamOperator)testHarness.streamTask.getMainOperator());
            Assertions.assertThat(operator.getNotifiedCheckpoint()).isEqualTo(Arrays.asList(5L, 6L));
        }
    }

    @Test
    void testBufferSizeRecalculationStartSuccessfully() throws Exception {
        final int expectedThroughput = 13333;
        int inputChannels = 3;
        Configuration config = new Configuration().set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, (Object)Duration.ofHours(10L)).set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, (Object)Duration.ofSeconds(1L)).set(TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES, (Object)1).set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)true);
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setTaskManagerRuntimeInfo((TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(config)).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, inputChannels).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, inputChannels).modifyGateBuilder(gateBuilder -> gateBuilder.setThroughputCalculator(bufferDebloatConfiguration -> new ThroughputCalculator((Clock)SystemClock.getInstance()){

            public long calculateThroughput() {
                return expectedThroughput;
            }
        })).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator()).build();){
            harness.processAll();
            harness.streamTask.debloat();
            long lastBufferSize = -1L;
            for (IndexedInputGate inputGate : harness.streamTask.getEnvironment().getAllInputGates()) {
                for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
                    long currentBufferSize = ((TestInputChannel)inputGate.getChannel(i)).getCurrentBufferSize();
                    Assertions.assertThat((long)currentBufferSize).isLessThan(((MemorySize)TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
                    Assertions.assertThat((long)currentBufferSize).isGreaterThan(0L);
                    if (lastBufferSize > 0L) {
                        Assertions.assertThat((long)lastBufferSize).isEqualTo(currentBufferSize);
                    }
                    lastBufferSize = currentBufferSize;
                }
            }
        }
    }

    @Test
    void testBufferDebloatingMultiGates() throws Exception {
        Configuration config = new Configuration().set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, (Object)Duration.ofHours(10L)).set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, (Object)Duration.ofSeconds(1L)).set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)true).set(TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES, (Object)0);
        long throughputGate1 = 1024L;
        long throughputGate2 = 61440L;
        boolean inputChannelsGate1 = true;
        int inputChannelsGate2 = 4;
        ThroughputCalculator throughputCalculator = new ThroughputCalculator((Clock)SystemClock.getInstance()){
            private int callCount;
            {
                this.callCount = 0;
            }

            public long calculateThroughput() {
                if (this.callCount++ % 2 == 0) {
                    return 1024L;
                }
                return 61440L;
            }
        };
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setTaskManagerRuntimeInfo((TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(config)).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 1).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 4).modifyGateBuilder(gateBuilder -> gateBuilder.setThroughputCalculator(bufferDebloatConfiguration -> throughputCalculator)).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator()).build();){
            IndexedInputGate[] inputGates = harness.streamTask.getEnvironment().getAllInputGates();
            harness.processAll();
            while (this.getCurrentBufferSize((InputGate)inputGates[0]) == 0 || (long)this.getCurrentBufferSize((InputGate)inputGates[0]) > 1024L) {
                harness.streamTask.debloat();
            }
            Assertions.assertThat((int)this.getCurrentBufferSize((InputGate)inputGates[0])).isEqualTo(1024L);
            Assertions.assertThat((int)this.getCurrentBufferSize((InputGate)inputGates[1])).isEqualTo(15360L);
        }
    }

    @Test
    void testMailboxMetricsScheduling() throws Exception {
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();){
            final Gauge mailboxSizeMetric = mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxSize();
            final Histogram mailboxLatencyMetric = mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxLatency();
            final AtomicInteger maxMailboxSize = new AtomicInteger(-1);
            int minMeasurements = 2;
            SupplierWithException task = () -> new StreamTask<Object, StreamOperator<Object>>((Environment)mockEnvironment){

                protected void init() {
                    this.mailboxProcessor.getMailboxMetricsControl().setLatencyMeasurementInterval(2);
                }

                protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
                    if (mailboxLatencyMetric.getCount() < 2L) {
                        this.mailboxProcessor.getMainMailboxExecutor().execute(() -> {}, "mail");
                        Thread.sleep(1L);
                    } else {
                        controller.suspendDefaultAction();
                        this.mailboxProcessor.suspend();
                    }
                    maxMailboxSize.set(Math.max(maxMailboxSize.get(), (Integer)mailboxSizeMetric.getValue()));
                }
            };
            StreamTaskTest.runTask(task).waitForTaskCompletion(false);
            Assertions.assertThat((long)mailboxLatencyMetric.getCount()).isGreaterThanOrEqualTo(2L);
            Assertions.assertThat((AtomicInteger)maxMailboxSize).hasValueGreaterThan(0);
            Assertions.assertThat((Integer)((Integer)mailboxSizeMetric.getValue())).isZero();
        }
    }

    @Test
    void testSubTaskInitializationMetrics() throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator());
        try (StreamTaskMailboxTestHarness harness = builder.buildUnrestored();){
            harness.streamTask.restore();
            Assertions.assertThat((Optional)harness.getTaskStateManager().getReportedInitializationMetrics()).isPresent();
        }
    }

    @Test
    void testMailboxMetricsMeasurement() throws Exception {
        int numMails = 10;
        int sleepTime = 5;
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator());
        try (StreamTaskMailboxTestHarness harness = builder.build();){
            Histogram mailboxLatencyMetric = harness.streamTask.getEnvironment().getMetricGroup().getIOMetricGroup().getMailboxLatency();
            Gauge mailboxSizeMetric = harness.streamTask.getEnvironment().getMetricGroup().getIOMetricGroup().getMailboxSize();
            long startTime = SystemClock.getInstance().relativeTimeMillis();
            harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
            for (int i = 0; i < 10; ++i) {
                harness.streamTask.mainMailboxExecutor.execute(() -> Thread.sleep(5L), "add value");
            }
            harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
            Assertions.assertThat((Integer)((Integer)mailboxSizeMetric.getValue())).isGreaterThanOrEqualTo(10);
            Assertions.assertThat((long)mailboxLatencyMetric.getCount()).isZero();
            harness.processAll();
            long endTime = SystemClock.getInstance().relativeTimeMillis();
            Assertions.assertThat((Integer)((Integer)mailboxSizeMetric.getValue())).isZero();
            Assertions.assertThat((long)mailboxLatencyMetric.getCount()).isEqualTo(2L);
            Assertions.assertThat((long)mailboxLatencyMetric.getStatistics().getMax()).isBetween(Long.valueOf(50L), Long.valueOf(endTime - startTime));
        }
    }

    @Test
    void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges() throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).setOutputPartitioner((StreamPartitioner)new ForwardPartitioner()).setupOutputForSingletonOperatorChain((StreamOperator<?>)new TestBoundedOneInputStreamOperator());
        try (StreamTaskMailboxTestHarness harness = builder.build();){
            StreamTask cfr_ignored_0 = harness.streamTask;
            RecordWriterDelegate recordWriterDelegate = StreamTask.createRecordWriterDelegate((StreamConfig)harness.streamTask.configuration, (Environment)harness.streamMockEnvironment);
            Assertions.assertThat((Object)((ChannelSelectorRecordWriter)((SingleRecordWriter)recordWriterDelegate).getRecordWriter(0)).getChannelSelector()).isInstanceOf(ForwardPartitioner.class);
            ArrayList<8> newOutputs = new ArrayList<8>();
            newOutputs.add(new MockResultPartitionWriter(){

                public int getNumberOfSubpartitions() {
                    return 2;
                }
            });
            harness.streamMockEnvironment.setOutputs(newOutputs);
            StreamTask cfr_ignored_1 = harness.streamTask;
            recordWriterDelegate = StreamTask.createRecordWriterDelegate((StreamConfig)harness.streamTask.configuration, (Environment)harness.streamMockEnvironment);
            Assertions.assertThat((Object)((ChannelSelectorRecordWriter)((SingleRecordWriter)recordWriterDelegate).getRecordWriter(0)).getChannelSelector()).isInstanceOf(RebalancePartitioner.class);
        }
    }

    private int getCurrentBufferSize(InputGate inputGate) {
        return this.getTestChannel(inputGate, 0).getCurrentBufferSize();
    }

    private TestInputChannel getTestChannel(InputGate inputGate, int idx) {
        return (TestInputChannel)inputGate.getChannel(idx);
    }

    private MockEnvironment setupEnvironment(boolean ... outputAvailabilities) {
        Configuration configuration = new Configuration();
        new MockStreamConfig(configuration, outputAvailabilities.length);
        ArrayList<AvailabilityTestResultPartitionWriter> writers = new ArrayList<AvailabilityTestResultPartitionWriter>(outputAvailabilities.length);
        for (int i = 0; i < outputAvailabilities.length; ++i) {
            writers.add(new AvailabilityTestResultPartitionWriter(outputAvailabilities[i]));
        }
        MockEnvironment environment = new MockEnvironmentBuilder().setTaskConfiguration(configuration).build();
        environment.addOutputs(writers);
        return environment;
    }

    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception {
        OneInputStreamOperator operator = (OneInputStreamOperator)Mockito.mock(OneInputStreamOperator.class);
        Mockito.when((Object)operator.getOperatorID()).thenReturn((Object)new OperatorID());
        Mockito.when((Object)operator.snapshotState(Mockito.anyLong(), Mockito.anyLong(), (CheckpointOptions)Mockito.any(CheckpointOptions.class), (CheckpointStreamFactory)Mockito.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult);
        Mockito.when((Object)operator.getMetricGroup()).thenReturn((Object)UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
        return operator;
    }

    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshotException(Exception exception) throws Exception {
        OneInputStreamOperator operator = (OneInputStreamOperator)Mockito.mock(OneInputStreamOperator.class);
        Mockito.when((Object)operator.getOperatorID()).thenReturn((Object)new OperatorID());
        Mockito.when((Object)operator.snapshotState(Mockito.anyLong(), Mockito.anyLong(), (CheckpointOptions)Mockito.any(CheckpointOptions.class), (CheckpointStreamFactory)Mockito.any(CheckpointStreamFactory.class))).thenThrow(new Throwable[]{exception});
        Mockito.when((Object)operator.getMetricGroup()).thenReturn((Object)UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
        return operator;
    }

    private static <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T> ... streamOperators) throws Exception {
        return OperatorChainTest.setupOperatorChain(streamOperators);
    }

    private static <T extends StreamTask<?, ?>> RunningTask<T> runTask(SupplierWithException<T, Exception> taskFactory) throws Exception {
        CompletableFuture taskCreationFuture = new CompletableFuture();
        CompletableFuture<Void> invocationFuture = CompletableFuture.runAsync(() -> {
            StreamTask task;
            try {
                task = (StreamTask)taskFactory.get();
                taskCreationFuture.complete(task);
            }
            catch (Exception e) {
                taskCreationFuture.completeExceptionally(e);
                return;
            }
            try {
                task.invoke();
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, Executors.newSingleThreadExecutor());
        return new RunningTask<StreamTask>((StreamTask)taskCreationFuture.get(), invocationFuture);
    }

    public static Task createTask(Class<? extends TaskInvokable> invokable, ShuffleEnvironment shuffleEnvironment, StreamConfig taskConfig, Configuration taskManagerConfig, Executor executor) throws Exception {
        taskConfig.serializeAllConfigs();
        return new TestTaskBuilder(shuffleEnvironment).setTaskManagerConfig(taskManagerConfig).setInvokable(invokable).setTaskConfig(taskConfig.getConfiguration()).build(executor);
    }

    private static MockStreamTask createMockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) throws Exception {
        return new MockStreamTask(env, operatorChain, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE);
    }

    private static /* synthetic */ void lambda$testProcessWithUnAvailableOutput$30(AvailabilityTestInputProcessor inputProcessor, StreamTask task, MockEnvironment environment) throws Exception {
        Assertions.assertThat((int)inputProcessor.currentNumProcessCalls).isOne();
        Assertions.assertThat((boolean)task.mailboxProcessor.isDefaultActionAvailable()).isFalse();
        environment.getWriter(1).getAvailableFuture().complete(null);
    }

    private static /* synthetic */ MockStreamTask lambda$testDecliningCheckpointStreamOperator$14(DummyEnvironment dummyEnvironment, Exception testException, OperatorSnapshotFutures operatorSnapshotResult1, OperatorSnapshotFutures operatorSnapshotResult2) throws Exception {
        return StreamTaskTest.createMockStreamTask((Environment)dummyEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshotException(testException), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult1), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult2)));
    }

    private static final class OnlyIncrementalStateBackend
    extends MockStateBackend {
        private OnlyIncrementalStateBackend() {
        }

        public boolean supportsNoClaimRestoreMode() {
            return false;
        }

        public String toString() {
            return "OnlyIncrementalStateBackend";
        }
    }

    private static class CheckpointCompleteRecordOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        private final List<Long> notifiedCheckpoint = new ArrayList<Long>();

        private CheckpointCompleteRecordOperator() {
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.notifiedCheckpoint.add(checkpointId);
        }

        public List<Long> getNotifiedCheckpoint() {
            return this.notifiedCheckpoint;
        }
    }

    private static class FailedSource
    extends RichParallelSourceFunction<String>
    implements CheckpointedFunction {
        private static CountDownLatch runningLatch = null;
        private volatile boolean running;

        public FailedSource() {
            runningLatch = new CountDownLatch(1);
        }

        public void open(OpenContext openContext) throws Exception {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            runningLatch.countDown();
            while (this.running) {
                try {
                    Thread.sleep(Integer.MAX_VALUE);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (runningLatch.getCount() == 0L) {
                throw new RuntimeException("source failed");
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        public void awaitRunning() throws InterruptedException {
            runningLatch.await();
        }
    }

    private static class FailOnNotifyCheckpointMapper<T>
    implements MapFunction<T, T>,
    CheckpointListener {
        private static final long serialVersionUID = 1L;

        private FailOnNotifyCheckpointMapper() {
        }

        public T map(T value) throws Exception {
            return value;
        }

        public void notifyCheckpointAborted(long checkpointId) {
            throw new ExpectedTestException();
        }

        public void notifyCheckpointComplete(long checkpointId) {
            throw new ExpectedTestException();
        }
    }

    private static class ClosingOperator<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {
        static AtomicBoolean closed = new AtomicBoolean();
        static AtomicInteger notified = new AtomicInteger();

        private ClosingOperator() {
        }

        public void open() throws Exception {
            super.open();
            closed.set(false);
            notified.set(0);
        }

        public void close() throws Exception {
            closed.set(true);
            super.close();
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            notified.incrementAndGet();
        }

        public void processElement(StreamRecord<T> element) throws Exception {
        }
    }

    private static class FailingDummyEnvironment
    extends DummyEnvironment {
        final RuntimeException failingCause;

        private FailingDummyEnvironment(RuntimeException failingCause) {
            this.failingCause = failingCause;
        }

        public void declineCheckpoint(long checkpointId, CheckpointException cause) {
            throw this.failingCause;
        }

        public void failExternally(Throwable cause) {
            throw this.failingCause;
        }
    }

    private static final class BlockingRunnableFuture<V>
    implements RunnableFuture<V> {
        private final CompletableFuture<V> future = new CompletableFuture();
        private final OneShotLatch signalRunLatch = new OneShotLatch();
        private final CountDownLatch continueRunLatch;
        private final V value;

        private BlockingRunnableFuture(int parties, V value) {
            this.continueRunLatch = new CountDownLatch(parties);
            this.value = value;
        }

        @Override
        public void run() {
            this.signalRunLatch.trigger();
            this.continueRunLatch.countDown();
            try {
                this.continueRunLatch.await();
            }
            catch (InterruptedException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
            this.future.complete(this.value);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.future.get(timeout, unit);
        }

        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    private static class AcknowledgeDummyEnvironment
    extends DummyEnvironment {
        private final CompletableFuture<Long> acknowledgeCheckpointFuture = new CompletableFuture();

        private AcknowledgeDummyEnvironment() {
        }

        public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
            return this.acknowledgeCheckpointFuture;
        }

        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
            this.acknowledgeCheckpointFuture.complete(checkpointId);
        }

        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
            this.acknowledgeCheckpointFuture.complete(checkpointId);
        }
    }

    private static class TestingOperatorStateHandle
    implements OperatorStateHandle {
        private static final long serialVersionUID = 923794934187614088L;
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();

        private TestingOperatorStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return this.discardFuture;
        }

        public Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets() {
            return Collections.emptyMap();
        }

        public FSDataInputStream openInputStream() throws IOException {
            throw new IOException("Cannot open input streams in testing implementation.");
        }

        public PhysicalStateHandleID getStreamStateHandleID() {
            throw new RuntimeException("Cannot return ID in testing implementation.");
        }

        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        public StreamStateHandle getDelegateStateHandle() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void discardState() throws Exception {
            this.discardFuture.complete(null);
        }

        public long getStateSize() {
            return 0L;
        }
    }

    private static class TestingKeyedStateHandle
    implements KeyedStateHandle {
        private static final long serialVersionUID = -2473861305282291582L;
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();
        private final StateHandleID stateHandleId = StateHandleID.randomStateHandleId();

        private TestingKeyedStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return this.discardFuture;
        }

        public KeyGroupRange getKeyGroupRange() {
            return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        }

        public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
            return this;
        }

        public StateHandleID getStateHandleId() {
            return this.stateHandleId;
        }

        public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
        }

        public void discardState() {
            this.discardFuture.complete(null);
        }

        public long getStateSize() {
            return 0L;
        }

        public long getCheckpointedSize() {
            return this.getStateSize();
        }
    }

    static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>>
    extends StreamSource<OUT, SRC> {
        static AbstractKeyedStateBackend<?> keyedStateBackend;
        static OperatorStateBackend operatorStateBackend;
        static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        public TestStreamSource(SRC sourceFunction) {
            super(sourceFunction);
        }

        public void initializeState(StateInitializationContext controller) throws Exception {
            keyedStateBackend = (AbstractKeyedStateBackend)this.getKeyedStateBackend();
            operatorStateBackend = this.getOperatorStateBackend();
            rawOperatorStateInputs = (CloseableIterable)controller.getRawOperatorStateInputs();
            rawKeyedStateInputs = (CloseableIterable)controller.getRawKeyedStateInputs();
            super.initializeState(controller);
        }
    }

    private static class TestUserCodeClassLoader
    extends ClassLoader {
        public TestUserCodeClassLoader() {
            super(ClassLoader.getSystemClassLoader());
        }
    }

    private static class ThreadInspectingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final long taskThreadId;
        private final ClassLoader taskClassLoader;
        private transient boolean hasTimerTriggered;

        ThreadInspectingTask(Environment env) throws Exception {
            super(env);
            Thread currentThread = Thread.currentThread();
            this.taskThreadId = currentThread.getId();
            this.taskClassLoader = currentThread.getContextClassLoader();
        }

        @Nullable
        ClassLoader getTaskClassLoader() {
            return this.taskClassLoader;
        }

        protected void init() throws Exception {
            this.checkTaskThreadInfo();
            ((AbstractStreamOperator)this.getMainOperator()).getProcessingTimeService().registerTimer(0L, new ProcessingTimeService.ProcessingTimeCallback(){

                public void onProcessingTime(long timestamp) throws Exception {
                    this.checkTaskThreadInfo();
                    hasTimerTriggered = true;
                }
            });
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            this.checkTaskThreadInfo();
            if (this.hasTimerTriggered) {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
            }
        }

        protected void cleanUpInternal() throws Exception {
            this.checkTaskThreadInfo();
        }

        private void checkTaskThreadInfo() {
            Thread currentThread = Thread.currentThread();
            Preconditions.checkState((this.taskThreadId == currentThread.getId() ? 1 : 0) != 0, (Object)"Task's method was called in non task thread.");
            Preconditions.checkState((this.taskClassLoader == currentThread.getContextClassLoader() ? 1 : 0) != 0, (Object)"Task's controller class loader has been changed during invocation.");
        }
    }

    public static class StateBackendTestSource
    extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        private static volatile boolean fail;

        public StateBackendTestSource(Environment env) throws Exception {
            super(env);
        }

        protected void init() throws Exception {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (fail) {
                throw new RuntimeException();
            }
            controller.suspendDefaultAction();
            this.mailboxProcessor.suspend();
        }

        protected void cleanUpInternal() throws Exception {
        }

        public StreamTaskStateInitializer createStreamTaskStateInitializer(SubTaskInitializationMetricsBuilder initializationMetrics) {
            StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer(initializationMetrics);
            return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, fraction, isUsingCustomRawKeyedState, isAsyncState) -> {
                final StreamOperatorStateContext controller = streamTaskStateManager.streamOperatorStateContext(operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, fraction, isUsingCustomRawKeyedState, isAsyncState);
                return new StreamOperatorStateContext(){

                    public boolean isRestored() {
                        return controller.isRestored();
                    }

                    public OptionalLong getRestoredCheckpointId() {
                        return controller.getRestoredCheckpointId();
                    }

                    public OperatorStateBackend operatorStateBackend() {
                        return controller.operatorStateBackend();
                    }

                    public TypeSerializer<?> keySerializer() {
                        return controller.keySerializer();
                    }

                    public CheckpointableKeyedStateBackend<?> keyedStateBackend() {
                        return controller.keyedStateBackend();
                    }

                    public AsyncKeyedStateBackend<?> asyncKeyedStateBackend() {
                        return controller.asyncKeyedStateBackend();
                    }

                    public InternalTimeServiceManager<?> internalTimerServiceManager() {
                        InternalTimeServiceManager timeServiceManager = controller.internalTimerServiceManager();
                        return timeServiceManager != null ? (InternalTimeServiceManager)Mockito.spy((Object)timeServiceManager) : null;
                    }

                    public InternalTimeServiceManager<?> asyncInternalTimerServiceManager() {
                        InternalTimeServiceManager timeServiceManager = controller.internalTimerServiceManager();
                        return timeServiceManager != null ? (InternalTimeServiceManager)Mockito.spy((Object)timeServiceManager) : null;
                    }

                    public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
                        return this.replaceWithSpy(controller.rawOperatorStateInputs());
                    }

                    public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
                        return this.replaceWithSpy(controller.rawKeyedStateInputs());
                    }

                    public <T extends Closeable> T replaceWithSpy(T closeable) {
                        Closeable spyCloseable = (Closeable)Mockito.spy(closeable);
                        if (closeableRegistry.unregisterCloseable(closeable)) {
                            try {
                                closeableRegistry.registerCloseable((AutoCloseable)spyCloseable);
                            }
                            catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        return (T)spyCloseable;
                    }
                };
            };
        }
    }

    static class EmptyInputProcessor
    implements StreamInputProcessor {
        private volatile boolean isFinished;

        public EmptyInputProcessor() {
            this(true);
        }

        public EmptyInputProcessor(boolean startFinished) {
            this.isFinished = startFinished;
        }

        public DataInputStatus processInput() throws Exception {
            return this.isFinished ? DataInputStatus.END_OF_INPUT : DataInputStatus.NOTHING_AVAILABLE;
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }

        public void finishInput() {
            this.isFinished = true;
        }
    }

    private static class MockStreamTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
        private int restoreInvocationCount = 0;

        MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
            super(env, null, uncaughtExceptionHandler);
            this.overrideOperatorChain = operatorChain;
        }

        public void restoreInternal() throws Exception {
            super.restoreInternal();
            ++this.restoreInvocationCount;
        }

        protected void init() {
            this.operatorChain = this.overrideOperatorChain;
            this.mainOperator = this.operatorChain.getMainOperator();
            this.inputProcessor = new EmptyInputProcessor(false);
        }

        void finishInput() {
            Preconditions.checkState((this.inputProcessor != null ? 1 : 0) != 0, (Object)"Tried to finishInput before MockStreamTask was started");
            ((EmptyInputProcessor)this.inputProcessor).finishInput();
        }
    }

    public static final class TestMemoryStateBackendFactory
    implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1L;

        public AbstractStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) {
            return new TestSpyWrapperStateBackend(this.createInnerBackend(config));
        }

        protected HashMapStateBackend createInnerBackend(ReadableConfig config) {
            return new HashMapStateBackend();
        }
    }

    private static class MockSourceFunction
    implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        public void cancel() {
        }
    }

    private static class SlowlyDeserializingOperator
    extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        public void run(Object lockingObject, Output<StreamRecord<Long>> collector, OperatorChain<?, ?> operatorChain) throws Exception {
            while (!this.canceled) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long delay = 500L;
            long deadline = System.currentTimeMillis() + delay;
            do {
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((delay = deadline - System.currentTimeMillis()) > 0L);
        }
    }

    private static class RacyTestInputProcessor
    implements StreamInputProcessor {
        private boolean firstCall = true;

        private RacyTestInputProcessor() {
        }

        public DataInputStatus processInput() {
            try {
                DataInputStatus dataInputStatus = this.firstCall ? DataInputStatus.NOTHING_AVAILABLE : DataInputStatus.END_OF_INPUT;
                return dataInputStatus;
            }
            finally {
                this.firstCall = false;
            }
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AvailabilityProvider.AVAILABLE;
        }
    }

    private static class UnAvailableTestInputProcessor
    implements StreamInputProcessor {
        private final AvailabilityProvider.AvailabilityHelper availabilityProvider = new AvailabilityProvider.AvailabilityHelper();

        private UnAvailableTestInputProcessor() {
        }

        public DataInputStatus processInput() {
            return this.availabilityProvider.isAvailable() ? DataInputStatus.END_OF_INPUT : DataInputStatus.NOTHING_AVAILABLE;
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
        }

        public CompletableFuture<?> getAvailableFuture() {
            return this.availabilityProvider.getAvailableFuture();
        }
    }

    private static class AvailabilityTestInputProcessor
    implements StreamInputProcessor {
        private final int totalProcessCalls;
        private int currentNumProcessCalls;

        AvailabilityTestInputProcessor(int totalProcessCalls) {
            this.totalProcessCalls = totalProcessCalls;
        }

        public DataInputStatus processInput() {
            return ++this.currentNumProcessCalls < this.totalProcessCalls ? DataInputStatus.MORE_AVAILABLE : DataInputStatus.END_OF_INPUT;
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }
    }

    private static class RunningTask<T extends StreamTask<?, ?>> {
        final T streamTask;
        final CompletableFuture<Void> invocationFuture;

        RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) {
            this.streamTask = streamTask;
            this.invocationFuture = invocationFuture;
        }

        void waitForTaskCompletion(boolean cancelled) throws Exception {
            try {
                this.invocationFuture.get();
            }
            catch (Exception e) {
                if (cancelled) {
                    Assertions.assertThat((Throwable)e).hasCauseInstanceOf(CancelTaskException.class);
                }
                throw e;
            }
            Assertions.assertThat((boolean)this.streamTask.isCanceled()).isEqualTo(cancelled);
        }
    }

    private static class WaitingThread
    extends Thread {
        private final MailboxExecutor executor;
        private final RunnableWithException resumeTask;
        private final long sleepTimeInsideMail;
        private final long sleepTimeOutsideMail;
        private final TimerGauge sleepOutsideMailTimer;
        @Nullable
        private Exception asyncException;

        public WaitingThread(MailboxExecutor executor, RunnableWithException resumeTask, long sleepTimeInsideMail, long sleepTimeOutsideMail, TimerGauge sleepOutsideMailTimer) {
            this.executor = executor;
            this.resumeTask = resumeTask;
            this.sleepTimeInsideMail = sleepTimeInsideMail;
            this.sleepTimeOutsideMail = sleepTimeOutsideMail;
            this.sleepOutsideMailTimer = sleepOutsideMailTimer;
        }

        @Override
        public void run() {
            try {
                while (!this.sleepOutsideMailTimer.isMeasuring()) {
                    Thread.sleep(1L);
                }
                Thread.sleep(this.sleepTimeOutsideMail);
            }
            catch (InterruptedException e) {
                this.asyncException = e;
            }
            this.executor.execute(() -> {
                if (this.asyncException != null) {
                    throw this.asyncException;
                }
                Thread.sleep(this.sleepTimeInsideMail);
                this.resumeTask.run();
            }, "This task will complete the future to resume process input action.");
        }
    }

    private static class FailingTwiceOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        private FailingTwiceOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            throw new ExpectedTestException();
        }

        public void close() throws Exception {
            throw new CloseException();
        }

        static class CloseException
        extends Exception {
            public CloseException() {
                super("Close Exception. This exception should be suppressed");
            }
        }
    }
}

