package org.apache.flink.api.common.io;

import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/api/common/io/OutputFormatBaseTest.class */
class OutputFormatBaseTest {
    /**
     * Timeout passed to the test formats unless a test supplies its own:
     * {@code Long.MAX_VALUE} milliseconds, i.e. effectively unbounded.
     */
    private static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);

    /**
     * Minimal {@link OutputFormatBase} implementation used by the tests in this file.
     *
     * <p>{@link #send(String)} either hands out the next stage queued through
     * {@link #enqueueCompletableFuture(CompletableFuture)} or, when a send function was supplied
     * at construction time, delegates to that function. Implementing {@link AutoCloseable} lets
     * tests manage instances with try-with-resources.
     */
    public static class TestOutputFormat extends OutputFormatBase<String, Void> implements AutoCloseable {
        private static final long serialVersionUID = 6646648756749403023L;

        /** Stages returned by {@link #send(String)}, in FIFO order, when no send function is set. */
        private final Queue<CompletionStage<Void>> tasksQueue = new LinkedList<>();

        /** Optional replacement for the queue-based behavior of {@link #send(String)}. */
        @Nullable
        private final Function<String, CompletionStage<Void>> sendFunction;

        /**
         * Creates a format whose {@link #send(String)} is served from the internal queue.
         *
         * @param maxConcurrentRequests forwarded unchanged to the super constructor
         * @param maxConcurrentRequestsTimeout forwarded unchanged to the super constructor
         */
        private TestOutputFormat(int maxConcurrentRequests, Duration maxConcurrentRequestsTimeout) {
            this(maxConcurrentRequests, maxConcurrentRequestsTimeout, null);
        }

        /**
         * Creates a format whose {@link #send(String)} delegates to {@code sendFunction}.
         *
         * @param maxConcurrentRequests forwarded unchanged to the super constructor
         * @param maxConcurrentRequestsTimeout forwarded unchanged to the super constructor
         * @param sendFunction function invoked for every record, or {@code null} to use the queue
         */
        private TestOutputFormat(
                int maxConcurrentRequests,
                Duration maxConcurrentRequestsTimeout,
                @Nullable Function<String, CompletionStage<Void>> sendFunction) {
            super(maxConcurrentRequests, maxConcurrentRequestsTimeout);
            this.sendFunction = sendFunction;
        }

        /**
         * Produces the stage for one record.
         *
         * @param str the record being written
         * @return the result of the send function if one is set; otherwise the head of the queue,
         *     which is {@code null} when nothing has been enqueued
         */
        public CompletionStage<Void> send(String str) {
            if (this.sendFunction != null) {
                return this.sendFunction.apply(str);
            }
            return this.tasksQueue.poll();
        }

        /**
         * Queues a future to be returned by a later {@link #send(String)} call.
         *
         * @param completableFuture the future to queue; must not be {@code null}
         */
        void enqueueCompletableFuture(CompletableFuture<Void> completableFuture) {
            Preconditions.checkNotNull(completableFuture);
            this.tasksQueue.offer(completableFuture);
        }

        /** Intentionally a no-op: this test format reads nothing from the configuration. */
        public void configure(Configuration configuration) {
        }
    }

    // Explicit package-private no-arg constructor; it performs no initialization.
    OutputFormatBaseTest() {
    }

    /**
     * Writing a record whose send stage is already completed must leave the permit counts
     * exactly as they were before the write.
     */
    @Test
    void testSuccessfulWrite() throws Exception {
        // try-with-resources closes the format exactly once and, should an assertion fail,
        // keeps that failure as the primary exception (a close() failure becomes suppressed).
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat()) {
            testOutputFormat.enqueueCompletableFuture(CompletableFuture.completedFuture(null));

            final int originalPermits = testOutputFormat.getAvailablePermits();
            Assertions.assertThat(originalPermits).isGreaterThan(0);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);

            testOutputFormat.writeRecord("hello");

            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(originalPermits);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);
        }
    }

    /**
     * A send stage that completed exceptionally must make {@code close()} fail with an
     * {@link IOException} whose cause is that very exception instance.
     */
    @Test
    void testThrowErrorOnClose() throws Exception {
        final TestOutputFormat format = createTestOutputFormat();
        format.open(1, 1);

        final RuntimeException cause = new RuntimeException();
        format.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
        format.writeRecord("none");

        // close() is the operation under test, so it is not wrapped in try-with-resources.
        Assertions.assertThatThrownBy(format::close)
                .isInstanceOf(IOException.class)
                .hasCauseReference(cause);
    }

    /**
     * After a send stage has completed exceptionally, the next {@code writeRecord} call must
     * fail with an {@link IOException} whose cause is that exception, and no permit may remain
     * acquired.
     */
    @Test
    void testThrowErrorOnWrite() throws Exception {
        // try-with-resources closes the format exactly once and preserves the primary failure.
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat()) {
            final RuntimeException cause = new RuntimeException();
            testOutputFormat.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));

            // The first write itself is expected to go through; the failure of its stage is
            // asserted on the second write below.
            testOutputFormat.writeRecord("none");

            Assertions.assertThatThrownBy(
                            () -> testOutputFormat.writeRecord("none"),
                            "Sending of second value should have failed.")
                    .isInstanceOf(IOException.class)
                    .hasCauseReference(cause);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);
        }
    }

    /**
     * {@code close()} must not return while a send stage is still pending: the closing thread
     * is observed waiting with the permit still acquired, and only finishes once the stage
     * completes.
     */
    @Test
    void testWaitForPendingUpdatesOnClose() throws Exception {
        // try-with-resources closes the format (again) exactly once on exit and preserves the
        // primary failure if any assertion below fails.
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat()) {
            final CompletableFuture<Void> pendingSend = new CompletableFuture<>();
            testOutputFormat.enqueueCompletableFuture(pendingSend);

            testOutputFormat.writeRecord("hello");
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(1);

            // Run close() on a separate thread so this thread can observe it blocking.
            final CheckedThread closingThread = new CheckedThread("Flink-OutputFormatBaseTest") {
                public void go() throws Exception {
                    testOutputFormat.close();
                }
            };
            closingThread.start();

            // Poll until the closing thread is parked in a timed wait.
            // NOTE(review): this loop has no upper bound; if the thread never reaches
            // TIMED_WAITING the test hangs instead of failing.
            while (closingThread.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(5L);
            }
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(1);

            // Completing the pending stage lets close() finish.
            pendingSend.complete(null);
            closingThread.sync();
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);
        }
    }

    /**
     * The single permit is acquired while a send stage is pending and is released again as
     * soon as that stage completes successfully.
     */
    @Test
    void testReleaseOnSuccess() throws Exception {
        // try-with-resources closes the format exactly once and preserves the primary failure.
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat()) {
            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(1);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);

            final CompletableFuture<Void> pendingSend = new CompletableFuture<>();
            testOutputFormat.enqueueCompletableFuture(pendingSend);
            testOutputFormat.writeRecord("hello");

            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(0);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(1);

            pendingSend.complete(null);

            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(1);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);
        }
    }

    /**
     * The single permit is acquired while a send stage is pending and is released again when
     * that stage completes exceptionally; the subsequent {@code close()} is expected to throw.
     */
    @Test
    void testReleaseOnFailure() throws Exception {
        final TestOutputFormat format = createOpenedTestOutputFormat();
        Assertions.assertThat(format.getAvailablePermits()).isEqualTo(1);
        Assertions.assertThat(format.getAcquiredPermits()).isEqualTo(0);

        final CompletableFuture<Void> pendingSend = new CompletableFuture<>();
        format.enqueueCompletableFuture(pendingSend);
        format.writeRecord("none");

        Assertions.assertThat(format.getAvailablePermits()).isEqualTo(0);
        Assertions.assertThat(format.getAcquiredPermits()).isEqualTo(1);

        pendingSend.completeExceptionally(new RuntimeException());

        Assertions.assertThat(format.getAvailablePermits()).isEqualTo(1);
        Assertions.assertThat(format.getAcquiredPermits()).isEqualTo(0);

        // close() is asserted to throw here, so it is not wrapped in try-with-resources.
        Assertions.assertThatThrownBy(format::close);
    }

    /**
     * When the send function itself throws, the permit counts must be the same after the
     * failed {@code writeRecord} call as before it.
     */
    @Test
    void testReleaseOnThrowingSend() throws Exception {
        final Function<String, CompletionStage<Void>> failingSendFunction =
                ignoredRecord -> {
                    throw new RuntimeException("expected");
                };

        // try-with-resources closes the format exactly once and preserves the primary failure.
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat(failingSendFunction)) {
            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(1);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);

            try {
                testOutputFormat.writeRecord("none");
            } catch (RuntimeException ignored) {
                // Deliberately swallowed: this test checks permit accounting only, not the
                // exception raised by the send function.
            }

            Assertions.assertThat(testOutputFormat.getAvailablePermits()).isEqualTo(1);
            Assertions.assertThat(testOutputFormat.getAcquiredPermits()).isEqualTo(0);
        }
    }

    /**
     * With a 1 ms timeout and the first send still pending, a second {@code writeRecord} must
     * fail with an exception caused by a {@link TimeoutException}.
     */
    @Test
    void testMaxConcurrentRequestsReached() throws Exception {
        // try-with-resources closes the format exactly once and preserves the primary failure.
        try (TestOutputFormat testOutputFormat = createOpenedTestOutputFormat(Duration.ofMillis(1L))) {
            final CompletableFuture<Void> pendingSend = new CompletableFuture<>();
            // Queued twice: one entry for each writeRecord call below.
            testOutputFormat.enqueueCompletableFuture(pendingSend);
            testOutputFormat.enqueueCompletableFuture(pendingSend);

            testOutputFormat.writeRecord("writeRecord #1");

            Assertions.assertThatThrownBy(
                            () -> testOutputFormat.writeRecord("writeRecord #2"),
                            "Sending value should have experienced a TimeoutException.")
                    .hasCauseInstanceOf(TimeoutException.class);

            // Complete the pending send before the format is closed.
            pendingSend.complete(null);
        }
    }

    /**
     * Builds a configured but not yet opened format with one concurrent request and the
     * default timeout.
     *
     * @return the configured format; the caller is responsible for opening it
     */
    private static TestOutputFormat createTestOutputFormat() {
        final TestOutputFormat format =
                new TestOutputFormat(1, DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT);
        format.configure(new Configuration());
        return format;
    }

    /**
     * Builds a configured and opened format with one concurrent request and the default
     * timeout.
     *
     * @return the opened format
     */
    private static TestOutputFormat createOpenedTestOutputFormat() {
        final Duration timeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
        return createOpenedTestOutputFormat(timeout);
    }

    /**
     * Builds a configured and opened format with one concurrent request and the given timeout.
     *
     * @param duration timeout passed to the format's constructor
     * @return the opened format
     */
    private static TestOutputFormat createOpenedTestOutputFormat(Duration duration) {
        final TestOutputFormat format = new TestOutputFormat(1, duration);
        format.configure(new Configuration());
        format.open(1, 1);
        return format;
    }

    /**
     * Builds a configured and opened format with one concurrent request and the default
     * timeout, whose sends are served by the given function.
     *
     * @param function function the format invokes for every record sent
     * @return the opened format
     */
    private static TestOutputFormat createOpenedTestOutputFormat(Function<String, CompletionStage<Void>> function) {
        final TestOutputFormat format =
                new TestOutputFormat(1, DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT, function);
        format.configure(new Configuration());
        format.open(1, 1);
        return format;
    }
}
