/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.client;

import io.opentelemetry.testing.internal.armeria.client.ConnectionPoolListener;
import io.opentelemetry.testing.internal.armeria.client.Endpoint;
import io.opentelemetry.testing.internal.armeria.client.HAProxyHandler;
import io.opentelemetry.testing.internal.armeria.client.HttpClientFactory;
import io.opentelemetry.testing.internal.armeria.client.HttpClientPipelineConfigurator;
import io.opentelemetry.testing.internal.armeria.client.HttpSessionHandler;
import io.opentelemetry.testing.internal.armeria.client.RefusedStreamException;
import io.opentelemetry.testing.internal.armeria.client.SessionProtocolNegotiationCache;
import io.opentelemetry.testing.internal.armeria.client.SessionProtocolNegotiationException;
import io.opentelemetry.testing.internal.armeria.client.UnprocessedRequestException;
import io.opentelemetry.testing.internal.armeria.client.proxy.ConnectProxyConfig;
import io.opentelemetry.testing.internal.armeria.client.proxy.HAProxyConfig;
import io.opentelemetry.testing.internal.armeria.client.proxy.ProxyConfig;
import io.opentelemetry.testing.internal.armeria.client.proxy.ProxyConfigSelector;
import io.opentelemetry.testing.internal.armeria.client.proxy.ProxyType;
import io.opentelemetry.testing.internal.armeria.client.proxy.Socks4ProxyConfig;
import io.opentelemetry.testing.internal.armeria.client.proxy.Socks5ProxyConfig;
import io.opentelemetry.testing.internal.armeria.common.ClosedSessionException;
import io.opentelemetry.testing.internal.armeria.common.SerializationFormat;
import io.opentelemetry.testing.internal.armeria.common.SessionProtocol;
import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.logging.ClientConnectionTimingsBuilder;
import io.opentelemetry.testing.internal.armeria.common.util.AsyncCloseable;
import io.opentelemetry.testing.internal.armeria.common.util.AsyncCloseableSupport;
import io.opentelemetry.testing.internal.armeria.common.util.DomainSocketAddress;
import io.opentelemetry.testing.internal.armeria.internal.client.HttpSession;
import io.opentelemetry.testing.internal.armeria.internal.client.PooledChannel;
import io.opentelemetry.testing.internal.armeria.internal.common.util.ChannelUtil;
import io.opentelemetry.testing.internal.armeria.internal.common.util.TemporaryThreadLocals;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableSet;
import io.opentelemetry.testing.internal.io.netty.bootstrap.Bootstrap;
import io.opentelemetry.testing.internal.io.netty.channel.Channel;
import io.opentelemetry.testing.internal.io.netty.channel.ChannelFuture;
import io.opentelemetry.testing.internal.io.netty.channel.ChannelFutureListener;
import io.opentelemetry.testing.internal.io.netty.channel.ChannelInitializer;
import io.opentelemetry.testing.internal.io.netty.channel.ChannelOption;
import io.opentelemetry.testing.internal.io.netty.channel.EventLoop;
import io.opentelemetry.testing.internal.io.netty.handler.proxy.HttpProxyHandler;
import io.opentelemetry.testing.internal.io.netty.handler.proxy.ProxyConnectException;
import io.opentelemetry.testing.internal.io.netty.handler.proxy.ProxyHandler;
import io.opentelemetry.testing.internal.io.netty.handler.proxy.Socks4ProxyHandler;
import io.opentelemetry.testing.internal.io.netty.handler.proxy.Socks5ProxyHandler;
import io.opentelemetry.testing.internal.io.netty.handler.ssl.SslContext;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.Future;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.GenericFutureListener;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.Promise;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.ScheduledFuture;
import java.lang.reflect.Array;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.NonBlocking;

final class HttpChannelPool
implements AsyncCloseable {
    private static final Logger logger = LoggerFactory.getLogger(HttpChannelPool.class);
    // Zero-length array handed to toArray() when closeAsync() snapshots the open channels.
    private static final Channel[] EMPTY_CHANNELS = new Channel[0];
    private final HttpClientFactory clientFactory;
    // Event loop this pool is bound to; bootstraps are grouped onto it and closeAsync() hops onto it.
    private final EventLoop eventLoop;
    private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
    // Idle/shared channels, indexed by SessionProtocol.ordinal(). Only the H1, H1C, H2 and H2C
    // slots are populated (see the constructor); the other slots stay null.
    private final Map<PoolKey, Deque<PooledChannel>>[] pool;
    // In-flight connection attempts, indexed by SessionProtocol.ordinal() of the *desired* protocol.
    private final Map<PoolKey, ChannelAcquisitionFuture>[] pendingAcquisitions;
    // Identity-keyed map used as a set of every channel opened by this pool and not yet closed.
    private final Map<Channel, Boolean> allChannels;
    // Notified through connectionOpen()/connectionClosed() from notifyConnect().
    private final ConnectionPoolListener listener;
    // Bootstraps for InetSocketAddress targets, indexed by [protocol ordinal][toIndex(webSocket)].
    private final Bootstrap[][] inetBootstraps;
    // Same layout for domain-socket targets; null when clientFactory.newUnixBootstrap() returned null.
    @Nullable
    private final Bootstrap[][] unixBootstraps;
    // CONNECT_TIMEOUT_MILLIS of the inet base bootstrap; also used for proxy handshakes and the
    // session-creation timeout.
    private final int connectTimeoutMillis;
    // Chosen by determineSslContext(): sslCtxHttp1Only for H1/H1C, sslCtxHttp1Or2 for everything else.
    private final SslContext sslCtxHttp1Or2;
    private final SslContext sslCtxHttp1Only;

    /**
     * Creates a pool bound to the given event loop.
     *
     * @param clientFactory   source of the base bootstraps and pipeline configuration
     * @param eventLoop       event loop every bootstrap of this pool is grouped onto
     * @param sslCtxHttp1Or2  SSL context used for every protocol other than H1/H1C
     * @param sslCtxHttp1Only SSL context used for H1/H1C
     * @param listener        callback invoked when connections open and close
     */
    HttpChannelPool(HttpClientFactory clientFactory, EventLoop eventLoop, SslContext sslCtxHttp1Or2, SslContext sslCtxHttp1Only, ConnectionPoolListener listener) {
        this.clientFactory = clientFactory;
        this.eventLoop = eventLoop;
        this.listener = listener;
        // Both SSL contexts must be assigned before newBootstrapMap() runs, because it reads
        // them through determineSslContext().
        this.sslCtxHttp1Or2 = sslCtxHttp1Or2;
        this.sslCtxHttp1Only = sslCtxHttp1Only;

        allChannels = new IdentityHashMap<>();
        Set<SessionProtocol> pooledProtocols = ImmutableSet.of(SessionProtocol.H1, SessionProtocol.H1C, SessionProtocol.H2, SessionProtocol.H2C);
        pool = newEnumMap(pooledProtocols);
        pendingAcquisitions = newEnumMap(SessionProtocol.httpAndHttpsValues());

        Bootstrap inetBase = clientFactory.newInetBootstrap();
        inetBootstraps = newBootstrapMap(inetBase, clientFactory, eventLoop);

        Bootstrap unixBase = clientFactory.newUnixBootstrap();
        if (unixBase != null) {
            unixBootstraps = newBootstrapMap(unixBase, clientFactory, eventLoop);
        } else {
            unixBootstraps = null;
        }

        Object configuredTimeout = inetBase.config().options().get(ChannelOption.CONNECT_TIMEOUT_MILLIS);
        connectTimeoutMillis = (Integer) configuredTimeout;
    }

    /**
     * Builds the {@code [protocol ordinal][webSocket index]} bootstrap table from a base bootstrap.
     * The base bootstrap is grouped onto {@code eventLoop} and then cloned twice (WebSocket and
     * non-WebSocket) for each protocol in {@link SessionProtocol#httpAndHttpsValues()}; rows of
     * other protocols keep {@code null} entries.
     */
    private Bootstrap[][] newBootstrapMap(Bootstrap baseBootstrap, HttpClientFactory clientFactory, EventLoop eventLoop) {
        baseBootstrap.group(eventLoop);
        Set<SessionProtocol> protocols = SessionProtocol.httpAndHttpsValues();
        Bootstrap[][] table = new Bootstrap[SessionProtocol.values().length][2];
        for (SessionProtocol protocol : protocols) {
            SslContext sslCtx = determineSslContext(protocol);
            Bootstrap forWebSocket = baseBootstrap.clone();
            setBootstrap(forWebSocket, clientFactory, table, protocol, sslCtx, true);
            Bootstrap forPlainHttp = baseBootstrap.clone();
            setBootstrap(forPlainHttp, clientFactory, table, protocol, sslCtx, false);
        }
        return table;
    }

    /**
     * Installs a channel initializer on {@code bootstrap} that appends an
     * {@link HttpClientPipelineConfigurator} for the given protocol, then stores the bootstrap
     * in {@code maps} at {@code [p.ordinal()][toIndex(webSocket)]}.
     */
    private static void setBootstrap(Bootstrap bootstrap, final HttpClientFactory clientFactory, Bootstrap[][] maps, final SessionProtocol p, final SslContext sslCtx, final boolean webSocket) {
        ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                HttpClientPipelineConfigurator configurator = new HttpClientPipelineConfigurator(clientFactory, webSocket, p, sslCtx);
                ch.pipeline().addLast(configurator);
            }
        };
        bootstrap.handler(initializer);
        Bootstrap[] row = maps[p.ordinal()];
        row[toIndex(webSocket)] = bootstrap;
    }

    /** Maps the WebSocket flag to the second-dimension index of the bootstrap tables. */
    private static int toIndex(boolean webSocket) {
        if (webSocket) {
            return 1;
        }
        return 0;
    }

    /** Returns the WebSocket index (1) for {@link SerializationFormat#WS}, otherwise 0. */
    private static int toIndex(SerializationFormat serializationFormat) {
        boolean webSocket = serializationFormat == SerializationFormat.WS;
        return toIndex(webSocket);
    }

    /** Picks the HTTP/1-only SSL context for H1/H1C and the HTTP/1-or-2 context otherwise. */
    private SslContext determineSslContext(SessionProtocol desiredProtocol) {
        if (desiredProtocol == SessionProtocol.H1 || desiredProtocol == SessionProtocol.H1C) {
            return sslCtxHttp1Only;
        }
        return sslCtxHttp1Or2;
    }

    /**
     * Prepends the proxy-related handlers required by {@code proxyConfig} to the channel pipeline.
     *
     * <ul>
     *   <li>{@code DIRECT}: nothing is added.</li>
     *   <li>{@code SOCKS4}, {@code SOCKS5}, {@code CONNECT}: the matching Netty proxy handler is
     *       added first, with its connect timeout set to this pool's connect timeout.</li>
     *   <li>{@code HAPROXY}: an {@link HAProxyHandler} is added first and nothing else is done.</li>
     * </ul>
     * For a {@link ConnectProxyConfig} with {@code useTls()}, an SSL handler is additionally placed
     * in front of the proxy handler.
     *
     * @throws Error if the proxy type is none of the above (should be unreachable)
     */
    private void configureProxy(Channel ch, ProxyConfig proxyConfig, SessionProtocol desiredProtocol) {
        if (proxyConfig.proxyType() == ProxyType.DIRECT) {
            return;
        }
        InetSocketAddress proxyAddress = proxyConfig.proxyAddress();
        assert (proxyAddress != null);
        ProxyHandler proxyHandler;
        switch (proxyConfig.proxyType()) {
            case SOCKS4: {
                Socks4ProxyConfig socks4ProxyConfig = (Socks4ProxyConfig)proxyConfig;
                proxyHandler = new Socks4ProxyHandler(proxyAddress, socks4ProxyConfig.username());
                break;
            }
            case SOCKS5: {
                Socks5ProxyConfig socks5ProxyConfig = (Socks5ProxyConfig)proxyConfig;
                proxyHandler = new Socks5ProxyHandler(proxyAddress, socks5ProxyConfig.username(), socks5ProxyConfig.password());
                break;
            }
            case CONNECT: {
                ConnectProxyConfig connectProxyConfig = (ConnectProxyConfig)proxyConfig;
                String username = connectProxyConfig.username();
                String password = connectProxyConfig.password();
                // Credentials are only sent when both parts are present.
                if (username == null || password == null) {
                    proxyHandler = new HttpProxyHandler(proxyAddress);
                } else {
                    proxyHandler = new HttpProxyHandler((SocketAddress)proxyAddress, username, password);
                }
                break;
            }
            case HAPROXY: {
                ch.pipeline().addFirst(new HAProxyHandler((HAProxyConfig)proxyConfig));
                return;
            }
            default: {
                // Should never be reached; name the offending type so a new, unhandled
                // ProxyType is easy to diagnose.
                throw new Error("unexpected proxy type: " + proxyConfig.proxyType());
            }
        }
        proxyHandler.setConnectTimeoutMillis(this.connectTimeoutMillis);
        ch.pipeline().addFirst(proxyHandler);
        if (proxyConfig instanceof ConnectProxyConfig && ((ConnectProxyConfig)proxyConfig).useTls()) {
            // addFirst() puts the SSL handler ahead of the proxy handler in the pipeline.
            SslContext sslCtx = this.determineSslContext(desiredProtocol);
            ch.pipeline().addFirst(sslCtx.newHandler(ch.alloc()));
        }
    }

    /**
     * Creates an array indexed by {@link SessionProtocol#ordinal()} holding a fresh
     * {@link HashMap} for every protocol in {@code allowedProtocols}. Slots of protocols that are
     * not allowed stay {@code null}, so indexing with such a protocol fails fast.
     *
     * @param allowedProtocols protocols that get a map
     * @return the per-protocol map array
     */
    private static <T> Map<PoolKey, T>[] newEnumMap(Set<SessionProtocol> allowedProtocols) {
        // Generic arrays cannot be created directly; the reflective array only ever stores
        // Map<PoolKey, T> instances created below, so the cast is safe.
        @SuppressWarnings("unchecked")
        Map<PoolKey, T>[] maps = (Map<PoolKey, T>[])Array.newInstance(Map.class, SessionProtocol.values().length);
        for (SessionProtocol p : allowedProtocols) {
            maps[p.ordinal()] = new HashMap<>();
        }
        return maps;
    }

    /**
     * Looks up the bootstrap for the given protocol and serialization format, using the inet
     * table for {@link InetSocketAddress} targets and the domain-socket table otherwise.
     *
     * @throws IllegalArgumentException if a domain-socket target is requested but no
     *                                  domain-socket bootstraps exist
     */
    private Bootstrap getBootstrap(SessionProtocol desiredProtocol, SocketAddress remoteAddress, SerializationFormat serializationFormat) {
        final Bootstrap[][] table;
        if (remoteAddress instanceof InetSocketAddress) {
            table = inetBootstraps;
        } else {
            assert (remoteAddress instanceof io.opentelemetry.testing.internal.io.netty.channel.unix.DomainSocketAddress) : remoteAddress;
            if (unixBootstraps == null) {
                throw new IllegalArgumentException("Domain sockets are not supported by " + eventLoop.getClass().getName());
            }
            table = unixBootstraps;
        }
        return table[desiredProtocol.ordinal()][toIndex(serializationFormat)];
    }

    /** Returns the channel deque for {@code key} under {@code protocol}, or {@code null} if none exists. */
    @Nullable
    private Deque<PooledChannel> getPool(SessionProtocol protocol, PoolKey key) {
        Map<PoolKey, Deque<PooledChannel>> perProtocol = pool[protocol.ordinal()];
        return perProtocol.get(key);
    }

    /** Returns the channel deque for {@code key} under {@code protocol}, creating an empty one on first use. */
    private Deque<PooledChannel> getOrCreatePool(SessionProtocol protocol, PoolKey key) {
        return this.pool[protocol.ordinal()].computeIfAbsent(key, k -> new ArrayDeque<>());
    }

    /** Returns the in-flight acquisition registered for {@code key}, or {@code null} if there is none. */
    @Nullable
    private ChannelAcquisitionFuture getPendingAcquisition(SessionProtocol desiredProtocol, PoolKey key) {
        Map<PoolKey, ChannelAcquisitionFuture> pending = pendingAcquisitions[desiredProtocol.ordinal()];
        return pending.get(key);
    }

    /** Registers {@code future} as the in-flight acquisition for {@code key}, replacing any previous one. */
    private void setPendingAcquisition(SessionProtocol desiredProtocol, PoolKey key, ChannelAcquisitionFuture future) {
        Map<PoolKey, ChannelAcquisitionFuture> pending = pendingAcquisitions[desiredProtocol.ordinal()];
        pending.put(key, future);
    }

    /** Forgets the in-flight acquisition registered for {@code key}, if any. */
    private void removePendingAcquisition(SessionProtocol desiredProtocol, PoolKey key) {
        Map<PoolKey, ChannelAcquisitionFuture> pending = pendingAcquisitions[desiredProtocol.ordinal()];
        pending.remove(key);
    }

    /**
     * Tries to obtain an already-pooled channel without connecting.
     * For {@code HTTP} the H2C pool is tried before H1C; for {@code HTTPS} the H2 pool is tried
     * before H1; any other protocol is looked up as-is.
     *
     * @return a usable channel, or {@code null} if the pool has none
     */
    @Nullable
    PooledChannel acquireNow(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key) {
        switch (desiredProtocol) {
            case HTTP: {
                PooledChannel h2c = acquireNowExact(key, SessionProtocol.H2C, serializationFormat);
                return h2c != null ? h2c : acquireNowExact(key, SessionProtocol.H1C, serializationFormat);
            }
            case HTTPS: {
                PooledChannel h2 = acquireNowExact(key, SessionProtocol.H2, serializationFormat);
                return h2 != null ? h2 : acquireNowExact(key, SessionProtocol.H1, serializationFormat);
            }
            default: {
                return acquireNowExact(key, desiredProtocol, serializationFormat);
            }
        }
    }

    /**
     * Looks for a usable channel in the pool of exactly {@code protocol}.
     * The deque is examined from its tail, at most once per element present on entry:
     * unhealthy channels are dropped, channels whose session refuses another unfinished response
     * are rotated to the head, and the first channel that accepts is returned. A non-multiplex
     * channel is removed from the deque when handed out; a multiplex one stays pooled.
     *
     * @return the acquired channel, or {@code null} when the format requires a new connection,
     *         no deque exists for the key, or no pooled channel could be used
     */
    @Nullable
    private PooledChannel acquireNowExact(PoolKey key, SessionProtocol protocol, SerializationFormat serializationFormat) {
        if (serializationFormat.requiresNewConnection(protocol)) {
            return null;
        }
        Deque<PooledChannel> candidates = getPool(protocol, key);
        if (candidates == null) {
            return null;
        }
        int remaining = candidates.size();
        while (remaining > 0) {
            remaining--;
            PooledChannel candidate = candidates.peekLast();
            assert (candidate != null);
            if (!isHealthy(candidate)) {
                candidates.removeLast();
                continue;
            }
            HttpSession session = HttpSession.get(candidate.get());
            if (session.incrementNumUnfinishedResponses()) {
                if (!protocol.isMultiplex()) {
                    candidates.removeLast();
                }
                return candidate;
            }
            // The session cannot take another request right now: give it the lowest priority.
            candidates.removeLast();
            candidates.addFirst(candidate);
        }
        return null;
    }

    /** A pooled channel is healthy when its channel is active and its session reports it is acquirable. */
    private static boolean isHealthy(PooledChannel pooledChannel) {
        Channel channel = pooledChannel.get();
        if (!channel.isActive()) {
            return false;
        }
        return HttpSession.get(channel).isAcquirable();
    }

    /**
     * Returns the protocol reported by the channel's session, or {@code null} when the channel
     * is not active.
     */
    @Nullable
    private static SessionProtocol getProtocolIfHealthy(Channel ch) {
        return ch.isActive() ? HttpSession.get(ch).protocol() : null;
    }

    /**
     * Acquires a channel asynchronously: piggybacks on a pending acquisition for the same key
     * when one can be used, and otherwise starts a new connection attempt.
     *
     * @return a future completed with the acquired channel or the failure
     */
    CompletableFuture<PooledChannel> acquireLater(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key, ClientConnectionTimingsBuilder timingsBuilder) {
        ChannelAcquisitionFuture acquisition = new ChannelAcquisitionFuture();
        boolean piggybacked = usePendingAcquisition(desiredProtocol, serializationFormat, key, acquisition, timingsBuilder);
        if (!piggybacked) {
            connect(desiredProtocol, serializationFormat, key, acquisition, timingsBuilder);
        }
        return acquisition;
    }

    /**
     * Attaches {@code promise} to an in-flight acquisition for the same protocol and key.
     *
     * @return {@code true} if the promise was attached; {@code false} when the protocol is
     *         explicitly HTTP/1 or nothing is pending for the key
     */
    private boolean usePendingAcquisition(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key, ChannelAcquisitionFuture promise, ClientConnectionTimingsBuilder timingsBuilder) {
        // Explicit HTTP/1 requests never piggyback (presumably because an HTTP/1 connection
        // serves one request at a time -- confirm against upstream).
        if (!desiredProtocol.isExplicitHttp1()) {
            ChannelAcquisitionFuture inFlight = getPendingAcquisition(desiredProtocol, key);
            if (inFlight != null) {
                timingsBuilder.pendingAcquisitionStart();
                inFlight.piggyback(desiredProtocol, serializationFormat, key, promise, timingsBuilder);
                return true;
            }
        }
        return false;
    }

    /**
     * Starts a new connection attempt for {@code key} and registers {@code promise} as the pending
     * acquisition for the desired protocol. If the negotiation cache already marks the protocol as
     * unsupported for the remote address, the promise is failed right away through
     * {@link #notifyConnect} without touching the network.
     */
    private void connect(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key, ChannelAcquisitionFuture promise, ClientConnectionTimingsBuilder timingsBuilder) {
        this.setPendingAcquisition(desiredProtocol, key, promise);
        timingsBuilder.socketConnectStart();
        SocketAddress remoteAddress = key.toRemoteAddress();
        if (SessionProtocolNegotiationCache.isUnsupported(remoteAddress, desiredProtocol)) {
            Future<Channel> failed = this.eventLoop.newFailedFuture(new SessionProtocolNegotiationException(desiredProtocol, "previously failed negotiation"));
            this.notifyConnect(desiredProtocol, key, failed, promise, timingsBuilder);
            return;
        }
        Promise<Channel> sessionPromise = this.eventLoop.newPromise();
        this.connect(remoteAddress, desiredProtocol, serializationFormat, key, sessionPromise);
        if (sessionPromise.isDone()) {
            // Completed synchronously: notify inline instead of going through a listener.
            this.notifyConnect(desiredProtocol, key, sessionPromise, promise, timingsBuilder);
        } else {
            // The parameter type is spelled out on purpose: with an implicitly typed lambda the
            // wildcard listener type would make it Future<? super Channel>, which does not match
            // notifyConnect(..., Future<Channel>, ...).
            sessionPromise.addListener((Future<Channel> future) -> this.notifyConnect(desiredProtocol, key, future, promise, timingsBuilder));
        }
    }

    /**
     * Registers a new channel, configures proxy handlers and the user's pipeline customizer on it,
     * connects it to {@code remoteAddress} and, once connected, starts session initialization.
     * Every failure along the way is reported through {@code sessionPromise}; failures at or after
     * proxy configuration are also passed to {@link #maybeHandleProxyFailure}.
     */
    void connect(SocketAddress remoteAddress, SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey poolKey, Promise<Channel> sessionPromise) {
        final Bootstrap bootstrap;
        try {
            bootstrap = getBootstrap(desiredProtocol, remoteAddress, serializationFormat);
        }
        catch (Exception e) {
            sessionPromise.tryFailure(e);
            return;
        }
        ChannelFuture registration = bootstrap.register();
        registration.addListener((ChannelFutureListener)registerFuture -> {
            if (!registerFuture.isSuccess()) {
                sessionPromise.tryFailure(registerFuture.cause());
                return;
            }
            try {
                Channel channel = registerFuture.channel();
                configureProxy(channel, poolKey.proxyConfig, desiredProtocol);
                clientFactory.channelPipelineCustomizer().accept(channel.pipeline());
                channel.connect(remoteAddress).addListener((ChannelFutureListener)connectFuture -> {
                    if (!connectFuture.isSuccess()) {
                        Throwable connectFailure = connectFuture.cause();
                        maybeHandleProxyFailure(desiredProtocol, poolKey, connectFailure);
                        sessionPromise.tryFailure(connectFailure);
                        return;
                    }
                    initSession(desiredProtocol, serializationFormat, poolKey, connectFuture, sessionPromise);
                });
            }
            catch (Throwable cause) {
                maybeHandleProxyFailure(desiredProtocol, poolKey, cause);
                sessionPromise.tryFailure(cause);
            }
        });
    }

    /** Returns the number of channels currently tracked as open by this pool. */
    int numConnections() {
        return allChannels.size();
    }

    /**
     * Reports a connection failure to the factory's {@link ProxyConfigSelector} when the key goes
     * through a proxy (any type other than {@code DIRECT}). Anything thrown while doing so is
     * logged at WARN level and otherwise ignored.
     */
    void maybeHandleProxyFailure(SessionProtocol protocol, PoolKey poolKey, Throwable cause) {
        try {
            ProxyConfig proxyConfig = poolKey.proxyConfig;
            if (proxyConfig.proxyType() == ProxyType.DIRECT) {
                return;
            }
            InetSocketAddress proxyAddress = proxyConfig.proxyAddress();
            assert (proxyAddress != null);
            ProxyConfigSelector selector = clientFactory.proxyConfigSelector();
            selector.connectFailed(protocol, poolKey.endpoint, proxyAddress, UnprocessedRequestException.of(cause));
        }
        catch (Throwable t) {
            logger.warn("Exception while invoking {}.connectFailed() for {}", ProxyConfigSelector.class.getSimpleName(), poolKey, t);
        }
    }

    /**
     * Starts session creation on a freshly connected channel: schedules a timeout (this pool's
     * connect timeout) that fails {@code sessionPromise} and closes the channel if the promise is
     * still open, then appends an {@link HttpSessionHandler} to the pipeline.
     */
    private void initSession(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey poolKey, ChannelFuture connectFuture, Promise<Channel> sessionPromise) {
        assert (connectFuture.isSuccess());
        Channel ch = connectFuture.channel();
        EventLoop channelLoop = ch.eventLoop();
        assert (channelLoop.inEventLoop());
        Runnable onTimeout = () -> {
            SessionProtocolNegotiationException timedOut = new SessionProtocolNegotiationException(desiredProtocol, "connection established, but session creation timed out: " + ch);
            // Only close the channel if this timeout is what actually failed the promise.
            if (sessionPromise.tryFailure(timedOut)) {
                ch.close();
            }
        };
        ScheduledFuture<?> timeoutFuture = channelLoop.schedule(onTimeout, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);
        HttpSessionHandler sessionHandler = new HttpSessionHandler(this, ch, sessionPromise, timeoutFuture, desiredProtocol, serializationFormat, poolKey, clientFactory);
        ch.pipeline().addLast(sessionHandler);
    }

    /**
     * Completes {@code promise} from the outcome of a finished connection attempt.
     *
     * <p>The pending acquisition for the key is removed and the socket-connect timing is closed
     * first. On failure the promise is failed with an {@link UnprocessedRequestException}
     * (proxy connect failures are also reported to the proxy selector). On success the channel is
     * tracked, the listener is told about the new connection, and the promise is completed with a
     * pooled channel -- unless the channel is unhealthy, the pool is closing, or the session
     * refuses the first request, in which case the channel is closed and the promise is failed.
     * A close listener untracks the channel, trims unhealthy channels from the head of its deque
     * and notifies the listener.
     */
    private void notifyConnect(SessionProtocol desiredProtocol, PoolKey key, Future<Channel> future, ChannelAcquisitionFuture promise, ClientConnectionTimingsBuilder timingsBuilder) {
        assert (future.isDone());
        removePendingAcquisition(desiredProtocol, key);
        timingsBuilder.socketConnectEnd();
        try {
            if (!future.isSuccess()) {
                Throwable cause = future.cause();
                if (cause instanceof ProxyConnectException) {
                    maybeHandleProxyFailure(desiredProtocol, key, cause);
                }
                promise.completeExceptionally(UnprocessedRequestException.of(cause));
                return;
            }

            final Channel channel = future.getNow();
            final SessionProtocol protocol = getProtocolIfHealthy(channel);
            if (protocol == null || closeable.isClosing()) {
                channel.close();
                promise.completeExceptionally(UnprocessedRequestException.of(new ClosedSessionException("acquired an unhealthy connection")));
                return;
            }

            allChannels.put(channel, Boolean.TRUE);
            final InetSocketAddress remoteAddr = ChannelUtil.remoteAddress(channel);
            final InetSocketAddress localAddr = ChannelUtil.localAddress(channel);
            assert (remoteAddr != null && localAddr != null) : "raddr: " + remoteAddr + ", laddr: " + localAddr;
            try {
                listener.connectionOpen(protocol, remoteAddr, localAddr, channel);
            }
            catch (Exception e) {
                // A misbehaving listener must not break connection setup.
                if (logger.isWarnEnabled()) {
                    logger.warn("{} Exception handling {}.connectionOpen()", channel, listener.getClass().getName(), e);
                }
            }

            HttpSession session = HttpSession.get(channel);
            if (!session.incrementNumUnfinishedResponses()) {
                // The session does not accept even the first request.
                channel.close();
                promise.completeExceptionally(UnprocessedRequestException.of(RefusedStreamException.get()));
            } else if (protocol.isMultiplex()) {
                // Multiplexed channels are shared, so they go into the pool immediately.
                Http2PooledChannel shared = new Http2PooledChannel(channel, protocol);
                addToPool(protocol, key, shared);
                promise.complete(shared);
            } else {
                promise.complete(new Http1PooledChannel(channel, protocol, key));
            }

            channel.closeFuture().addListener((ChannelFutureListener)f -> {
                allChannels.remove(channel);
                Deque<PooledChannel> queue = getPool(protocol, key);
                if (queue != null) {
                    // Trim unhealthy channels from the head until a healthy one (or nothing) remains.
                    for (PooledChannel head = queue.peekFirst(); head != null && !isHealthy(head); head = queue.peekFirst()) {
                        queue.removeFirst();
                    }
                }
                try {
                    listener.connectionClosed(protocol, remoteAddr, localAddr, channel);
                }
                catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("{} Exception handling {}.connectionClosed()", channel, listener.getClass().getName(), e);
                    }
                }
            });
        }
        catch (Exception e) {
            promise.completeExceptionally(UnprocessedRequestException.of(e));
        }
    }

    /** Appends the channel to the tail of the deque for its protocol and key; asserts it runs on the pool's event loop. */
    private void addToPool(SessionProtocol actualProtocol, PoolKey key, PooledChannel pooledChannel) {
        assert (eventLoop.inEventLoop()) : Thread.currentThread().getName();
        Deque<PooledChannel> queue = getOrCreatePool(actualProtocol, key);
        queue.addLast(pooledChannel);
    }

    /** Starts closing this pool through the shared {@link AsyncCloseableSupport} and returns its future. */
    @Override
    public CompletableFuture<?> closeAsync() {
        CompletableFuture<?> closeFuture = closeable.closeAsync();
        return closeFuture;
    }

    /**
     * Closes every tracked channel and completes {@code future} once all of them have closed.
     * Re-dispatches itself onto the pool's event loop when called from another thread, and
     * completes immediately when no channel is open.
     */
    private void closeAsync(final CompletableFuture<?> future) {
        if (!eventLoop.inEventLoop()) {
            eventLoop.execute(() -> closeAsync(future));
            return;
        }
        // Snapshot the channels so the close listeners can untrack them while we iterate.
        final Channel[] channels = this.allChannels.keySet().toArray(EMPTY_CHANNELS);
        final int numAllChannels = channels.length;
        if (numAllChannels == 0) {
            future.complete(null);
            return;
        }
        ChannelFutureListener countdown = new ChannelFutureListener() {
            private int numRemainingChannels = numAllChannels;

            @Override
            public void operationComplete(ChannelFuture unused) throws Exception {
                numRemainingChannels--;
                if (numRemainingChannels <= 0) {
                    future.complete(null);
                }
            }
        };
        for (int i = 0; i < numAllChannels; i++) {
            channels[i].close().addListener(countdown);
        }
    }

    /**
     * Closes the pool. On a thread marked {@link NonBlocking} the close is only initiated
     * (asynchronously); on any other thread the closeable's {@code close()} is used instead.
     */
    @Override
    public void close() {
        boolean mayUseBlockingClose = !(Thread.currentThread() instanceof NonBlocking);
        if (mayUseBlockingClose) {
            closeable.close();
            return;
        }
        closeable.closeAsync();
    }

    /**
     * Key of the pool maps: an endpoint together with the proxy configuration used to reach it.
     * The hash code is computed once at construction time.
     */
    static final class PoolKey {
        final Endpoint endpoint;
        final ProxyConfig proxyConfig;
        private final int hashCode;

        PoolKey(Endpoint endpoint, ProxyConfig proxyConfig) {
            this.endpoint = endpoint;
            this.proxyConfig = proxyConfig;
            this.hashCode = endpoint.hashCode() * 31 + proxyConfig.hashCode();
        }

        /**
         * Converts the endpoint into the address to connect to. Domain-socket endpoints are
         * returned as Netty domain-socket addresses; all others as the endpoint's socket address.
         */
        SocketAddress toRemoteAddress() {
            InetSocketAddress remoteAddr = this.endpoint.toSocketAddress(-1);
            if (this.endpoint.isDomainSocket()) {
                return ((DomainSocketAddress)remoteAddr).asNettyAddress();
            }
            // An unresolved address is only expected when a forward proxy is in use.
            assert (!remoteAddr.isUnresolved() || this.proxyConfig.proxyType().isForwardProxy()) : remoteAddr + ", " + this.proxyConfig;
            return remoteAddr;
        }

        @Override
        public boolean equals(@Nullable Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof PoolKey)) {
                return false;
            }
            PoolKey that = (PoolKey)o;
            // Comparing the cached hash first is a cheap way to reject most unequal keys.
            return this.hashCode == that.hashCode && this.endpoint.equals(that.endpoint) && this.proxyConfig.equals(that.proxyConfig);
        }

        @Override
        public int hashCode() {
            return this.hashCode;
        }

        /**
         * Renders the key as <code>{host[/ip]:port}</code>, or <code>{host}</code> for domain
         * sockets, followed by {@code " via <proxy>"} before the closing brace when the proxy
         * type is not {@code DIRECT}.
         */
        @Override
        public String toString() {
            String host = this.endpoint.host();
            String ipAddr = this.endpoint.ipAddr();
            int port = this.endpoint.port();
            boolean isDomainSocket = this.endpoint.isDomainSocket();
            String proxyConfigStr = this.proxyConfig.proxyType() != ProxyType.DIRECT ? this.proxyConfig.toString() : null;
            try (TemporaryThreadLocals tempThreadLocals = TemporaryThreadLocals.acquire()) {
                StringBuilder buf = tempThreadLocals.stringBuilder();
                buf.append('{').append(host);
                if (!isDomainSocket) {
                    if (ipAddr != null) {
                        buf.append('/').append(ipAddr);
                    }
                    buf.append(':').append(port);
                }
                if (proxyConfigStr != null) {
                    buf.append(" via ");
                    buf.append(proxyConfigStr);
                }
                buf.append('}');
                return buf.toString();
            }
        }
    }

    /**
     * Future of a channel acquisition that lets later acquisitions for the same key "piggyback"
     * on it instead of opening their own connection. Piggyback handlers run when this future
     * completes, whether normally or exceptionally.
     *
     * <p>NOTE(review): {@code pendingPiggybackHandlers} is a plain field with no synchronization;
     * presumably all access happens on the pool's event loop -- confirm against callers.
     */
    private final class ChannelAcquisitionFuture extends CompletableFuture<PooledChannel> {
        /**
         * {@code null} when nothing is waiting, a single {@code Consumer<PooledChannel>} for one
         * waiter, or a {@code List<Consumer<PooledChannel>>} for several.
         */
        @Nullable
        private Object pendingPiggybackHandlers;

        private ChannelAcquisitionFuture() {
        }

        /**
         * Makes {@code childPromise} depend on this acquisition. If this future is already done
         * the piggyback is resolved immediately; otherwise a handler is queued until completion.
         */
        void piggyback(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key, ChannelAcquisitionFuture childPromise, ClientConnectionTimingsBuilder timingsBuilder) {
            if (this.isDone()) {
                // A failed parent is represented by a null channel.
                PooledChannel completed = this.isCompletedExceptionally() ? null : this.getNow(null);
                this.handlePiggyback(desiredProtocol, serializationFormat, key, childPromise, timingsBuilder, completed);
                return;
            }
            Consumer<PooledChannel> handler = pch -> this.handlePiggyback(desiredProtocol, serializationFormat, key, childPromise, timingsBuilder, pch);
            Object pending = this.pendingPiggybackHandlers;
            if (pending == null) {
                this.pendingPiggybackHandlers = handler;
                return;
            }
            if (pending instanceof List) {
                // Only this class writes the field, so a List always holds piggyback handlers.
                @SuppressWarnings("unchecked")
                List<Consumer<PooledChannel>> handlers = (List<Consumer<PooledChannel>>)pending;
                handlers.add(handler);
                return;
            }
            // Second waiter: promote the single handler to a list.
            @SuppressWarnings("unchecked")
            Consumer<PooledChannel> firstHandler = (Consumer<PooledChannel>)pending;
            List<Consumer<PooledChannel>> handlers = new ArrayList<>();
            handlers.add(firstHandler);
            handlers.add(handler);
            this.pendingPiggybackHandlers = handlers;
        }

        /**
         * Resolves one piggybacked acquisition once the parent has finished.
         *
         * @param pch the channel acquired by the parent, or {@code null} if the parent failed
         */
        private void handlePiggyback(SessionProtocol desiredProtocol, SerializationFormat serializationFormat, PoolKey key, ChannelAcquisitionFuture childPromise, ClientConnectionTimingsBuilder timingsBuilder, @Nullable PooledChannel pch) {
            PiggybackedChannelAcquisitionResult result;
            if (pch == null) {
                result = PiggybackedChannelAcquisitionResult.NEW_CONNECTION;
            } else {
                SessionProtocol actualProtocol = pch.protocol();
                if (actualProtocol.isMultiplex()) {
                    // Share the parent's channel if its session accepts another request;
                    // otherwise wait on another pending acquisition or open a new connection.
                    HttpSession session = HttpSession.get(pch.get());
                    if (session.incrementNumUnfinishedResponses()) {
                        result = PiggybackedChannelAcquisitionResult.SUCCESS;
                    } else if (HttpChannelPool.this.usePendingAcquisition(actualProtocol, serializationFormat, key, childPromise, timingsBuilder)) {
                        result = PiggybackedChannelAcquisitionResult.PIGGYBACKED_AGAIN;
                    } else {
                        result = PiggybackedChannelAcquisitionResult.NEW_CONNECTION;
                    }
                } else {
                    // The parent's channel is not multiplexed, so it cannot be shared:
                    // look in the pool of the protocol that was actually negotiated.
                    PooledChannel ch = HttpChannelPool.this.acquireNow(actualProtocol, serializationFormat, key);
                    if (ch != null) {
                        pch = ch;
                        result = PiggybackedChannelAcquisitionResult.SUCCESS;
                    } else {
                        result = PiggybackedChannelAcquisitionResult.NEW_CONNECTION;
                    }
                }
            }
            switch (result) {
                case SUCCESS: {
                    timingsBuilder.pendingAcquisitionEnd();
                    childPromise.complete(pch);
                    break;
                }
                case NEW_CONNECTION: {
                    timingsBuilder.pendingAcquisitionEnd();
                    HttpChannelPool.this.connect(desiredProtocol, serializationFormat, key, childPromise, timingsBuilder);
                    break;
                }
                case PIGGYBACKED_AGAIN: {
                    // Nothing to do: usePendingAcquisition() already attached the child promise.
                    break;
                }
            }
        }

        @Override
        public boolean complete(PooledChannel value) {
            assert (value != null);
            if (!super.complete(value)) {
                return false;
            }
            this.handlePendingPiggybacks(value);
            return true;
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (!super.completeExceptionally(ex)) {
                return false;
            }
            this.handlePendingPiggybacks(null);
            return true;
        }

        /** Runs and clears every queued piggyback handler, passing {@code null} on failure. */
        private void handlePendingPiggybacks(@Nullable PooledChannel value) {
            Object pending = this.pendingPiggybackHandlers;
            if (pending == null) {
                return;
            }
            // Clear first so that handlers queued while we are running are not lost or re-run.
            this.pendingPiggybackHandlers = null;
            if (pending instanceof List) {
                @SuppressWarnings("unchecked")
                List<Consumer<PooledChannel>> handlers = (List<Consumer<PooledChannel>>)pending;
                for (Consumer<PooledChannel> handler : handlers) {
                    handler.accept(value);
                }
                return;
            }
            @SuppressWarnings("unchecked")
            Consumer<PooledChannel> handler = (Consumer<PooledChannel>)pending;
            handler.accept(value);
        }
    }

    /**
     * Pooled channel used for multiplexed protocols. notifyConnect() adds it to the pool as soon
     * as it is created and acquireNowExact() leaves multiplexed channels in the pool when handing
     * them out, so there is nothing to give back on release.
     */
    static final class Http2PooledChannel
    extends PooledChannel {
        Http2PooledChannel(Channel channel, SessionProtocol protocol) {
            super(channel, protocol);
        }

        /** Intentionally a no-op: the channel never leaves the pool. */
        @Override
        public void release() {
        }
    }

    /**
     * Pooled channel used for non-multiplexed protocols. It is taken out of the pool while in use
     * and put back by {@link #release()} if it is still healthy.
     */
    final class Http1PooledChannel extends PooledChannel {
        private final PoolKey key;

        Http1PooledChannel(Channel channel, SessionProtocol protocol, PoolKey key) {
            super(channel, protocol);
            this.key = key;
        }

        /** Returns the channel to the pool, hopping onto the pool's event loop when necessary. */
        @Override
        public void release() {
            if (HttpChannelPool.this.eventLoop.inEventLoop()) {
                this.doRelease();
                return;
            }
            HttpChannelPool.this.eventLoop.execute(this::doRelease);
        }

        /** Re-adds this channel to the pool unless it has become unhealthy in the meantime. */
        private void doRelease() {
            if (!HttpChannelPool.isHealthy(this)) {
                return;
            }
            HttpChannelPool.this.addToPool(this.protocol(), this.key, this);
        }
    }

    /** Outcome of resolving a piggybacked acquisition in ChannelAcquisitionFuture.handlePiggyback(). */
    private static enum PiggybackedChannelAcquisitionResult {
        // A channel was obtained; the child promise is completed with it.
        SUCCESS,
        // No channel could be reused; a new connection attempt is started for the child promise.
        NEW_CONNECTION,
        // The child promise was attached to another pending acquisition; no further action is taken.
        PIGGYBACKED_AGAIN;

    }
}

