/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.services.AbfsApacheHttpClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpClientConnectionFactory;
import org.apache.hadoop.fs.azurebfs.services.AbfsManagedApacheHttpConnection;
import org.apache.hadoop.fs.azurebfs.services.AbfsManagedHttpClientContext;
import org.apache.hadoop.fs.azurebfs.services.KeepAliveCache;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpHost;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.HttpClientConnectionOperator;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AbfsConnectionManager
implements HttpClientConnectionManager {
    private static final Logger LOG = LoggerFactory.getLogger(AbfsConnectionManager.class);
    private final KeepAliveCache kac;
    private final AbfsHttpClientConnectionFactory httpConnectionFactory;
    private final HttpClientConnectionOperator connectionOperator;
    private final AbfsConfiguration abfsConfiguration;
    private final AtomicBoolean isCacheRefreshInProgress = new AtomicBoolean(false);
    private final Object connectionLock = new Object();
    private final HttpHost baseHost;

    AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry, AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac, AbfsConfiguration abfsConfiguration, URL baseUrl, boolean isCacheWarmupNeeded) {
        this.httpConnectionFactory = connectionFactory;
        this.kac = kac;
        this.connectionOperator = new DefaultHttpClientConnectionOperator(socketFactoryRegistry, null, null);
        this.abfsConfiguration = abfsConfiguration;
        this.baseHost = new HttpHost(baseUrl.getHost(), baseUrl.getDefaultPort(), baseUrl.getProtocol());
        if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0 && kac.getFixedThreadPool() != null) {
            LOG.debug("Warming up the KeepAliveCache with {} connections", (Object)abfsConfiguration.getApacheCacheWarmupCount());
            HttpRoute route = new HttpRoute(this.baseHost, null, true);
            int totalConnectionsCreated = this.cacheExtraConnection(route, abfsConfiguration.getApacheCacheWarmupCount());
            if (totalConnectionsCreated == 0) {
                AbfsApacheHttpClient.registerFallback();
            } else {
                AbfsApacheHttpClient.setUsable();
            }
        }
    }

    public ConnectionRequest requestConnection(final HttpRoute route, Object state) {
        return new ConnectionRequest(){

            /*
             * Exception decompiling
             */
            public HttpClientConnection get(long timeout, TimeUnit timeUnit) throws ExecutionException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [3[TRYBLOCK], 2[TRYBLOCK]], but top level block is 14[WHILELOOP]
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                throw new IllegalStateException("Decompilation failed");
            }

            public boolean cancel() {
                return false;
            }

            private void triggerConnectionRefreshIfNeeded() {
                if (!AbfsConnectionManager.this.isCacheRefreshInProgress.get() && !AbfsConnectionManager.this.kac.getIsClosed() && AbfsConnectionManager.this.kac.getFixedThreadPool() != null && AbfsConnectionManager.this.kac.getSingleThreadPool() != null && AbfsConnectionManager.this.kac.size() <= AbfsConnectionManager.this.abfsConfiguration.getApacheMinTriggerRefreshCount()) {
                    try {
                        AbfsConnectionManager.this.kac.getSingleThreadPool().submit(() -> AbfsConnectionManager.this.cacheExtraConnection(route, AbfsConnectionManager.this.abfsConfiguration.getApacheCacheRefreshCount()));
                    }
                    catch (RejectedExecutionException e) {
                        LOG.debug("Task rejected for connection refresh: {}", (Object)e.getMessage());
                    }
                }
            }

            private HttpClientConnection createNewConnection() {
                return AbfsConnectionManager.this.httpConnectionFactory.create(route, (ConnectionConfig)null);
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseConnection(HttpClientConnection conn, Object newState, long validDuration, TimeUnit timeUnit) {
        long start = System.nanoTime();
        try {
            if (validDuration == 0L) {
                return;
            }
            this.addConnectionToCache(conn);
        }
        finally {
            LOG.debug("Connection released: {} in {} ms", (Object)conn, (Object)AbfsConnectionManager.elapsedTimeMillis(start));
        }
    }

    public void connect(HttpClientConnection conn, HttpRoute route, int connectTimeout, HttpContext context) throws IOException {
        long start = System.nanoTime();
        LOG.debug("Connecting {} to {}", (Object)conn, (Object)route.getTargetHost());
        this.connectionOperator.connect((ManagedHttpClientConnection)((AbfsManagedApacheHttpConnection)conn), route.getTargetHost(), route.getLocalSocketAddress(), connectTimeout, SocketConfig.DEFAULT, context);
        LOG.debug("Connection established: {}", (Object)conn);
        if (context instanceof AbfsManagedHttpClientContext) {
            ((AbfsManagedHttpClientContext)context).setConnectTime(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }

    public void upgrade(HttpClientConnection conn, HttpRoute route, HttpContext context) throws IOException {
        this.connectionOperator.upgrade((ManagedHttpClientConnection)((AbfsManagedApacheHttpConnection)conn), route.getTargetHost(), context);
    }

    public void routeComplete(HttpClientConnection conn, HttpRoute route, HttpContext context) throws IOException {
    }

    public void closeIdleConnections(long idletime, TimeUnit timeUnit) {
    }

    public void closeExpiredConnections() {
    }

    public void shutdown() {
        this.kac.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int cacheExtraConnection(HttpRoute route, int numberOfConnections) {
        AtomicInteger totalConnectionCreated = new AtomicInteger(0);
        if (!this.isCacheRefreshInProgress.getAndSet(true)) {
            long start = System.nanoTime();
            CountDownLatch latch = new CountDownLatch(numberOfConnections);
            for (int i = 0; i < numberOfConnections; ++i) {
                try {
                    this.kac.getFixedThreadPool().submit(() -> {
                        ManagedHttpClientConnection conn = null;
                        try {
                            conn = this.httpConnectionFactory.create(route, (ConnectionConfig)null);
                            this.connect((HttpClientConnection)conn, route, this.abfsConfiguration.getHttpConnectionTimeout(), (HttpContext)new AbfsManagedHttpClientContext());
                            this.addConnectionToCache((HttpClientConnection)conn);
                            totalConnectionCreated.incrementAndGet();
                        }
                        catch (Exception e) {
                            LOG.debug("Error creating connection: {}", (Object)e.getMessage());
                            if (conn != null) {
                                try {
                                    conn.close();
                                }
                                catch (IOException ioException) {
                                    LOG.debug("Error closing connection: {}", (Object)ioException.getMessage());
                                }
                            }
                        }
                        finally {
                            latch.countDown();
                        }
                    });
                    continue;
                }
                catch (RejectedExecutionException e) {
                    LOG.debug("Task rejected for connection creation: {}", (Object)e.getMessage());
                    return -1;
                }
            }
            try {
                boolean result = latch.await(this.abfsConfiguration.getApacheWarmupCacheTimeoutInMillis(), TimeUnit.MILLISECONDS);
                if (!result) {
                    LOG.debug("Timeout waiting for connections to be created");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                this.isCacheRefreshInProgress.set(false);
                LOG.debug("Connection refresh completed in {} ms", (Object)AbfsConnectionManager.elapsedTimeMillis(start));
            }
        }
        return totalConnectionCreated.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addConnectionToCache(HttpClientConnection conn) {
        if (conn instanceof AbfsManagedApacheHttpConnection && ((AbfsManagedApacheHttpConnection)conn).getTargetHost().equals((Object)this.baseHost)) {
            boolean connAddedInKac = this.kac.add(conn);
            if (connAddedInKac) {
                Object object = this.connectionLock;
                synchronized (object) {
                    this.connectionLock.notify();
                }
                LOG.debug("Connection cached: {}", (Object)conn);
            } else {
                LOG.debug("Connection not cached, and is released: {}", (Object)conn);
            }
        }
    }

    private static long elapsedTimeMillis(long startTime) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    }
}

