/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router.async;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Exercises {@link RouterAsyncRpcClient} against a {@link MiniRouterDFSCluster}
 * that is started once for the whole class.
 *
 * <p>Every test issues an asynchronous call through the client and then
 * collects the outcome with {@link AsyncUtil#syncReturn(Class)}, checking the
 * returned value (or the raised exception) together with the counters exposed
 * by {@link FederationRPCMetrics}.
 */
public class TestRouterAsyncRpcClient {
    private static Configuration routerConf;
    private static MiniRouterDFSCluster cluster;
    private static String ns0;
    private static String ns1;
    private MiniRouterDFSCluster.RouterContext router;
    private FileSystem routerFs;
    private RouterRpcServer routerRpcServer;
    private RouterAsyncRpcClient asyncRpcClient;
    private FederationRPCMetrics rpcMetrics;
    /** Path of the 1024-byte file created before, and deleted after, every test. */
    private final String testFile = "/test.file";

    /**
     * Starts the mini cluster and its routers, then records the first two
     * nameservice ids in {@code ns0} and {@code ns1}.
     *
     * <p>When the cluster is highly available, namenode 0 of every nameservice
     * is switched to active, namenode 1 to standby and namenode 2 to observer.
     *
     * @throws Exception if the cluster or the routers fail to start
     */
    @BeforeAll
    public static void setUpCluster() throws Exception {
        cluster = new MiniRouterDFSCluster(true, 2, 3, MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS, 1000L);
        cluster.setNumDatanodesPerNameservice(3);
        cluster.startCluster();
        if (cluster.isHighAvailability()) {
            for (String ns : cluster.getNameservices()) {
                cluster.switchToActive(ns, FederationTestUtils.NAMENODES[0]);
                cluster.switchToStandby(ns, FederationTestUtils.NAMENODES[1]);
                cluster.switchToObserver(ns, FederationTestUtils.NAMENODES[2]);
            }
        }

        // Router configuration: metrics and RPC enabled, single-threaded pools.
        routerConf = new RouterConfigBuilder().metrics().rpc().build();
        routerConf.setInt("dfs.federation.router.client.thread-size", 1);
        routerConf.setInt("dfs.federation.router.async.rpc.handler.count", 1);
        routerConf.setInt("dfs.federation.router.async.rpc.responder.count", 1);
        routerConf.setTimeDuration("dfs.federation.router.dn-report.cache-expire", 1L, TimeUnit.SECONDS);
        cluster.addRouterOverrides(routerConf);

        cluster.startRouters();
        cluster.registerNamenodes();
        cluster.waitNamenodeRegistration();
        cluster.waitActiveNamespaces();

        ns0 = cluster.getNameservices().get(0);
        ns1 = cluster.getNameservices().get(1);
    }

    /** Shuts the mini cluster down if it was created. */
    @AfterAll
    public static void shutdownCluster() {
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    /**
     * Installs the mock mount table, picks a random router, builds a fresh
     * {@link RouterAsyncRpcClient} for it and writes the 1024-byte test file
     * through the router file system.
     *
     * @throws Exception if any of the preparation steps fails
     */
    @BeforeEach
    public void setup() throws Exception {
        installMockLocations();

        router = cluster.getRandomRouter();
        rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
        routerFs = router.getFileSystem();
        routerRpcServer = router.getRouterRpcServer();
        routerRpcServer.initAsyncThreadPools(routerConf);

        asyncRpcClient = new RouterAsyncRpcClient(
            routerConf,
            router.getRouter(),
            routerRpcServer.getNamenodeResolver(),
            routerRpcServer.getRPCMonitor(),
            routerRpcServer.getRouterStateIdContext());

        // try-with-resources: the stream is closed even when write() throws.
        try (FSDataOutputStream out = routerFs.create(new Path(testFile), true)) {
            out.write(new byte[1024]);
        }
    }

    /**
     * Restores the state a test may have changed: clears the caller context,
     * makes namenode 0 of {@code ns0} active again (in the cluster and in the
     * client's resolver), deletes the test file and closes the file system.
     *
     * <p>The per-test fields are null-checked because they stay unset when
     * {@link #setup()} fails early; a secondary NullPointerException here
     * would only obscure the original failure.
     *
     * @throws IOException if restoring the namenode state, deleting the file or
     *     closing the file system fails
     */
    @AfterEach
    public void down() throws IOException {
        CallerContext.setCurrent(null);
        cluster.switchToActive(ns0, FederationTestUtils.NAMENODES[0]);
        if (asyncRpcClient != null) {
            String activeRpcAddress =
                cluster.getNamenode(ns0, FederationTestUtils.NAMENODES[0]).getRpcAddress();
            asyncRpcClient.getNamenodeResolver()
                .updateActiveNamenode(ns0, NetUtils.createSocketAddr(activeRpcAddress));
        }
        if (routerFs != null) {
            try {
                // delete(Path, true) is what the deprecated delete(Path) delegates to.
                Assertions.assertTrue(routerFs.delete(new Path(testFile), true));
            } finally {
                // Close the file system even when the delete assertion fails.
                routerFs.close();
            }
        }
    }

    /**
     * Invokes {@code getTransactionID} on {@code ns0} through
     * {@code invokeSingle} and expects a positive id plus exactly one
     * additional proxy op.
     */
    @Test
    public void testInvokeSingle() throws Exception {
        long proxyOps = rpcMetrics.getProxyOps();
        long activeProxyOps = rpcMetrics.getActiveProxyOps();

        RemoteMethod method = new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
        asyncRpcClient.invokeSingle(ns0, method);
        long id = AsyncUtil.syncReturn(Long.class);

        Assertions.assertTrue(id > 0);
        Assertions.assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
        Assertions.assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());
        Assertions.assertTrue(rpcMetrics.getProcessingAvg() > 0);
        Assertions.assertTrue(rpcMetrics.getProxyAvg() > 0);
    }

    /**
     * Invokes {@code mkdirs} for {@code /multDes/dir} on every location of the
     * path through {@code invokeAll}: first without creating parents (expected
     * to fail with {@link FileNotFoundException}), then with parent creation
     * (expected to succeed). Both rounds are expected to add two proxy ops.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testInvokeAll() throws Exception {
        long proxyOps = rpcMetrics.getProxyOps();
        long activeProxyOps = rpcMetrics.getActiveProxyOps();
        // Raw List on purpose: the element type is not imported by this file.
        List locations = routerRpcServer.getLocationsForPath("/multDes/dir", false);

        // createParent = false: the parent /multDes does not exist yet.
        asyncRpcClient.invokeAll(locations, mkdirsMethod(false));
        LambdaTestUtils.intercept(FileNotFoundException.class,
            "Parent directory doesn't exist: /multDes",
            () -> AsyncUtil.syncReturn(boolean.class));
        Assertions.assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
        Assertions.assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());

        // createParent = true: the directory and its parent are created.
        proxyOps = rpcMetrics.getProxyOps();
        activeProxyOps = rpcMetrics.getActiveProxyOps();
        asyncRpcClient.invokeAll(locations, mkdirsMethod(true));
        Boolean success = AsyncUtil.syncReturn(Boolean.class);
        Assertions.assertTrue(success);
        Assertions.assertEquals(proxyOps + 2, rpcMetrics.getProxyOps());
        Assertions.assertEquals(activeProxyOps + 2, rpcMetrics.getActiveProxyOps());

        FileStatus[] fileStatuses = routerFs.listStatus(new Path("/multDes"));
        Assertions.assertNotNull(fileStatuses);
        Assertions.assertTrue(rpcMetrics.getProcessingAvg() > 0);
        Assertions.assertTrue(rpcMetrics.getProxyAvg() > 0);
    }

    /**
     * Drives {@code invokeMethod} directly with {@code getFileInfo} through a
     * successful call followed by four failure scenarios, checking the
     * exception type, message fragment and metrics of each.
     */
    @Test
    public void testInvokeMethod() throws Exception {
        long proxyOps = rpcMetrics.getProxyOps();
        long activeProxyOps = rpcMetrics.getActiveProxyOps();
        RemoteMethod method =
            new RemoteMethod("getFileInfo", new Class<?>[]{String.class}, new RemoteParam());
        UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
        Class<?> protocol = method.getProtocol();
        Object[] params = new String[]{testFile};
        List<? extends FederationNamenodeContext> namenodes =
            asyncRpcClient.getOrderedNamenodes(ns0, false);

        // 1. Full namenode list: the call succeeds and counts as one proxy op.
        asyncRpcClient.invokeMethod(ugi, namenodes, false, protocol, method.getMethod(), params);
        FileStatus fileStatus = AsyncUtil.syncReturn(FileStatus.class);
        Assertions.assertEquals(1024L, fileStatus.getLen());
        Assertions.assertEquals(proxyOps + 1, rpcMetrics.getProxyOps());
        Assertions.assertEquals(activeProxyOps + 1, rpcMetrics.getActiveProxyOps());

        // 2. Empty namenode list: rejected synchronously.
        LambdaTestUtils.intercept(IOException.class, "No namenodes to invoke",
            () -> asyncRpcClient.invokeMethod(
                ugi, new ArrayList<>(), false, protocol, method.getMethod(), params));

        // 3. List without its first entry (presumably only the standby and the
        //    observer remain): StandbyException, proxy op counters unchanged.
        proxyOps = rpcMetrics.getProxyOps();
        activeProxyOps = rpcMetrics.getActiveProxyOps();
        asyncRpcClient.invokeMethod(
            ugi, namenodes.subList(1, 3), false, protocol, method.getMethod(), params);
        LambdaTestUtils.intercept(StandbyException.class,
            "No namenode available to invoke getFileInfo",
            () -> AsyncUtil.syncReturn(FileStatus.class));
        Assertions.assertEquals(proxyOps, rpcMetrics.getProxyOps());
        Assertions.assertEquals(activeProxyOps, rpcMetrics.getActiveProxyOps());

        // 4. Namenode 0 switched to standby and reported unavailable to the
        //    resolver: RetriableException and one "no namenodes" op.
        cluster.switchToStandby(ns0, FederationTestUtils.NAMENODES[0]);
        asyncRpcClient.getNamenodeResolver().updateUnavailableNamenode(
            ns0, NetUtils.createSocketAddr(namenodes.get(0).getRpcAddress()));
        asyncRpcClient.invokeMethod(ugi, namenodes, false, protocol, method.getMethod(), params);
        LambdaTestUtils.intercept(RetriableException.class,
            "No namenodes available under nameservice ns0",
            () -> AsyncUtil.syncReturn(FileStatus.class));
        Assertions.assertEquals(1L, rpcMetrics.getProxyOpNoNamenodes());

        // 5. Null protocol: no connection can be obtained.
        asyncRpcClient.invokeMethod(ugi, namenodes, false, null, method.getMethod(), params);
        LambdaTestUtils.intercept(StandbyException.class, "Cannot get a connection",
            () -> AsyncUtil.syncReturn(FileStatus.class));
        Assertions.assertEquals(1L, rpcMetrics.getProxyOpFailureCommunicate());
    }

    /**
     * Fetches the block locations of the test file through
     * {@code invokeSequential} and expects a 1024-byte file with one block.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testInvokeSequential() throws Exception {
        // Raw List on purpose: the element type is not imported by this file.
        List locations = routerRpcServer.getLocationsForPath(testFile, false, false);
        RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
            new Class<?>[]{String.class, long.class, long.class},
            new RemoteParam(), 0, 1024);

        asyncRpcClient.invokeSequential(locations, remoteMethod, LocatedBlocks.class, null);
        LocatedBlocks locatedBlocks = AsyncUtil.syncReturn(LocatedBlocks.class);

        Assertions.assertEquals(1024L, locatedBlocks.getFileLength());
        Assertions.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
    }

    /**
     * Builds the {@code mkdirs} remote method with rwxrwxrwx permissions.
     *
     * @param createParent value passed as the {@code mkdirs} boolean argument
     * @return the remote method description
     * @throws IOException if the {@link RemoteMethod} cannot be constructed
     */
    private static RemoteMethod mkdirsMethod(boolean createParent) throws IOException {
        return new RemoteMethod("mkdirs",
            new Class<?>[]{String.class, FsPermission.class, boolean.class},
            new RemoteParam(),
            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
            createParent);
    }

    /**
     * Registers the mount points used by the tests on the mock resolver of
     * every router: {@code /} maps to {@code ns0}, while {@code /multDes} maps
     * to both {@code ns0} and {@code ns1}.
     */
    private void installMockLocations() {
        for (MiniRouterDFSCluster.RouterContext rc : cluster.getRouters()) {
            MockResolver resolver = (MockResolver) rc.getRouter().getSubclusterResolver();
            resolver.addLocation("/", ns0, "/");
            resolver.addLocation("/multDes", ns0, "/multDes");
            resolver.addLocation("/multDes", ns1, "/multDes");
        }
    }
}

