package org.apache.flink.table.gateway.rest;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.table.gateway.rest.util.RestConfigUtils;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/table/gateway/rest/RestAPIITTestBase.class */
/**
 * Base class for integration tests that talk to a {@link SqlGatewayRestEndpoint} through a
 * {@link RestClient}.
 *
 * <p>Before all tests of a subclass run, {@link #start()} starts an endpoint bound to the loopback
 * address and creates a client for it; {@link #stop()} releases both afterwards. Subclasses issue
 * requests through {@link #sendRequest}.
 */
public abstract class RestAPIITTestBase {

    /** Registered first ({@code @Order(1)}); its client configuration feeds the gateway service. */
    @Order(1)
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();

    /** Registered second ({@code @Order(2)}); supplies the service the endpoint is built on. */
    @Order(2)
    @RegisterExtension
    protected static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);

    /** Client used by {@link #sendRequest}; assigned in {@link #start()}. */
    @Nullable
    private static RestClient restClient;

    /** Host name of the started endpoint; assigned in {@link #start()}. */
    @Nullable
    private static String targetAddress;

    /** Single-thread pool handed to the {@link RestClient}; assigned in {@link #start()}. */
    @Nullable
    private static ExecutorService executorService;

    /** Endpoint under test; assigned in {@link #start()}. */
    @Nullable
    private static SqlGatewayRestEndpoint sqlGatewayRestEndpoint;

    /** Port of the started endpoint; assigned in {@link #start()}. */
    private static int port;

    /**
     * Starts the REST endpoint on the loopback address and creates the client used to reach it.
     *
     * @throws Exception if the endpoint or the client cannot be created or started
     */
    @BeforeAll
    static void start() throws Exception {
        String loopback = InetAddress.getLoopbackAddress().getHostAddress();
        // NOTE(review): the "0" argument presumably requests an ephemeral port - confirm against
        // RestConfigUtils#getFlinkConfig. The actual port is read back from the endpoint below.
        Configuration endpointConfig =
                RestConfigUtils.getBaseConfig(
                        RestConfigUtils.getFlinkConfig(loopback, loopback, "0"));

        sqlGatewayRestEndpoint =
                new SqlGatewayRestEndpoint(
                        endpointConfig, SQL_GATEWAY_SERVICE_EXTENSION.getService());
        sqlGatewayRestEndpoint.start();
        InetSocketAddress serverAddress =
                (InetSocketAddress)
                        Preconditions.checkNotNull(sqlGatewayRestEndpoint.getServerAddress());

        executorService =
                Executors.newFixedThreadPool(
                        1, new ExecutorThreadFactory("rest-client-thread-pool"));
        restClient = new RestClient(new Configuration(), executorService);
        targetAddress = serverAddress.getHostName();
        port = serverAddress.getPort();
    }

    /**
     * Closes the endpoint, shuts down the client and then its executor.
     *
     * <p>Every step is attempted even when an earlier one fails, so that a failing endpoint close
     * does not leak the client or its thread. The first failure is rethrown once all steps have
     * run; later failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while releasing the resources, including a
     *     {@link NullPointerException} when a resource was never initialised
     */
    @AfterAll
    static void stop() throws Exception {
        Exception failure = null;

        try {
            Preconditions.checkNotNull(sqlGatewayRestEndpoint);
            sqlGatewayRestEndpoint.close();
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }

        try {
            Preconditions.checkNotNull(restClient);
            restClient.shutdown(Time.seconds(3L));
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }

        try {
            Preconditions.checkNotNull(executorService);
            ExecutorUtils.gracefulShutdown(
                    3L, TimeUnit.SECONDS, new ExecutorService[] {executorService});
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Sends a request to the endpoint started by {@link #start()}.
     *
     * @param m headers describing the REST call
     * @param u parameters of the call
     * @param r request body
     * @return future of the response, as returned by {@link RestClient#sendRequest}
     * @throws IOException if the underlying client fails to send the request
     * @throws NullPointerException if the client has not been created yet
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u, R r) throws IOException {
        Preconditions.checkNotNull(restClient);
        return restClient.sendRequest(targetAddress, port, m, u, r);
    }

    /** Keeps the first failure as primary and records any later one as suppressed. */
    private static Exception firstOrSuppressed(@Nullable Exception first, Exception current) {
        if (first == null) {
            return current;
        }
        first.addSuppressed(current);
        return first;
    }
}
