/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.fairness;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.fairness.ProportionRouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.fairness.StaticRouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRouterHandlersFairness {
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterHandlersFairness.class);
    private StateStoreDFSCluster cluster;
    private Map<String, Integer> expectedHandlerPerNs;
    private Class<RouterRpcFairnessPolicyController> policyControllerClass;
    private int handlerCount;
    private Map<String, String> configuration;

    public void initTestRouterHandlersFairness(Class<RouterRpcFairnessPolicyController> pPolicyControllerClass, int pHandlerCount, Map<String, String> pConfiguration, Map<String, Integer> pExpectedHandlerPerNs) {
        this.expectedHandlerPerNs = pExpectedHandlerPerNs;
        this.policyControllerClass = pPolicyControllerClass;
        this.handlerCount = pHandlerCount;
        this.configuration = pConfiguration;
    }

    public static Collection primes() {
        return Arrays.asList({StaticRouterRpcFairnessPolicyController.class, 3, TestRouterHandlersFairness.setConfiguration(null), TestRouterHandlersFairness.expectedHandlerPerNs("ns0:1, ns1:1, concurrent:1")}, {ProportionRouterRpcFairnessPolicyController.class, 20, TestRouterHandlersFairness.setConfiguration("dfs.federation.router.fairness.handler.proportion.ns0=0.5, dfs.federation.router.fairness.handler.proportion.ns1=0.8, dfs.federation.router.fairness.handler.proportion.concurrent=1"), TestRouterHandlersFairness.expectedHandlerPerNs("ns0:10, ns1:16, concurrent:20")});
    }

    @AfterEach
    public void cleanup() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void setupCluster(boolean fairnessEnable, boolean ha) throws Exception {
        LOG.info("Test {}", (Object)this.policyControllerClass.getSimpleName());
        this.cluster = new StateStoreDFSCluster(ha, 2);
        Configuration routerConf = new RouterConfigBuilder().stateStore().rpc().build();
        if (fairnessEnable) {
            routerConf.setClass("dfs.federation.router.fairness.policy.controller.class", this.policyControllerClass, RouterRpcFairnessPolicyController.class);
        }
        routerConf.setTimeDuration("dfs.federation.router.fairness.acquire.timeout", 10L, TimeUnit.MILLISECONDS);
        routerConf.setInt("dfs.federation.router.handler.count", this.handlerCount);
        for (Map.Entry<String, String> conf : this.configuration.entrySet()) {
            routerConf.set(conf.getKey(), conf.getValue());
        }
        this.cluster.setNumDatanodesPerNameservice(0);
        this.cluster.addRouterOverrides(routerConf);
        this.cluster.startCluster();
        this.cluster.startRouters();
        this.cluster.waitClusterUp();
    }

    @MethodSource(value={"primes"})
    @ParameterizedTest
    public void testFairnessControlOff(Class<RouterRpcFairnessPolicyController> pPolicyControllerClass, int pHandlerCount, Map<String, String> pConfiguration, Map<String, Integer> pExpectedHandlerPerNs) throws Exception {
        this.initTestRouterHandlersFairness(pPolicyControllerClass, pHandlerCount, pConfiguration, pExpectedHandlerPerNs);
        this.setupCluster(false, false);
        this.startLoadTest(false);
    }

    @MethodSource(value={"primes"})
    @ParameterizedTest
    public void testFairnessControlOn(Class<RouterRpcFairnessPolicyController> pPolicyControllerClass, int pHandlerCount, Map<String, String> pConfiguration, Map<String, Integer> pExpectedHandlerPerNs) throws Exception {
        this.initTestRouterHandlersFairness(pPolicyControllerClass, pHandlerCount, pConfiguration, pExpectedHandlerPerNs);
        this.setupCluster(true, false);
        this.startLoadTest(true);
    }

    @MethodSource(value={"primes"})
    @ParameterizedTest
    public void testReleasedWhenExceptionOccurs(Class<RouterRpcFairnessPolicyController> pPolicyControllerClass, int pHandlerCount, Map<String, String> pConfiguration, Map<String, Integer> pExpectedHandlerPerNs) throws Exception {
        this.initTestRouterHandlersFairness(pPolicyControllerClass, pHandlerCount, pConfiguration, pExpectedHandlerPerNs);
        this.setupCluster(true, false);
        MiniRouterDFSCluster.RouterContext routerContext = this.cluster.getRandomRouter();
        RouterRpcClient rpcClient = routerContext.getRouter().getRpcServer().getRPCClient();
        ActiveNamenodeResolver mockNamenodeResolver = (ActiveNamenodeResolver)Mockito.mock(ActiveNamenodeResolver.class);
        Field field = rpcClient.getClass().getDeclaredField("namenodeResolver");
        field.setAccessible(true);
        field.set(rpcClient, mockNamenodeResolver);
        DFSClient client = routerContext.getClient();
        int availablePermits = rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
        LambdaTestUtils.intercept(IOException.class, () -> {
            LOG.info("Use getFileInfo test invokeSequential.");
            client.getFileInfo("/test.txt");
        });
        Assertions.assertEquals((int)availablePermits, (int)rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
        ArrayList<RemoteLocation> locations = new ArrayList<RemoteLocation>();
        locations.add(new RemoteLocation("ns0", "/", "/"));
        RemoteMethod renewLease = new RemoteMethod("renewLease", new Class[]{String.class, List.class}, new Object[]{null, null});
        availablePermits = rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
        LambdaTestUtils.intercept(IOException.class, () -> {
            LOG.info("Use renewLease test invokeConcurrent.");
            rpcClient.invokeConcurrent(locations, renewLease);
        });
        Assertions.assertEquals((int)availablePermits, (int)rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
    }

    private void startLoadTest(boolean fairness) throws Exception {
        this.startLoadTest(true, fairness);
        this.startLoadTest(false, fairness);
    }

    private void startLoadTest(boolean isConcurrent, boolean fairness) throws Exception {
        int i;
        MiniRouterDFSCluster.RouterContext routerContext = this.cluster.getRandomRouter();
        URI address = routerContext.getFileSystemURI();
        HdfsConfiguration conf = new HdfsConfiguration();
        int numOps = 10;
        AtomicInteger overloadException = new AtomicInteger();
        if (fairness) {
            if (isConcurrent) {
                LOG.info("Taking fanout lock first");
                for (int i2 = 0; i2 < this.expectedHandlerPerNs.get("concurrent"); ++i2) {
                    Assertions.assertTrue((boolean)routerContext.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().acquirePermit("concurrent"));
                }
            } else {
                for (String ns : this.cluster.getNameservices()) {
                    LOG.info("Taking lock first for ns: {}", (Object)ns);
                    for (i = 0; i < this.expectedHandlerPerNs.get(ns); ++i) {
                        Assertions.assertTrue((boolean)routerContext.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().acquirePermit(ns));
                    }
                }
            }
        }
        int originalRejectedPermits = this.getTotalRejectedPermits(routerContext);
        this.innerCalls(address, 10, isConcurrent, (Configuration)conf, overloadException);
        int latestRejectedPermits = this.getTotalRejectedPermits(routerContext);
        Assertions.assertEquals((int)(latestRejectedPermits - originalRejectedPermits), (int)overloadException.get());
        if (fairness) {
            Assertions.assertTrue((overloadException.get() > 0 ? 1 : 0) != 0);
            if (isConcurrent) {
                LOG.info("Release fanout lock that was taken before test");
                for (i = 0; i < this.expectedHandlerPerNs.get("concurrent"); ++i) {
                    routerContext.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().releasePermit("concurrent");
                }
            } else {
                for (String ns : this.cluster.getNameservices()) {
                    for (int i3 = 0; i3 < this.expectedHandlerPerNs.get(ns); ++i3) {
                        routerContext.getRouter().getRpcServer().getRPCClient().getRouterRpcFairnessPolicyController().releasePermit(ns);
                    }
                }
            }
        } else {
            Assertions.assertEquals((int)0, (int)overloadException.get(), (String)"Number of failed RPCs without fairness configured");
        }
        int originalAcceptedPermits = this.getTotalAcceptedPermits(routerContext);
        overloadException = new AtomicInteger();
        this.innerCalls(address, 10, isConcurrent, (Configuration)conf, overloadException);
        int latestAcceptedPermits = this.getTotalAcceptedPermits(routerContext);
        Assertions.assertEquals((int)(latestAcceptedPermits - originalAcceptedPermits), (int)10);
        Assertions.assertEquals((int)overloadException.get(), (int)0);
    }

    private void invokeSequential(ClientProtocol routerProto) throws IOException {
        routerProto.getFileInfo("/test.txt");
    }

    private void invokeConcurrent(ClientProtocol routerProto, String clientName) throws IOException {
        routerProto.renewLease(clientName, null);
    }

    private int getTotalRejectedPermits(MiniRouterDFSCluster.RouterContext routerContext) {
        int totalRejectedPermits = 0;
        for (String ns : this.cluster.getNameservices()) {
            totalRejectedPermits = (int)((long)totalRejectedPermits + routerContext.getRouterRpcClient().getRejectedPermitForNs(ns));
        }
        totalRejectedPermits = (int)((long)totalRejectedPermits + routerContext.getRouterRpcClient().getRejectedPermitForNs("concurrent"));
        return totalRejectedPermits;
    }

    private int getTotalAcceptedPermits(MiniRouterDFSCluster.RouterContext routerContext) {
        int totalAcceptedPermits = 0;
        for (String ns : this.cluster.getNameservices()) {
            totalAcceptedPermits = (int)((long)totalAcceptedPermits + routerContext.getRouterRpcClient().getAcceptedPermitForNs(ns));
        }
        totalAcceptedPermits = (int)((long)totalAcceptedPermits + routerContext.getRouterRpcClient().getAcceptedPermitForNs("concurrent"));
        return totalAcceptedPermits;
    }

    private void innerCalls(URI address, int numOps, boolean isConcurrent, Configuration conf, AtomicInteger overloadException) throws IOException {
        for (int i = 0; i < numOps; ++i) {
            DFSClient routerClient = null;
            try {
                routerClient = new DFSClient(address, conf);
                String clientName = routerClient.getClientName();
                ClientProtocol routerProto = routerClient.getNamenode();
                if (isConcurrent) {
                    this.invokeConcurrent(routerProto, clientName);
                } else {
                    this.invokeSequential(routerProto);
                }
            }
            catch (RemoteException re) {
                IOException ioe = re.unwrapRemoteException();
                Assertions.assertTrue((boolean)(ioe instanceof StandbyException), (String)("Wrong exception: " + ioe));
                GenericTestUtils.assertExceptionContains((String)"is overloaded for NS", (Throwable)ioe);
                overloadException.incrementAndGet();
            }
            catch (Throwable e) {
                throw e;
            }
            finally {
                if (routerClient != null) {
                    try {
                        routerClient.close();
                    }
                    catch (IOException e) {
                        LOG.error("Cannot close the client");
                    }
                }
            }
            overloadException.get();
        }
    }

    private static Map<String, Integer> expectedHandlerPerNs(String str) {
        String[] tmpStrs;
        HashMap<String, Integer> handlersPerNsMap = new HashMap<String, Integer>();
        if (str == null) {
            return handlersPerNsMap;
        }
        for (String tmpStr : tmpStrs = str.split(", ")) {
            String[] handlersPerNs = tmpStr.split(":");
            if (handlersPerNs.length != 2) continue;
            handlersPerNsMap.put(handlersPerNs[0], Integer.valueOf(handlersPerNs[1]));
        }
        return handlersPerNsMap;
    }

    private static Map<String, String> setConfiguration(String str) {
        String[] tmpStrs;
        HashMap<String, String> conf = new HashMap<String, String>();
        if (str == null) {
            return conf;
        }
        for (String tmpStr : tmpStrs = str.split(", ")) {
            String[] configKV = tmpStr.split("=");
            if (configKV.length != 2) continue;
            conf.put(configKV[0], configKV[1]);
        }
        return conf;
    }
}

