package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-hdfs-rbf-2.10.1-ODI-tests.jar:org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.class
  input_file:test-classes/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.class
 */
/* loaded from: input_file:hadoop-hdfs-rbf-2.10.1-ODI/share/hadoop/hdfs/hadoop-hdfs-rbf-2.10.1-ODI-tests.jar:org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.class */
public class TestRouterClientRejectOverload {
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
    private StateStoreDFSCluster cluster;

    @After
    public void cleanup() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void setupCluster(boolean z) throws Exception {
        this.cluster = new StateStoreDFSCluster(false, 2);
        Configuration build = new RouterConfigBuilder().stateStore().metrics().admin().rpc().build();
        build.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
        build.setBoolean(RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, z);
        this.cluster.setNumDatanodesPerNameservice(0);
        this.cluster.addRouterOverrides(build);
        this.cluster.startCluster();
        this.cluster.startRouters();
        this.cluster.waitClusterUp();
    }

    @Test
    public void testWithoutOverloadControl() throws Exception {
        setupCluster(false);
        testOverloaded(0);
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 1);
        testOverloaded(0);
        Iterator<MiniRouterDFSCluster.RouterContext> it = this.cluster.getRouters().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(0L, it.next().getRouter().getRpcServer().getRPCMetrics().getProxyOpFailureClientOverloaded());
        }
    }

    @Test
    public void testOverloadControl() throws Exception {
        setupCluster(true);
        List<MiniRouterDFSCluster.RouterContext> routers = this.cluster.getRouters();
        FederationRPCMetrics rPCMetrics = routers.get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rPCMetrics2 = routers.get(1).getRouter().getRpcServer().getRPCMetrics();
        testOverloaded(0);
        Assert.assertEquals(0L, rPCMetrics.getProxyOpFailureClientOverloaded());
        Assert.assertEquals(0L, rPCMetrics2.getProxyOpFailureClientOverloaded());
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 1);
        testOverloaded(4, 6);
        Assert.assertTrue(rPCMetrics.getProxyOpFailureClientOverloaded() + rPCMetrics2.getProxyOpFailureClientOverloaded() >= 4);
        Configuration routerClientConf = this.cluster.getRouterClientConf();
        long proxyOps = rPCMetrics.getProxyOps();
        long proxyOps2 = rPCMetrics2.getProxyOps();
        testOverloaded(0, 0, new URI("hdfs://fed/"), routerClientConf, 10);
        long proxyOps3 = rPCMetrics.getProxyOps() - proxyOps;
        long proxyOps4 = rPCMetrics2.getProxyOps() - proxyOps2;
        Assert.assertEquals(20L, proxyOps3 + proxyOps4);
        Assert.assertTrue(proxyOps3 + " operations: not distributed", proxyOps3 >= 8);
        Assert.assertTrue(proxyOps4 + " operations: not distributed", proxyOps4 >= 8);
    }

    private void testOverloaded(int i) throws Exception {
        testOverloaded(i, i);
    }

    private void testOverloaded(int i, int i2) throws Exception {
        testOverloaded(i, i2, this.cluster.getRandomRouter().getFileSystemURI(), new HdfsConfiguration(), 10);
    }

    private void testOverloaded(int i, int i2, final URI uri, final Configuration configuration, int i3) throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i3);
        ArrayList arrayList = new ArrayList();
        for (int i4 = 0; i4 < i3; i4++) {
            final int i5 = i4 * 50;
            arrayList.add(newFixedThreadPool.submit(new Runnable() { // from class: org.apache.hadoop.hdfs.server.federation.router.TestRouterClientRejectOverload.1
                @Override // java.lang.Runnable
                public void run() {
                    DFSClient dFSClient = null;
                    try {
                        try {
                            try {
                                try {
                                    Thread.sleep(i5);
                                    dFSClient = new DFSClient(uri, configuration);
                                    dFSClient.getNamenode().renewLease(dFSClient.getClientName());
                                    if (dFSClient != null) {
                                        try {
                                            dFSClient.close();
                                        } catch (IOException e) {
                                            TestRouterClientRejectOverload.LOG.error("Cannot close the client");
                                        }
                                    }
                                } catch (InterruptedException e2) {
                                    Assert.fail("Cannot sleep: " + e2);
                                    if (dFSClient != null) {
                                        try {
                                            dFSClient.close();
                                        } catch (IOException e3) {
                                            TestRouterClientRejectOverload.LOG.error("Cannot close the client");
                                        }
                                    }
                                }
                            } catch (RemoteException e4) {
                                IOException unwrapRemoteException = e4.unwrapRemoteException();
                                Assert.assertTrue("Wrong exception: " + unwrapRemoteException, unwrapRemoteException instanceof StandbyException);
                                GenericTestUtils.assertExceptionContains("is overloaded", unwrapRemoteException);
                                atomicInteger.incrementAndGet();
                                if (dFSClient != null) {
                                    try {
                                        dFSClient.close();
                                    } catch (IOException e5) {
                                        TestRouterClientRejectOverload.LOG.error("Cannot close the client");
                                    }
                                }
                            }
                        } catch (Throwable th) {
                            if (dFSClient != null) {
                                try {
                                    dFSClient.close();
                                } catch (IOException e6) {
                                    TestRouterClientRejectOverload.LOG.error("Cannot close the client");
                                }
                            }
                            throw th;
                        }
                    } catch (IOException e7) {
                        Assert.fail("Unexpected exception: " + e7);
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e8) {
                                TestRouterClientRejectOverload.LOG.error("Cannot close the client");
                            }
                        }
                    }
                }
            }));
        }
        while (!arrayList.isEmpty()) {
            ((Future) arrayList.remove(0)).get();
        }
        newFixedThreadPool.shutdown();
        int i6 = atomicInteger.get();
        if (i == i2) {
            Assert.assertEquals(i, i6);
        } else {
            Assert.assertTrue("Expected >=" + i + " but was " + i6, i6 >= i);
            Assert.assertTrue("Expected <=" + i2 + " but was " + i6, i6 <= i2);
        }
    }
}
