/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherThread;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests that mount table changes made through one router's admin client are
 * visible through the admin clients of the other started routers, and
 * exercises {@link MountTableRefresherService} directly (refresh timeout and
 * cached {@link RouterClient} expiration).
 *
 * <p>Every test is parameterized on whether router heartbeats use IP
 * addresses. Each test lazily builds its own ZooKeeper testing server and
 * mini router cluster via
 * {@link #initTestRouterMountTableCacheRefresh(boolean)}; {@link #tearDown()}
 * removes the mount entries and then releases those resources.
 */
public class TestRouterMountTableCacheRefresh {
    private static TestingServer curatorTestingServer;
    private static MiniRouterDFSCluster cluster;
    private static MiniRouterDFSCluster.RouterContext routerContext;
    private static MountTableManager mountTableManager;

    /**
     * Parameter source for the tests.
     *
     * @return the two values of the "heartbeat with IP" flag
     */
    public static Collection<Object> data() {
        return Arrays.asList(true, false);
    }

    /**
     * Starts the ZooKeeper testing server and a two-nameservice router cluster,
     * then waits until the routers are registered in the state store. Does
     * nothing when the environment is already up (the testing server is
     * non-null).
     *
     * @param pUseIpForHeartbeats value written to
     *     {@code dfs.federation.router.heartbeat.with.ip.enable}
     * @throws Exception if any component fails to start
     */
    public void initTestRouterMountTableCacheRefresh(boolean pUseIpForHeartbeats) throws Exception {
        if (curatorTestingServer != null) {
            return;
        }
        curatorTestingServer = new TestingServer();
        curatorTestingServer.start();
        String connectString = curatorTestingServer.getConnectString();
        int numNameservices = 2;
        cluster = new MiniRouterDFSCluster(false, numNameservices);
        Configuration conf = new RouterConfigBuilder()
            .refreshCache()
            .admin()
            .rpc()
            .heartbeat()
            .build();
        conf.setClass("dfs.federation.router.file.resolver.client.class",
            RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
            FileSubclusterResolver.class);
        conf.set("dfs.federation.router.store.driver.zk.address", connectString);
        conf.setBoolean("dfs.federation.router.store.enable", true);
        conf.setBoolean("dfs.federation.router.heartbeat.with.ip.enable", pUseIpForHeartbeats);
        cluster.addRouterOverrides(conf);
        cluster.startCluster();
        cluster.startRouters();
        cluster.waitClusterUp();
        routerContext = cluster.getRandomRouter();
        RouterStore routerStateManager = routerContext.getRouter().getRouterStateManager();
        mountTableManager = routerContext.getAdminClient().getMountTableManager();
        FederationTestUtils.waitRouterRegistered(routerStateManager, numNameservices, 60000);
    }

    /**
     * Releases the ZooKeeper testing server and the mini cluster and resets all
     * shared state so the next test rebuilds its environment.
     *
     * <p>This method is invoked from {@link #tearDown()} rather than being a
     * lifecycle callback itself: with two {@code @AfterEach} methods the
     * relative order is not under the test's control, and running this before
     * the mount table clean-up would leave that clean-up with a null manager.
     * The two shutdown steps are independent, so a failure while closing the
     * testing server no longer skips the cluster shutdown.
     */
    public void destroy() {
        try {
            if (curatorTestingServer != null) {
                curatorTestingServer.close();
            }
        } catch (IOException ignored) {
            // Best-effort: a failed close of the testing server must not fail the test.
        } finally {
            try {
                if (cluster != null) {
                    cluster.shutdown();
                }
            } finally {
                cluster = null;
                routerContext = null;
                mountTableManager = null;
                curatorTestingServer = null;
            }
        }
    }

    /**
     * Single after-each callback: removes the mount table entries while the
     * cluster is still running, then always tears the environment down.
     *
     * @throws IOException if listing or removing mount table entries fails
     */
    @AfterEach
    public void tearDown() throws IOException {
        try {
            clearEntries();
        } finally {
            destroy();
        }
    }

    /**
     * Removes every mount table entry visible through the default manager and
     * asserts each removal succeeded. Does nothing when the manager was never
     * created (for example because set-up failed part-way).
     */
    private void clearEntries() throws IOException {
        if (mountTableManager == null) {
            return;
        }
        for (MountTable entry : getMountTableEntries()) {
            RemoveMountTableEntryResponse response = mountTableManager.removeMountTableEntry(
                RemoveMountTableEntryRequest.newInstance(entry.getSourcePath()));
            Assertions.assertTrue(response.getStatus());
        }
    }

    /**
     * After adding an entry through one router, every started router must
     * report one more entry than before and must list the new source path.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testMountTableEntriesCacheUpdatedAfterAddAPICall(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        int existingEntriesCount = getNumMountTableEntries();
        String srcPath = "/addPath";
        MountTable newEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap("ns0", "/addPathDest"), Time.now(), Time.now());
        addMountTableEntry(mountTableManager, newEntry);
        for (MiniRouterDFSCluster.RouterContext rc : getRouters()) {
            List<MountTable> result = getMountTableEntries(rc.getAdminClient().getMountTableManager());
            Assertions.assertEquals(1 + existingEntriesCount, result.size());
            // Look the entry up by path: it is only at index 0 when nothing pre-existed.
            MountTable added = findBySourcePath(result, srcPath);
            Assertions.assertNotNull(added, "Added mount table entry not found: " + srcPath);
            Assertions.assertEquals(srcPath, added.getSourcePath());
        }
    }

    /**
     * An entry that was added and then removed must no longer be counted.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        String srcPath = "/removePathSrc";
        MountTable newEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap("ns0", "/removePathDest"), Time.now(), Time.now());
        addMountTableEntry(mountTableManager, newEntry);
        int addCount = getNumMountTableEntries();
        Assertions.assertEquals(1, addCount);
        RemoveMountTableEntryResponse removeResponse = mountTableManager.removeMountTableEntry(
            RemoveMountTableEntryRequest.newInstance(srcPath));
        Assertions.assertTrue(removeResponse.getStatus());
        int removeCount = getNumMountTableEntries();
        Assertions.assertEquals(addCount - 1, removeCount);
    }

    /**
     * Updating an entry's destination must be reflected when the entry is read
     * back: exactly one destination with the new nameservice and path.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        String srcPath = "/updatePathSrc";
        MountTable newEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap("ns0", "/updatePathDest"), Time.now(), Time.now());
        addMountTableEntry(mountTableManager, newEntry);
        int addCount = getNumMountTableEntries();
        Assertions.assertEquals(1, addCount);

        String key = "ns1";
        String value = "/updatePathDest2";
        MountTable updateEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap(key, value), Time.now(), Time.now());
        UpdateMountTableEntryResponse updateResponse = mountTableManager.updateMountTableEntry(
            UpdateMountTableEntryRequest.newInstance(updateEntry));
        Assertions.assertTrue(updateResponse.getStatus());

        MountTable updatedMountTable = getMountTableEntry(srcPath);
        Assertions.assertNotNull(updatedMountTable, "Updated mount table entrty cannot be null");
        Assertions.assertEquals(1, updatedMountTable.getDestinations().size());
        RemoteLocation destination = updatedMountTable.getDestinations().get(0);
        Assertions.assertEquals(key, destination.getNameserviceId());
        Assertions.assertEquals(value, destination.getDest());
    }

    /**
     * Adds an entry, stops one router other than the one used for admin calls,
     * adds a second entry, and checks that every router that is still started
     * reports both entries.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testCachedRouterClientBehaviourAfterRouterStoped(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        String srcPath = "/addPathClientCache";
        MountTable newEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(), Time.now());
        addMountTableEntry(mountTableManager, newEntry);

        List<MiniRouterDFSCluster.RouterContext> routers = getRouters();
        for (MiniRouterDFSCluster.RouterContext rc : routers) {
            List<MountTable> result = getMountTableEntries(rc.getAdminClient().getMountTableManager());
            Assertions.assertEquals(1, result.size());
            Assertions.assertEquals(srcPath, result.get(0).getSourcePath());
        }

        // Stop the first router that is not the one the shared admin client talks to.
        InetSocketAddress localAdminAddress = routerContext.getRouter().getAdminServerAddress();
        for (MiniRouterDFSCluster.RouterContext rc : routers) {
            if (!localAdminAddress.equals(rc.getRouter().getAdminServerAddress())) {
                cluster.stopRouter(rc);
                break;
            }
        }

        srcPath = "/addPathClientCache2";
        newEntry = MountTable.newInstance(
            srcPath, Collections.singletonMap("ns0", "/addPathClientCacheDest2"), Time.now(), Time.now());
        addMountTableEntry(mountTableManager, newEntry);
        for (MiniRouterDFSCluster.RouterContext rc : getRouters()) {
            List<MountTable> result = getMountTableEntries(rc.getAdminClient().getMountTableManager());
            Assertions.assertEquals(2, result.size());
        }
    }

    /**
     * @return the cluster's routers whose service state is
     *     {@link Service.STATE#STARTED}
     */
    private List<MiniRouterDFSCluster.RouterContext> getRouters() {
        List<MiniRouterDFSCluster.RouterContext> started = new ArrayList<>();
        for (MiniRouterDFSCluster.RouterContext rc : cluster.getRouters()) {
            if (rc.getRouter().getServiceState() == Service.STATE.STARTED) {
                started.add(rc);
            }
        }
        return started;
    }

    /**
     * The explicit refresh admin call must report success.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testRefreshMountTableEntriesAPI(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        RefreshMountTableEntriesRequest request = RefreshMountTableEntriesRequest.newInstance();
        RefreshMountTableEntriesResponse response = mountTableManager.refreshMountTableEntries(request);
        Assertions.assertTrue(response.getResult());
    }

    /**
     * Runs a refresh whose local refresher thread merely sleeps for 60 seconds,
     * with the cache update timeout configured to 5 seconds. The test passes
     * when {@code refresh()} returns without throwing before the JUnit timeout
     * expires.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    @Timeout(value=100L)
    public void testMountTableEntriesCacheUpdateTimeout(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        MountTableRefresherService mountTableRefresherService =
            new MountTableRefresherService(routerContext.getRouter()) {

                @Override
                protected MountTableRefresherThread getLocalRefresher(String adminAddress) {
                    return new MountTableRefresherThread(null, adminAddress) {

                        @Override
                        public void run() {
                            try {
                                // Simulate a refresher that takes far longer than the timeout.
                                Thread.sleep(60000L);
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible instead of silently dropping it.
                                Thread.currentThread().interrupt();
                            }
                        }
                    };
                }
            };
        Configuration config = routerContext.getRouter().getConfig();
        config.setTimeDuration("dfs.federation.router.mount-table.cache.update.timeout", 5L, TimeUnit.SECONDS);
        mountTableRefresherService.init(config);
        mountTableRefresherService.refresh();
    }

    /**
     * Counts router clients created and closed by a refresher service whose
     * client max time is 2 seconds, and waits (up to three times that period)
     * until every created client has been closed.
     */
    @MethodSource(value={"data"})
    @ParameterizedTest
    public void testRouterClientConnectionExpiration(boolean pUseIpForHeartbeats) throws Exception {
        initTestRouterMountTableCacheRefresh(pUseIpForHeartbeats);
        final AtomicInteger createCounter = new AtomicInteger();
        final AtomicInteger removeCounter = new AtomicInteger();
        MountTableRefresherService mountTableRefresherService =
            new MountTableRefresherService(routerContext.getRouter()) {

                @Override
                protected void closeRouterClient(RouterClient client) {
                    super.closeRouterClient(client);
                    removeCounter.incrementAndGet();
                }

                @Override
                protected RouterClient createRouterClient(InetSocketAddress routerSocket, Configuration config)
                        throws IOException {
                    createCounter.incrementAndGet();
                    return super.createRouterClient(routerSocket, config);
                }
            };
        int clientCacheTime = 2000;
        Configuration config = routerContext.getRouter().getConfig();
        config.setTimeDuration("dfs.federation.router.mount-table.cache.update.client.max.time",
            clientCacheTime, TimeUnit.MILLISECONDS);
        mountTableRefresherService.init(config);
        // The refresh is what causes router clients to be created.
        mountTableRefresherService.refresh();
        Assertions.assertNotEquals(0, createCounter.get(), "No RouterClient is created.");
        GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(), 100L, 3L * clientCacheTime);
    }

    /**
     * @return number of mount table entries visible through the default manager
     */
    private int getNumMountTableEntries() throws IOException {
        return getMountTableEntries().size();
    }

    /**
     * @param srcPath source path to look for
     * @return the entry with that source path as seen by the default manager,
     *     or {@code null} when there is none
     */
    private MountTable getMountTableEntry(String srcPath) throws IOException {
        return findBySourcePath(getMountTableEntries(), srcPath);
    }

    /**
     * Linear search of {@code entries} for the given source path.
     *
     * @return the first matching entry, or {@code null} when none matches
     */
    private static MountTable findBySourcePath(List<MountTable> entries, String srcPath) {
        for (MountTable entry : entries) {
            if (srcPath.equals(entry.getSourcePath())) {
                return entry;
            }
        }
        return null;
    }

    /**
     * Adds {@code newEntry} through {@code mountTableMgr} and asserts that the
     * call reported success.
     */
    private void addMountTableEntry(MountTableManager mountTableMgr, MountTable newEntry) throws IOException {
        AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry);
        AddMountTableEntryResponse addResponse = mountTableMgr.addMountTableEntry(addRequest);
        Assertions.assertTrue(addResponse.getStatus());
    }

    /**
     * @return all mount table entries under "/" as seen by the default manager
     */
    private List<MountTable> getMountTableEntries() throws IOException {
        return getMountTableEntries(mountTableManager);
    }

    /**
     * @param mountTableManagerParam manager to query
     * @return all mount table entries under "/" as seen by that manager
     */
    private List<MountTable> getMountTableEntries(MountTableManager mountTableManagerParam) throws IOException {
        GetMountTableEntriesRequest request = GetMountTableEntriesRequest.newInstance("/");
        return mountTableManagerParam.getMountTableEntries(request).getEntries();
    }
}

