/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext;
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.Async;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RouterAsyncRpcClient
extends RouterRpcClient {
    private static final Logger LOG = LoggerFactory.getLogger(RouterAsyncRpcClient.class);
    private final Router router;
    private final ActiveNamenodeResolver namenodeResolver;
    private final RouterRpcMonitor rpcMonitor;

    /**
     * Creates the asynchronous router RPC client.
     *
     * <p>Every argument is forwarded to the {@link RouterRpcClient} constructor; the router,
     * resolver and monitor are additionally retained in this class's own fields.
     *
     * @param conf configuration handed to the superclass.
     * @param router router instance, kept in {@code this.router}.
     * @param resolver namenode resolver, kept in {@code this.namenodeResolver}.
     * @param monitor RPC monitor, kept in {@code this.rpcMonitor}; other methods of this class
     *                null-check it before use.
     * @param routerStateIdContext state-id context handed to the superclass.
     */
    public RouterAsyncRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, RouterStateIdContext routerStateIdContext) {
        super(conf, router, resolver, monitor, routerStateIdContext);
        this.rpcMonitor = monitor;
        this.namenodeResolver = resolver;
        this.router = router;
    }

    /**
     * Overrides the superclass hook with an empty body, so no work is performed here.
     *
     * <p>NOTE(review): presumably the parent implementation sets up an executor for concurrent
     * calls that this asynchronous client does not use - confirm against RouterRpcClient.
     *
     * @param conf configuration; not read by this override.
     */
    @Override
    protected void initConcurrentCallExecutorService(Configuration conf) {
    }

    /**
     * Invokes {@code method} on all given locations and reduces the per-location results to a
     * single flag that is true when at least one location returned {@code true}.
     *
     * <p>The reduction is registered through {@link AsyncUtil#asyncApply}; the value returned
     * from this method is whatever {@link AsyncUtil#asyncReturn} yields for {@code boolean}.
     * NOTE(review): presumably that is a placeholder and the real result travels through the
     * async chain - confirm against AsyncUtil.
     *
     * @param locations locations to invoke the method on.
     * @param method remote method to invoke.
     * @return the value produced by {@code AsyncUtil.asyncReturn(boolean.class)}.
     * @throws IOException if the concurrent invocation fails.
     */
    @Override
    public <T extends RemoteLocationContext> boolean invokeAll(Collection<T> locations, RemoteMethod method) throws IOException {
        this.invokeConcurrent(locations, method, false, false, Boolean.class);
        AsyncUtil.asyncApply((Map<T, Boolean> perLocation) -> perLocation.containsValue(true));
        return AsyncUtil.asyncReturn(boolean.class);
    }

    /**
     * Schedules an invocation of {@code method} against the given namenodes on an executor
     * selected by nameservice id.
     *
     * <p>The executor is looked up in the RPC server's per-nameservice map using the nameservice
     * id of the first namenode, falling back to the server's default async handler executor.
     * Inside the scheduled step a fairness permit is acquired, the call is delegated to
     * {@code invokeMethodAsync}, and a finally step is registered that releases the permit.
     *
     * @param ugi user on whose behalf the call is made.
     * @param namenodes candidate namenodes; must be non-null and non-empty.
     * @param useObserver whether observer namenodes may serve the call.
     * @param protocol protocol class used to obtain the connection.
     * @param method method to invoke on the namenode proxy.
     * @param params arguments for {@code method}.
     * @return always {@code null}; NOTE(review): the actual result is presumably delivered
     *         through the AsyncUtil chain - confirm.
     * @throws IOException if {@code namenodes} is null or empty.
     */
    @Override
    public Object invokeMethod(UserGroupInformation ugi, List<? extends FederationNamenodeContext> namenodes, boolean useObserver, Class<?> protocol, Method method, Object ... params) throws IOException {
        boolean noTargets = namenodes == null || namenodes.isEmpty();
        if (noTargets) {
            throw new IOException("No namenodes to invoke " + method.getName() + " with params " + Arrays.deepToString(params) + " from " + this.router.getRouterId());
        }
        String nsid = namenodes.get(0).getNameserviceId();
        // Created on the calling thread; transfer() is invoked later inside the scheduled step.
        ThreadLocalContext callerContext = new ThreadLocalContext();
        // The helper takes the invariant list type; the cast is erased at runtime.
        @SuppressWarnings("unchecked")
        List<FederationNamenodeContext> targets = (List<FederationNamenodeContext>) namenodes;
        AsyncUtil.asyncComplete(null);
        AsyncUtil.asyncApplyUseExecutor(ignored -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, namenodes.toString(), params);
            }
            callerContext.transfer();
            RouterRpcFairnessPolicyController fairness = this.getRouterRpcFairnessPolicyController();
            this.acquirePermit(nsid, ugi, method.getName(), fairness);
            this.invokeMethodAsync(ugi, targets, useObserver, protocol, method, params);
            // Registered after the async call so the permit is handed back when it finishes.
            AsyncUtil.asyncFinally(outcome -> {
                this.releasePermit(nsid, ugi, method, fairness);
                return outcome;
            });
        }, this.router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, this.router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
        return null;
    }

    /**
     * Tries the given namenodes one after another until one invocation completes.
     *
     * <p>For each namenode a connection is obtained, the call is issued through
     * {@link #invoke}, and on success the status is marked complete and the iteration is
     * stopped. An {@link IOException} from an attempt is recorded per namenode and passed to
     * {@code handleInvokeMethodIOException}. When no attempt completed, the recorded exceptions
     * are handed to {@code handlerAllNamenodeFail}.
     *
     * <p>Observer namenodes are skipped while {@code status.isShouldUseObserver()} is false.
     *
     * @param ugi user on whose behalf the call is made.
     * @param namenodes candidate namenodes, tried in iteration order.
     * @param useObserver initial observer-read setting for the execution status.
     * @param protocol protocol class used to obtain the connection.
     * @param method method to invoke on the namenode proxy.
     * @param params arguments for {@code method}.
     */
    private void invokeMethodAsync(UserGroupInformation ugi, List<FederationNamenodeContext> namenodes, boolean useObserver, Class<?> protocol, Method method, Object ... params) {
        this.addClientInfoToCallerContext(ugi);
        if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOp();
        }
        RouterRpcClient.ExecutionStatus status = new RouterRpcClient.ExecutionStatus(false, useObserver);
        // Insertion-ordered so failures are kept in the order the namenodes were tried.
        Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
        AsyncUtil.asyncForEach(namenodes.iterator(), (foreach, namenode) -> {
            if (!status.isShouldUseObserver() && namenode.getState() == FederationNamenodeServiceState.OBSERVER) {
                AsyncUtil.asyncComplete(null);
                return;
            }
            String nsId = namenode.getNameserviceId();
            String rpcAddress = namenode.getRpcAddress();
            // One holder per attempt: with a holder shared across attempts, a failure inside
            // getConnection() would leave the previous attempt's already-released connection in
            // the slot and the finally step below would release it a second time.
            ConnectionContext[] connection = new ConnectionContext[1];
            AsyncUtil.asyncTry(() -> {
                connection[0] = this.getConnection(ugi, nsId, rpcAddress, protocol);
                NameNodeProxiesClient.ProxyAndInfo<?> client = connection[0].getClient();
                this.invoke((FederationNamenodeContext)namenode, status.isShouldUseObserver(), 0, method, client.getProxy(), params);
                AsyncUtil.asyncApply(res -> {
                    status.setComplete(true);
                    this.postProcessResult(method, status, (FederationNamenodeContext)namenode, nsId, client);
                    // Success: do not try the remaining namenodes.
                    foreach.breakNow();
                    return res;
                });
            });
            AsyncUtil.asyncCatch((res, ioe) -> {
                ioes.put((FederationNamenodeContext)namenode, (IOException)ioe);
                this.handleInvokeMethodIOException((FederationNamenodeContext)namenode, (IOException)ioe, status, useObserver);
                return res;
            }, IOException.class);
            AsyncUtil.asyncFinally(res -> {
                // Release only the connection obtained by this attempt, if any.
                if (connection[0] != null) {
                    connection[0].release();
                }
                return res;
            });
        });
        AsyncUtil.asyncApply(res -> {
            if (status.isComplete()) {
                return res;
            }
            return this.handlerAllNamenodeFail(namenodes, method, ioes, params);
        });
    }

    /**
     * Issues a single reflective call on a namenode proxy with the IPC client switched to
     * asynchronous mode, and registers a handler for failures of that call.
     *
     * <p>Asynchronous mode is switched off again in a {@code finally} block, so the flag is reset
     * even when {@code method.invoke} throws. NOTE(review): the flag is presumably per-thread
     * state in {@code org.apache.hadoop.ipc.Client}; leaving it enabled after a failed call
     * would affect later calls on the same thread - confirm against the Client documentation.
     *
     * @param namenode namenode the call is sent to.
     * @param listObserverFirst forwarded to {@code handlerInvokeException}.
     * @param retryCount forwarded to {@code handlerInvokeException}.
     * @param method method to invoke reflectively.
     * @param obj proxy object the method is invoked on.
     * @param params arguments for {@code method}.
     * @return always {@code null}; failures are reported through
     *         {@link AsyncUtil#asyncThrowException}.
     * @throws IOException declared by the overridden method.
     */
    @Override
    protected Object invoke(FederationNamenodeContext namenode, Boolean listObserverFirst, int retryCount, Method method, Object obj, Object ... params) throws IOException {
        try {
            Client.setAsynchronousMode(true);
            try {
                method.invoke(obj, params);
            } finally {
                // Always restore synchronous mode, including when the reflective call throws.
                Client.setAsynchronousMode(false);
            }
            AsyncUtil.asyncCatch((o, e) -> this.handlerInvokeException(namenode, listObserverFirst, retryCount, method, obj, e, params), Throwable.class);
        }
        catch (InvocationTargetException e) {
            // Surface the exception thrown by the target method, not the reflection wrapper.
            AsyncUtil.asyncThrowException(e.getCause());
        }
        catch (IllegalAccessException | IllegalArgumentException e) {
            LOG.error("Unexpected exception while proxying API", e);
            AsyncUtil.asyncThrowException(e);
        }
        return null;
    }

    /**
     * Invokes {@code remoteMethod} sequentially on the given locations and unwraps the
     * resulting {@link RemoteResult} to its plain result value.
     *
     * <p>Delegates to the overload that yields a {@link RemoteResult}, then registers an apply
     * step calling {@link RemoteResult#getResult()}.
     *
     * @param locations locations to try, in order.
     * @param remoteMethod remote method to invoke.
     * @param expectedResultClass class the result is expected to have.
     * @param expectedResultValue value the result is expected to equal.
     * @return the value produced by {@code AsyncUtil.asyncReturn(expectedResultClass)}.
     * @throws IOException if the sequential invocation fails.
     */
    @Override
    public <T> T invokeSequential(List<? extends RemoteLocationContext> locations, RemoteMethod remoteMethod, Class<T> expectedResultClass, Object expectedResultValue) throws IOException {
        this.invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue);
        AsyncUtil.asyncApply((RemoteResult<?, ?> remoteResult) -> remoteResult.getResult());
        return AsyncUtil.asyncReturn(expectedResultClass);
    }

    /**
     * Invokes {@code remoteMethod} on the locations one at a time until a result matches the
     * expected class and value.
     *
     * <p>Outcome, as decided by the final apply step:
     * <ul>
     *   <li>a matching result stops the iteration and is wrapped with its location;</li>
     *   <li>otherwise, if any attempt failed, the first recorded exception for which
     *       {@code isUnavailableException} is true is thrown, or else the first recorded
     *       exception;</li>
     *   <li>otherwise the first non-null non-matching result is wrapped with the first
     *       location.</li>
     * </ul>
     *
     * @param remoteMethod remote method to invoke.
     * @param locations locations to try, in order.
     * @param expectedResultClass class a result must have to be accepted.
     * @param expectedResultValue value a result must equal to be accepted.
     * @return the value produced by {@code AsyncUtil.asyncReturn(RemoteResult.class)}.
     * @throws IOException if the remote user or the method cannot be resolved.
     */
    @Override
    public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(RemoteMethod remoteMethod, List<R> locations, Class<T> expectedResultClass, Object expectedResultValue) throws IOException {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        Method m = remoteMethod.getMethod();
        List<IOException> failures = new ArrayList<>();
        // Single-slot holder so the lambdas below can record the first non-matching result.
        Object[] fallbackResult = new Object[1];
        RouterRpcClient.ExecutionStatus status = new RouterRpcClient.ExecutionStatus();
        AsyncUtil.asyncForEach(locations.iterator(), (foreach, loc) -> {
            String ns = loc.getNameserviceId();
            boolean isObserverRead = this.isObserverReadEligible(ns, m);
            List<? extends FederationNamenodeContext> namenodes = this.getOrderedNamenodes(ns, isObserverRead);
            AsyncUtil.asyncTry(() -> {
                Class<?> proto = remoteMethod.getProtocol();
                Object[] params = remoteMethod.getParams((RemoteLocationContext)loc);
                this.invokeMethod(ugi, namenodes, isObserverRead, proto, m, params);
                AsyncUtil.asyncApply(result -> {
                    boolean accepted = RouterAsyncRpcClient.isExpectedClass(expectedResultClass, result) && RouterAsyncRpcClient.isExpectedValue(expectedResultValue, result);
                    if (!accepted) {
                        // Remember only the first non-null rejected result.
                        if (fallbackResult[0] == null) {
                            fallbackResult[0] = result;
                        }
                        return null;
                    }
                    foreach.breakNow();
                    status.setComplete(true);
                    return new RemoteResult<RemoteLocationContext, Object>((RemoteLocationContext)loc, result);
                });
            });
            AsyncUtil.asyncCatch((ret, e) -> {
                IOException recorded;
                if (e instanceof IOException) {
                    recorded = this.processException((IOException)e, (RemoteLocationContext)loc);
                } else {
                    LOG.error("Unexpected exception {} proxying {} to {}", e.getClass(), m.getName(), ns, e);
                    recorded = new IOException("Unexpected exception proxying API " + e.getMessage(), e);
                }
                failures.add(recorded);
                return ret;
            }, Exception.class);
        });
        AsyncUtil.asyncApply(result -> {
            if (status.isComplete()) {
                return result;
            }
            if (!failures.isEmpty()) {
                // Prefer an "unavailable" failure over whichever failure happened first.
                for (IOException failure : failures) {
                    if (RouterAsyncRpcClient.isUnavailableException(failure)) {
                        throw failure;
                    }
                }
                throw failures.get(0);
            }
            return new RemoteResult<RemoteLocationContext, Object>((RemoteLocationContext)locations.get(0), fallbackResult[0]);
        });
        return AsyncUtil.asyncReturn(RemoteResult.class);
    }

    /**
     * Invokes {@code method} on all locations and converts the per-location results into a map
     * via {@code postProcessResult}.
     *
     * @param locations locations to invoke the method on.
     * @param method remote method to invoke.
     * @param requireResponse forwarded to {@code postProcessResult}.
     * @param standby forwarded to the delegated {@code invokeConcurrent} overload.
     * @param timeOutMs forwarded to the delegated {@code invokeConcurrent} overload.
     * @param clazz expected result class.
     * @return the value produced by {@code AsyncUtil.asyncReturn(Map.class)}.
     * @throws IOException if the concurrent invocation fails.
     */
    @Override
    public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(Collection<T> locations, RemoteMethod method, boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz) throws IOException {
        this.invokeConcurrent(locations, method, standby, timeOutMs, clazz);
        AsyncUtil.asyncApply((List<RemoteResult<T, R>> remoteResults) -> RouterAsyncRpcClient.postProcessResult(requireResponse, remoteResults));
        return AsyncUtil.asyncReturn(Map.class);
    }

    /**
     * Runs every callable on the calling thread, collects one future per callable, and
     * registers a completion step that turns all futures into remote results via
     * {@code processFutures}.
     *
     * <p>A callable that throws is represented by a future completed exceptionally with the
     * wrapped exception, so one failing callable does not prevent the others from running.
     *
     * @param method remote method being invoked.
     * @param timeOutMs not read by this override.
     * @param controller not read by this override.
     * @param orderedLocations locations passed through to {@code processFutures}.
     * @param callables one callable per location.
     * @return the value produced by {@code AsyncUtil.asyncReturn(List.class)}.
     * @throws IOException if the method cannot be resolved.
     */
    @Override
    protected <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> getRemoteResults(RemoteMethod method, long timeOutMs, RouterRpcFairnessPolicyController controller, List<T> orderedLocations, List<Callable<Object>> callables) throws IOException {
        Method m = method.getMethod();
        @SuppressWarnings("unchecked")
        CompletableFuture<Object>[] futures = new CompletableFuture[callables.size()];
        int slot = 0;
        for (Callable<Object> task : callables) {
            CompletableFuture<Object> pending;
            try {
                task.call();
                // Picked up right after call() on the same thread.
                pending = AsyncUtil.getCompletableFuture();
            }
            catch (Exception e) {
                pending = new CompletableFuture<>();
                pending.completeExceptionally(Async.warpCompletionException(e));
            }
            futures[slot] = pending;
            slot++;
        }
        AsyncUtil.asyncCompleteWith(CompletableFuture.allOf(futures).handle((ignoredValue, ignoredError) -> {
            try {
                return this.processFutures(method, m, orderedLocations, Arrays.asList(futures));
            }
            catch (InterruptedException e) {
                LOG.error("Unexpected error while invoking API: {}", (Object)e.getMessage());
                throw Async.warpCompletionException(new IOException("Unexpected error while invoking API " + e.getMessage(), e));
            }
        }));
        return AsyncUtil.asyncReturn(List.class);
    }

    /**
     * Invokes {@code method} on a single location and wraps the result in a one-element list.
     *
     * <p>An {@link IOException} raised by the invocation is passed through
     * {@code processException} together with the location and rethrown.
     *
     * @param location location to invoke the method on.
     * @param method remote method to invoke.
     * @return the value produced by {@code AsyncUtil.asyncReturn(List.class)}.
     * @throws IOException if the remote user, method or namenodes cannot be resolved.
     */
    @Override
    public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingle(T location, RemoteMethod method) throws IOException {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        Method m = method.getMethod();
        String ns = location.getNameserviceId();
        boolean isObserverRead = this.isObserverReadEligible(ns, m);
        List<? extends FederationNamenodeContext> namenodes = this.getOrderedNamenodes(ns, isObserverRead);
        AsyncUtil.asyncTry(() -> {
            // Arguments are evaluated left to right: protocol first, then the parameters.
            this.invokeMethod(ugi, namenodes, isObserverRead, method.getProtocol(), m, method.getParams(location));
            AsyncUtil.asyncApply(result -> Collections.singletonList(new RemoteResult<RemoteLocationContext, Object>(location, result)));
        });
        AsyncUtil.asyncCatch((ignored, failure) -> {
            throw this.processException((IOException)failure, location);
        }, IOException.class);
        return AsyncUtil.asyncReturn(List.class);
    }

    /**
     * Invokes {@code method} on the namenodes of one nameservice, using a location built from
     * the nameservice id with {@code "/"} for both of its path arguments.
     *
     * @param nsId nameservice to invoke the method in.
     * @param method remote method to invoke.
     * @return always {@code null}; NOTE(review): the actual result is presumably delivered
     *         through the AsyncUtil chain set up by {@code invokeMethod} - confirm.
     * @throws IOException if the remote user, method or namenodes cannot be resolved.
     */
    @Override
    public Object invokeSingle(String nsId, RemoteMethod method) throws IOException {
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        boolean isObserverRead = this.isObserverReadEligible(nsId, method.getMethod());
        List<? extends FederationNamenodeContext> candidates = this.getOrderedNamenodes(nsId, isObserverRead);
        RemoteLocation rootLocation = new RemoteLocation(nsId, "/", "/");
        // Arguments are evaluated left to right: protocol, then method, then parameters.
        this.invokeMethod(ugi, candidates, isObserverRead, method.getProtocol(), method.getMethod(), method.getParams(rootLocation));
        return null;
    }

    /**
     * Invokes {@code remoteMethod} on a single location by delegating to the sequential
     * invocation with a one-element location list.
     *
     * @param location location to invoke the method on.
     * @param remoteMethod remote method to invoke.
     * @param clazz expected result class.
     * @return the value produced by {@code AsyncUtil.asyncReturn(clazz)}.
     * @throws IOException if the sequential invocation fails.
     */
    @Override
    public <T> T invokeSingle(RemoteLocationContext location, RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
        this.invokeSequential(Collections.singletonList(location), remoteMethod);
        return AsyncUtil.asyncReturn(clazz);
    }

    /**
     * Releases the fairness permit of a nameservice and traces the release.
     *
     * <p>Does nothing when {@code controller} is {@code null}.
     *
     * @param nsId nameservice whose permit is released.
     * @param ugi user the permit was held for; used only in the trace message.
     * @param m method the permit was held for; used only in the trace message.
     * @param controller fairness controller that owns the permit, may be {@code null}.
     */
    protected void releasePermit(String nsId, UserGroupInformation ugi, Method m, RouterRpcFairnessPolicyController controller) {
        if (controller == null) {
            return;
        }
        controller.releasePermit(nsId);
        LOG.trace("Permit released for ugi: {} for method: {}", ugi, m.getName());
    }
}

