/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.router.NamenodeHeartbeatService;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRouterClientRejectOverload {
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
    private StateStoreDFSCluster cluster;

    @AfterEach
    public void cleanup() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void setupCluster(boolean overloadControl, boolean ha) throws Exception {
        this.cluster = new StateStoreDFSCluster(ha, 2);
        Configuration routerConf = new RouterConfigBuilder().stateStore().metrics().admin().rpc().heartbeat().build();
        routerConf.setInt("dfs.federation.router.client.thread-size", 4);
        routerConf.setBoolean("dfs.federation.router.client.reject.overload", overloadControl);
        this.cluster.setNumDatanodesPerNameservice(0);
        this.cluster.addRouterOverrides(routerConf);
        this.cluster.startCluster();
        this.cluster.startRouters();
        this.cluster.waitClusterUp();
    }

    @Test
    public void testWithoutOverloadControl() throws Exception {
        this.setupCluster(false, false);
        this.testOverloaded(0);
        MiniDFSCluster dfsCluster = this.cluster.getCluster();
        NameNode nn0 = dfsCluster.getNameNode(0);
        FederationTestUtils.simulateSlowNamenode(nn0, 1);
        this.testOverloaded(0);
        for (MiniRouterDFSCluster.RouterContext router : this.cluster.getRouters()) {
            FederationRPCMetrics rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
            org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)rpcMetrics.getProxyOpFailureClientOverloaded());
        }
    }

    @Test
    public void testOverloadControl() throws Exception {
        this.setupCluster(true, false);
        List<MiniRouterDFSCluster.RouterContext> routers = this.cluster.getRouters();
        FederationRPCMetrics rpcMetrics0 = routers.get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rpcMetrics1 = routers.get(1).getRouter().getRpcServer().getRPCMetrics();
        this.testOverloaded(0);
        org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)rpcMetrics0.getProxyOpFailureClientOverloaded());
        org.junit.jupiter.api.Assertions.assertEquals((long)0L, (long)rpcMetrics1.getProxyOpFailureClientOverloaded());
        MiniDFSCluster dfsCluster = this.cluster.getCluster();
        NameNode nn0 = dfsCluster.getNameNode(0);
        FederationTestUtils.simulateSlowNamenode(nn0, 1);
        this.testOverloaded(4, 6);
        org.junit.jupiter.api.Assertions.assertTrue((rpcMetrics0.getProxyOpFailureClientOverloaded() + rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4L ? 1 : 0) != 0);
        Configuration clientConf = this.cluster.getRouterClientConf();
        long iniProxyOps0 = rpcMetrics0.getProxyOps();
        long iniProxyOps1 = rpcMetrics1.getProxyOps();
        this.testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
        long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
        long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
        org.junit.jupiter.api.Assertions.assertEquals((long)20L, (long)(proxyOps0 + proxyOps1));
        org.junit.jupiter.api.Assertions.assertTrue((proxyOps0 >= 8L ? 1 : 0) != 0, (String)(proxyOps0 + " operations: not distributed"));
        org.junit.jupiter.api.Assertions.assertTrue((proxyOps1 >= 8L ? 1 : 0) != 0, (String)(proxyOps1 + " operations: not distributed"));
    }

    private void testOverloaded(int expOverload) throws Exception {
        this.testOverloaded(expOverload, expOverload);
    }

    private void testOverloaded(int expOverloadMin, int expOverloadMax) throws Exception {
        MiniRouterDFSCluster.RouterContext routerContext = this.cluster.getRandomRouter();
        URI address = routerContext.getFileSystemURI();
        HdfsConfiguration conf = new HdfsConfiguration();
        this.testOverloaded(expOverloadMin, expOverloadMax, address, (Configuration)conf, 10);
    }

    private void testOverloaded(int expOverloadMin, int expOverloadMax, URI address, Configuration conf, int numOps) throws Exception {
        AtomicInteger overloadException = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(numOps);
        ArrayList futures = new ArrayList();
        for (int i = 0; i < numOps; ++i) {
            int sleepTime = i * 50;
            Future<?> future = exec.submit(() -> {
                DFSClient routerClient = null;
                try {
                    Thread.sleep(sleepTime);
                    routerClient = new DFSClient(address, conf);
                    String clientName = routerClient.getClientName();
                    ClientProtocol routerProto = routerClient.getNamenode();
                    routerProto.renewLease(clientName, null);
                }
                catch (RemoteException re) {
                    IOException ioe = re.unwrapRemoteException();
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)(ioe instanceof StandbyException), (String)("Wrong exception: " + ioe));
                    GenericTestUtils.assertExceptionContains((String)"is overloaded", (Throwable)ioe);
                    overloadException.incrementAndGet();
                }
                catch (IOException e) {
                    org.junit.jupiter.api.Assertions.fail((String)("Unexpected exception: " + e));
                }
                catch (InterruptedException e) {
                    org.junit.jupiter.api.Assertions.fail((String)("Cannot sleep: " + e));
                }
                finally {
                    if (routerClient != null) {
                        try {
                            routerClient.close();
                        }
                        catch (IOException e) {
                            LOG.error("Cannot close the client");
                        }
                    }
                }
            });
            futures.add(future);
        }
        while (!futures.isEmpty()) {
            ((Future)futures.remove(0)).get();
        }
        exec.shutdown();
        int num = overloadException.get();
        if (expOverloadMin == expOverloadMax) {
            org.junit.jupiter.api.Assertions.assertEquals((int)expOverloadMin, (int)num);
        } else {
            org.junit.jupiter.api.Assertions.assertTrue((num >= expOverloadMin ? 1 : 0) != 0, (String)("Expected >=" + expOverloadMin + " but was " + num));
            org.junit.jupiter.api.Assertions.assertTrue((num <= expOverloadMax ? 1 : 0) != 0, (String)("Expected <=" + expOverloadMax + " but was " + num));
        }
    }

    @Test
    public void testConnectionNullException() throws Exception {
        this.setupCluster(false, false);
        MiniRouterDFSCluster.RouterContext routerContext = this.cluster.getRouters().get(0);
        Router router = routerContext.getRouter();
        FederationTestUtils.simulateThrowExceptionRouterRpcServer(router.getRpcServer());
        Configuration conf = this.cluster.getRouterClientConf();
        conf.setBoolean("dfs.client.failover.random.order", false);
        DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
        FederationRPCMetrics rpcMetrics0 = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rpcMetrics1 = this.cluster.getRouters().get(1).getRouter().getRpcServer().getRPCMetrics();
        long originalRouter0Failures = rpcMetrics0.getProxyOpFailureCommunicate();
        long originalRouter1Failures = rpcMetrics1.getProxyOpFailureCommunicate();
        routerClient.getFileInfo("/");
        org.junit.jupiter.api.Assertions.assertEquals((long)(originalRouter0Failures + 1L), (long)rpcMetrics0.getProxyOpFailureCommunicate());
        org.junit.jupiter.api.Assertions.assertEquals((long)originalRouter1Failures, (long)rpcMetrics1.getProxyOpFailureCommunicate());
    }

    @Test
    public void testNoNamenodesAvailable() throws Exception {
        this.setupCluster(false, true);
        FederationTestUtils.transitionClusterNSToStandby(this.cluster);
        Configuration conf = this.cluster.getRouterClientConf();
        conf.setBoolean("dfs.client.failover.random.order", false);
        conf.setInt("dfs.client.retry.max.attempts", 2);
        DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
        FederationRPCMetrics rpcMetrics0 = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rpcMetrics1 = this.cluster.getRouters().get(1).getRouter().getRpcServer().getRPCMetrics();
        String exceptionMessage = "org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException: No namenodes available under nameservice ns0";
        RemoteException remoteException = (RemoteException)org.junit.jupiter.api.Assertions.assertThrows(RemoteException.class, () -> {
            long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
            long originalRouter1Failures = rpcMetrics1.getProxyOpNoNamenodes();
            routerClient.getFileInfo("/");
            org.junit.jupiter.api.Assertions.assertEquals((long)(originalRouter0Failures + 4L), (long)rpcMetrics0.getProxyOpNoNamenodes());
            org.junit.jupiter.api.Assertions.assertEquals((long)originalRouter1Failures, (long)rpcMetrics1.getProxyOpNoNamenodes());
            FederationTestUtils.transitionClusterNSToActive(this.cluster, 0);
            for (MiniRouterDFSCluster.RouterContext routerContext : this.cluster.getRouters()) {
                Collection heartbeatServices = routerContext.getRouter().getNamenodeHeartbeatServices();
                for (NamenodeHeartbeatService service : heartbeatServices) {
                    service.periodicInvoke();
                }
                routerContext.getRouter().getStateStore().refreshCaches(true);
            }
            originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
            routerClient.getFileInfo("/");
            org.junit.jupiter.api.Assertions.assertEquals((long)originalRouter0Failures, (long)rpcMetrics0.getProxyOpNoNamenodes());
        });
        Assertions.assertThat((String)remoteException.getMessage()).contains(new CharSequence[]{exceptionMessage});
    }

    @Test
    public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
        this.setupCluster(false, true);
        FederationTestUtils.transitionClusterNSToStandby(this.cluster);
        for (MiniRouterDFSCluster.RouterContext routerContext : this.cluster.getRouters()) {
            Collection heartbeatServices = routerContext.getRouter().getNamenodeHeartbeatServices();
            for (NamenodeHeartbeatService service : heartbeatServices) {
                service.periodicInvoke();
            }
            routerContext.getRouter().getStateStore().refreshCaches(true);
        }
        long firstLoadTime = Time.now();
        List<MiniRouterDFSCluster.NamenodeContext> namenodes = this.cluster.getNamenodes();
        for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
            org.junit.jupiter.api.Assertions.assertEquals((int)HAServiceProtocol.HAServiceState.STANDBY.ordinal(), (int)namenodeContext.getNamenode().getNameNodeState());
        }
        Configuration conf = this.cluster.getRouterClientConf();
        conf.setBoolean("dfs.client.failover.random.order", false);
        DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
        for (MiniRouterDFSCluster.RouterContext routerContext : this.cluster.getRouters()) {
            List ns0 = routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId("ns0", false);
            String nsId = ((FederationNamenodeContext)ns0.get(1)).getNamenodeId();
            this.cluster.switchToActive("ns0", nsId);
            Collection heartbeatServices = routerContext.getRouter().getNamenodeHeartbeatServices();
            for (NamenodeHeartbeatService service : heartbeatServices) {
                service.periodicInvoke();
            }
            org.junit.jupiter.api.Assertions.assertEquals((int)HAServiceProtocol.HAServiceState.ACTIVE.ordinal(), (int)this.cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState());
        }
        FederationRPCMetrics rpcMetrics0 = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
        routerClient.getFileInfo("/");
        long successReadTime = Time.now();
        org.junit.jupiter.api.Assertions.assertEquals((long)(originalRouter0Failures + 1L), (long)rpcMetrics0.getProxyOpNoNamenodes());
        org.junit.jupiter.api.Assertions.assertTrue((successReadTime - firstLoadTime < this.cluster.getCacheFlushInterval() ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncCallerPoolMetrics() throws Exception {
        this.setupCluster(true, false);
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 2);
        ObjectMapper objectMapper = new ObjectMapper();
        this.cluster.getRouters().remove(1);
        FederationRPCMetrics metrics = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        Map result = (Map)objectMapper.readValue(metrics.getAsyncCallerPool(), Map.class);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)((Integer)result.get("active")));
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)((Integer)result.get("total")));
        org.junit.jupiter.api.Assertions.assertEquals((int)4, (int)((Integer)result.get("max")));
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            exec.submit(() -> {
                DFSClient routerClient = null;
                try {
                    routerClient = new DFSClient(new URI("hdfs://fed"), this.cluster.getRouterClientConf());
                    String clientName = routerClient.getClientName();
                    ClientProtocol routerProto = routerClient.getNamenode();
                    routerProto.renewLease(clientName, null);
                }
                catch (Exception e) {
                    org.junit.jupiter.api.Assertions.fail((String)("Client request failed: " + e));
                }
                finally {
                    if (routerClient != null) {
                        try {
                            routerClient.close();
                        }
                        catch (IOException e) {
                            LOG.error("Cannot close the client");
                        }
                    }
                }
            });
            GenericTestUtils.waitFor(() -> {
                try {
                    Map newResult = (Map)objectMapper.readValue(metrics.getAsyncCallerPool(), Map.class);
                    if ((Integer)newResult.get("active") != 1) {
                        return false;
                    }
                    if ((Integer)newResult.get("max") != 4) {
                        return false;
                    }
                    int total = (Integer)newResult.get("total");
                    return total >= 1 && total <= 4;
                }
                catch (Exception e) {
                    LOG.error("Not able to parse metrics result: " + e);
                    return false;
                }
            }, (long)100L, (long)2000L);
        }
        finally {
            exec.shutdown();
        }
    }
}

