/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRouterFaultTolerant {
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterFaultTolerant.class);
    // Intended number of directories/files created by each check helper (those loops
    // run 10 times). NOTE(review): confirm the helpers stay in sync with this constant.
    private static final int NUM_FILES = 10;
    // Intended number of Routers started by setup() (that loop runs 2 times).
    // NOTE(review): confirm setup() stays in sync with this constant.
    private static final int NUM_ROUTERS = 2;
    /** Mock Namenodes keyed by nameservice id ("ns0", "ns1"); populated in setup(). */
    private Map<String, MockNamenode> namenodes = new HashMap<String, MockNamenode>();
    /** Routers started in setup() and stopped in cleanup(). */
    private List<Router> routers = new ArrayList<Router>();
    /** Thread pool on which collectResults() runs each batch of tasks; created in setup(). */
    private ExecutorService service;

    /**
     * Starts the test federation: two active mock Namenodes ("ns0" and "ns1") with a
     * mocked file system, and {@link #NUM_ROUTERS} Routers sharing one configuration
     * (state store, admin and RPC enabled, ephemeral ports, 500 ms connect timeout).
     *
     * <p>The first Router is started with partial listing disabled and every later
     * Router with it enabled, which is why the listing checks in this class expect
     * Router 0 to fail and Router 1 to succeed.
     *
     * @throws Exception if a Namenode or Router cannot be started or registered
     */
    @BeforeEach
    public void setup() throws Exception {
        LOG.info("Start the Namenodes");
        HdfsConfiguration nnConf = new HdfsConfiguration();
        nnConf.setInt("dfs.namenode.handler.count", 10);
        for (String nsId : Arrays.asList("ns0", "ns1")) {
            MockNamenode nn = new MockNamenode(nsId, nnConf);
            nn.transitionToActive();
            nn.addFileSystemMock();
            this.namenodes.put(nsId, nn);
        }

        LOG.info("Start the Routers");
        Configuration routerConf = new RouterConfigBuilder().stateStore().admin().rpc().build();
        routerConf.set("dfs.federation.router.rpc-address", "0.0.0.0:0");
        routerConf.set("dfs.federation.router.http-address", "0.0.0.0:0");
        routerConf.set("dfs.federation.router.admin-address", "0.0.0.0:0");
        routerConf.setTimeDuration("dfs.federation.router.connect.timeout", 500L, TimeUnit.MILLISECONDS);
        Configuration stateStoreConf = FederationStateStoreTestUtils.getStateStoreConfiguration();
        stateStoreConf.setClass("dfs.federation.router.namenode.resolver.client.class", MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
        stateStoreConf.setClass("dfs.federation.router.file.resolver.client.class", MultipleDestinationMountTableResolver.class, FileSubclusterResolver.class);
        routerConf.addResource(stateStoreConf);
        // Use the declared constant instead of repeating the literal Router count.
        for (int i = 0; i < NUM_ROUTERS; ++i) {
            // Router 0 rejects partial listings; the remaining Routers allow them.
            routerConf.setBoolean("dfs.federation.router.client.allow-partial-listing", i != 0);
            Router router = new Router();
            router.init(routerConf);
            router.start();
            this.routers.add(router);
        }

        LOG.info("Registering the subclusters in the Routers");
        // NOTE(review): the singleton "ns1" is presumably the set of nameservices to
        // register as unavailable/unhealthy -- confirm against MockNamenode.
        MockNamenode.registerSubclusters(this.routers, this.namenodes.values(), Collections.singleton("ns1"));
        this.service = Executors.newFixedThreadPool(10);
    }

    /**
     * Stops every mock Namenode and Router and shuts the task thread pool down.
     *
     * <p>Each stage runs in a {@code finally} so that a failure while stopping one
     * component does not leave the remaining Routers or the thread pool running and
     * leaking into the next test.
     *
     * @throws Exception if stopping a Namenode or Router fails
     */
    @AfterEach
    public void cleanup() throws Exception {
        LOG.info("Stopping the cluster");
        try {
            for (MockNamenode nn : this.namenodes.values()) {
                nn.stop();
            }
        } finally {
            this.namenodes.clear();
            try {
                this.routers.forEach(AbstractService::stop);
            } finally {
                this.routers.clear();
                if (this.service != null) {
                    this.service.shutdown();
                    this.service = null;
                }
            }
        }
    }

    /**
     * Flags the first mount table entry found for {@code mountPoint} as fault tolerant,
     * asserts that the update was accepted, and refreshes the caches of all Routers.
     *
     * @param mountPoint mount point whose entry is updated
     * @throws IOException if the admin call fails
     */
    private void updateMountPointFaultTolerant(String mountPoint) throws IOException {
        // Any Router can serve the admin request; pick one at random.
        MountTableManager manager = FederationTestUtils.getAdminClient(this.getRandomRouter()).getMountTableManager();

        GetMountTableEntriesResponse lookup = manager.getMountTableEntries(GetMountTableEntriesRequest.newInstance(mountPoint));
        MountTable entry = (MountTable) lookup.getEntries().get(0);
        entry.setFaultTolerant(true);

        UpdateMountTableEntryResponse outcome = manager.updateMountTableEntry(UpdateMountTableEntryRequest.newInstance(entry));
        Assertions.assertTrue(outcome.getStatus());

        FederationTestUtils.refreshRoutersCaches(this.routers);
    }

    /**
     * Stops "ns1" and then runs the write scenario once per destination order
     * (HASH_ALL, SPACE, RANDOM, HASH) on the thread pool, expecting every run to succeed.
     *
     * @throws Exception if the scenarios cannot be run
     */
    @Test
    public void testWriteWithFailedSubcluster() throws Exception {
        LOG.info("Stop ns1 to simulate an unavailable subcluster");
        MockNamenode ns1 = this.namenodes.get("ns1");
        ns1.stop();

        List<DestinationOrder> orders = Arrays.asList(DestinationOrder.HASH_ALL, DestinationOrder.SPACE, DestinationOrder.RANDOM, DestinationOrder.HASH);
        // One task per order; a task reports true when its scenario finishes without throwing.
        List<Callable<Boolean>> perOrderTasks = new ArrayList<>();
        orders.forEach(order -> perOrderTasks.add(() -> {
            this.testWriteWithFailedSubcluster(order);
            return true;
        }));

        int succeeded = this.collectResults("Full tests", perOrderTasks).getSuccess();
        Assertions.assertEquals(orders.size(), succeeded);
    }

    /**
     * Runs the write scenario for a single destination order.
     *
     * <p>Creates the mount point {@code /<order>-failsubcluster} over all nameservices,
     * checks directory and file creation without fault tolerance, then tries to make the
     * mount point fault tolerant. For orders in {@code DestinationOrder.FOLDER_ALL} the
     * update must succeed and the checks are repeated expecting full success; for any
     * other order the update must be rejected with an "Invalid entry" error.
     *
     * <p>This method is invoked concurrently, once per order, by the public test.
     *
     * @param order destination order of the mount point under test
     * @throws Exception if a file system or admin operation fails unexpectedly
     */
    private void testWriteWithFailedSubcluster(DestinationOrder order) throws Exception {
        FileSystem router0Fs = FederationTestUtils.getFileSystem(this.routers.get(0));
        FileSystem router1Fs = FederationTestUtils.getFileSystem(this.routers.get(1));
        FileSystem ns0Fs = FederationTestUtils.getFileSystem(this.namenodes.get("ns0").getRPCPort());
        String mountPoint = "/" + order + "-failsubcluster";
        Path mountPath = new Path(mountPoint);

        LOG.info("Setup {} with order {}", mountPoint, order);
        FederationTestUtils.createMountTableEntry(this.getRandomRouter(), mountPoint, order, this.namenodes.keySet());
        FederationTestUtils.refreshRoutersCaches(this.routers);

        LOG.info("Write in {} should succeed writing in ns0 and fail for ns1", mountPath);
        this.checkDirectoriesFaultTolerant(mountPath, order, router0Fs, router1Fs, ns0Fs, false);
        this.checkFilesFaultTolerant(mountPath, order, router0Fs, router1Fs, ns0Fs, false);

        LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
        IOException ioe = null;
        try {
            this.updateMountPointFaultTolerant(mountPoint);
        } catch (IOException e) {
            ioe = e;
        }

        if (DestinationOrder.FOLDER_ALL.contains(order)) {
            Assertions.assertNull(ioe);
            this.checkDirectoriesFaultTolerant(mountPath, order, router0Fs, router1Fs, ns0Fs, true);
            this.checkFilesFaultTolerant(mountPath, order, router0Fs, router1Fs, ns0Fs, true);
        } else {
            // Fail with a readable assertion, not a NullPointerException, when the
            // update is unexpectedly accepted or the exception carries no message.
            Assertions.assertNotNull(ioe, "Fault tolerance should have been rejected for order " + order);
            String message = ioe.getMessage();
            Assertions.assertTrue(message != null && message.startsWith("Invalid entry, fault tolerance only supported for ALL order"), "Unexpected error for order " + order + ": " + message);
        }
    }

    /**
     * Creates {@link #NUM_FILES} directories under {@code mountPoint} through random
     * Routers and verifies creation results, listing and content summary.
     *
     * <p>When {@code faultTolerant} is set or the order is in
     * {@code DestinationOrder.FOLDER_ALL}, every creation must succeed; otherwise a mix
     * of successes and failures is required. Listing and content summary are then
     * expected to fail through {@code router0Fs} and to succeed through
     * {@code router1Fs} with the pre-existing entries plus the new directories.
     *
     * @param mountPoint mount point to create the directories in
     * @param order destination order of the mount point
     * @param router0Fs file system of the Router expected to fail listing
     * @param router1Fs file system of the Router expected to succeed listing
     * @param ns0Fs file system of ns0 (not used by this check)
     * @param faultTolerant whether the mount point is fault tolerant
     * @throws Exception if a task batch cannot be run
     */
    private void checkDirectoriesFaultTolerant(Path mountPoint, DestinationOrder order, FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs, boolean faultTolerant) throws Exception {
        // Entries already present before this round, so the expected count is cumulative.
        FileStatus[] dirs0 = this.listStatus(router1Fs, mountPoint);

        LOG.info("Create directories in {}", mountPoint);
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < NUM_FILES; ++i) {
            Path dir = new Path(mountPoint, String.format("dir-%s-%03d", faultTolerant, i));
            FileSystem fs = this.getRandomRouterFileSystem();
            tasks.add(TestRouterFaultTolerant.getDirCreateTask(fs, dir));
        }
        TaskResults results = this.collectResults("Create dir " + mountPoint, tasks);

        LOG.info("Check directories results for {}: {}", mountPoint, results);
        if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) {
            Assertions.assertEquals(NUM_FILES, results.getSuccess());
            Assertions.assertEquals(0, results.getFailure());
        } else {
            TestRouterFaultTolerant.assertBothResults("check dir " + mountPoint, NUM_FILES, results);
        }

        LOG.info("Check directories listing for {}", mountPoint);
        // collectResults() empties the task list, so it is reused for each batch.
        tasks.add(TestRouterFaultTolerant.getListFailTask(router0Fs, mountPoint));
        int filesExpected = dirs0.length + results.getSuccess();
        tasks.add(TestRouterFaultTolerant.getListSuccessTask(router1Fs, mountPoint, filesExpected));
        results = this.collectResults("List " + mountPoint, tasks);
        Assertions.assertEquals(2, results.getSuccess(), "Failed listing");

        tasks.add(TestRouterFaultTolerant.getContentSummaryFailTask(router0Fs, mountPoint));
        tasks.add(TestRouterFaultTolerant.getContentSummarySuccessTask(router1Fs, mountPoint, filesExpected));
        results = this.collectResults("Content summary " + mountPoint, tasks);
        Assertions.assertEquals(2, results.getSuccess(), "Failed content summary");
    }

    /**
     * Creates {@link #NUM_FILES} files inside the first directory listed under
     * {@code mountPoint} through random Routers and verifies creation results, listing
     * and content summary.
     *
     * <p>With {@code faultTolerant} set every creation must succeed; otherwise every
     * creation must fail. Listing and content summary of the directory are expected to
     * fail through {@code router0Fs} and to succeed through {@code router1Fs} with as
     * many entries as files were created.
     *
     * @param mountPoint mount point holding the target directory
     * @param order destination order of the mount point (not used by this check)
     * @param router0Fs file system of the Router expected to fail listing
     * @param router1Fs file system of the Router expected to succeed listing
     * @param ns0Fs file system of ns0, used to verify each created file
     * @param faultTolerant whether the mount point is fault tolerant
     * @throws Exception if a task batch cannot be run
     */
    private void checkFilesFaultTolerant(Path mountPoint, DestinationOrder order, FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs, boolean faultTolerant) throws Exception {
        FileStatus[] dirs0 = this.listStatus(router1Fs, mountPoint);
        // listStatus() yields an empty array for a missing path; fail with a clear
        // message instead of an ArrayIndexOutOfBoundsException below.
        Assertions.assertTrue(dirs0.length > 0, "No directory to create files in under " + mountPoint);
        Path dir0 = Path.getPathWithoutSchemeAndAuthority(dirs0[0].getPath());

        LOG.info("Create files in {}", dir0);
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < NUM_FILES; ++i) {
            String newFile = String.format("%s/file-%03d.txt", dir0, i);
            FileSystem fs = this.getRandomRouterFileSystem();
            tasks.add(TestRouterFaultTolerant.getFileCreateTask(fs, newFile, ns0Fs));
        }
        TaskResults results = this.collectResults("Create file " + dir0, tasks);

        LOG.info("Check files results for {}: {}", dir0, results);
        if (faultTolerant) {
            Assertions.assertEquals(NUM_FILES, results.getSuccess(), "Not enough success in " + mountPoint);
            Assertions.assertEquals(0, results.getFailure(), "Nothing should fail in " + mountPoint);
        } else {
            Assertions.assertEquals(0, results.getSuccess(), "Nothing should succeed in " + mountPoint);
            Assertions.assertEquals(NUM_FILES, results.getFailure(), "Everything should fail in " + mountPoint);
        }

        LOG.info("Check files listing for {}", dir0);
        // collectResults() empties the task list, so it is reused for each batch.
        tasks.add(TestRouterFaultTolerant.getListFailTask(router0Fs, dir0));
        tasks.add(TestRouterFaultTolerant.getListSuccessTask(router1Fs, dir0, results.getSuccess()));
        Assertions.assertEquals(2, this.collectResults("List " + dir0, tasks).getSuccess());

        tasks.add(TestRouterFaultTolerant.getContentSummaryFailTask(router0Fs, dir0));
        tasks.add(TestRouterFaultTolerant.getContentSummarySuccessTask(router1Fs, dir0, results.getSuccess()));
        results = this.collectResults("Content summary " + dir0, tasks);
        Assertions.assertEquals(2, results.getSuccess());
    }

    /**
     * Renders the paths of {@code files}, without scheme and authority, as a
     * bracketed, comma-separated list such as {@code [/a, /b]}.
     *
     * @param files statuses to render
     * @return the bracketed list of paths
     */
    private static String toString(FileStatus[] files) {
        StringBuilder out = new StringBuilder("[");
        for (int idx = 0; idx < files.length; ++idx) {
            if (idx > 0) {
                out.append(", ");
            }
            Path bare = Path.getPathWithoutSchemeAndAuthority(files[idx].getPath());
            out.append(bare);
        }
        return out.append("]").toString();
    }

    /**
     * Lists {@code path}, treating a missing path as an empty listing.
     *
     * @param fs file system to query
     * @param path path to list
     * @return the listing, or an empty array when the path does not exist
     * @throws IOException for any failure other than the path not being found
     */
    private FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {
        try {
            return fs.listStatus(path);
        } catch (FileNotFoundException missing) {
            LOG.debug("File not found: {}", missing.getMessage());
            return new FileStatus[0];
        }
    }

    /**
     * Builds a task that creates {@code file} through {@code fs} and then checks, via
     * {@code checkFs}, that the file reports a length greater than zero.
     *
     * @param fs file system used to create the file
     * @param file path of the file to create
     * @param checkFs file system used to verify the created file
     * @return a task yielding {@code true} on success and {@code false} when a
     *     {@link RemoteException} is thrown
     */
    private static Callable<Boolean> getFileCreateTask(FileSystem fs, String file, FileSystem checkFs) {
        return () -> {
            Path target = new Path(file);
            try {
                // Create and immediately close: no data is written to the stream.
                fs.create(target).close();
                FileStatus created = checkFs.getFileStatus(target);
                Assertions.assertTrue(created.getLen() > 0L, "File not created properly: " + created);
            } catch (RemoteException e) {
                return false;
            }
            return true;
        };
    }

    /**
     * Builds a task that creates directory {@code dir} through {@code fs}.
     *
     * @param fs file system used to create the directory
     * @param dir directory to create
     * @return a task yielding {@code true} when {@code mkdirs} returns and
     *     {@code false} when it throws a {@link RemoteException}
     */
    private static Callable<Boolean> getDirCreateTask(FileSystem fs, Path dir) {
        return () -> {
            boolean created = true;
            try {
                // The boolean returned by mkdirs is deliberately not inspected.
                fs.mkdirs(dir);
            } catch (RemoteException e) {
                created = false;
            }
            return created;
        };
    }

    /**
     * Builds a task that expects listing {@code path} through {@code fs} to fail.
     *
     * @param fs file system to list through
     * @param path path to list
     * @return a task yielding {@code true} when the listing throws a
     *     {@link RemoteException} and {@code false} when it completes
     */
    private static Callable<Boolean> getListFailTask(FileSystem fs, Path path) {
        return () -> {
            boolean failedAsExpected = false;
            try {
                fs.listStatus(path);
            } catch (RemoteException e) {
                failedAsExpected = true;
            }
            return failedAsExpected;
        };
    }

    /**
     * Builds a task that lists {@code path} through {@code fs} and asserts that the
     * listing contains exactly {@code expected} entries.
     *
     * @param fs file system to list through
     * @param path path to list
     * @param expected expected number of entries
     * @return a task yielding {@code true} when the count matches
     */
    private static Callable<Boolean> getListSuccessTask(FileSystem fs, Path path, int expected) {
        return () -> {
            FileStatus[] listing = fs.listStatus(path);
            // The rendered listing is used as the failure message.
            String rendered = TestRouterFaultTolerant.toString(listing);
            Assertions.assertEquals(expected, listing.length, rendered);
            return true;
        };
    }

    /**
     * Builds a task that expects the content summary of {@code path} through
     * {@code fs} to fail.
     *
     * @param fs file system to query
     * @param path path to summarize
     * @return a task yielding {@code true} when the call throws a
     *     {@link RemoteException} and {@code false} when it completes
     */
    private static Callable<Boolean> getContentSummaryFailTask(FileSystem fs, Path path) {
        return () -> {
            boolean failedAsExpected = false;
            try {
                fs.getContentSummary(path);
            } catch (RemoteException e) {
                failedAsExpected = true;
            }
            return failedAsExpected;
        };
    }

    /**
     * Builds a task that fetches the content summary of {@code path} through
     * {@code fs} and asserts that its file-and-directory count equals {@code expected}.
     *
     * @param fs file system to query
     * @param path path to summarize
     * @param expected expected file-and-directory count
     * @return a task yielding {@code true} when the count matches
     */
    private static Callable<Boolean> getContentSummarySuccessTask(FileSystem fs, Path path, int expected) {
        return () -> {
            long actualCount = fs.getContentSummary(path).getFileAndDirectoryCount();
            Assertions.assertEquals((long) expected, actualCount, "Wrong summary for " + path);
            return true;
        };
    }

    /**
     * Runs all {@code tasks} on the thread pool, waits for them, and tallies how many
     * returned {@code true} (success) and {@code false} (failure).
     *
     * <p>A task that throws fails the test; the failure message carries the stack trace
     * of the underlying cause. The given collection is cleared before returning, so
     * callers can reuse it for the next batch.
     *
     * @param tag label used in log and failure messages
     * @param tasks tasks to run; emptied by this call
     * @return the success/failure tally
     * @throws Exception if waiting for the tasks is interrupted
     */
    private TaskResults collectResults(String tag, Collection<Callable<Boolean>> tasks) throws Exception {
        TaskResults tally = new TaskResults();
        this.service.invokeAll(tasks).forEach(future -> {
            try {
                if (future.get()) {
                    LOG.info("Got success for {}", tag);
                    tally.incrSuccess();
                } else {
                    LOG.info("Got failure for {}", tag);
                    tally.incrFailure();
                }
            } catch (Exception e) {
                // Report what the task itself threw, not the ExecutionException wrapper.
                Throwable reported = e instanceof ExecutionException ? e.getCause() : e;
                StringWriter trace = new StringWriter();
                reported.printStackTrace(new PrintWriter(trace));
                Assertions.fail("Failed to run \"" + tag + "\": " + trace);
            }
        });
        tasks.clear();
        return tally;
    }

    /**
     * Asserts that {@code actual} holds {@code expected} results in total and that
     * both at least one success and at least one failure were recorded.
     *
     * @param msg label used in the assertion messages
     * @param expected expected total number of results
     * @param actual tally to check
     */
    private static void assertBothResults(String msg, int expected, TaskResults actual) {
        Assertions.assertEquals(expected, actual.getTotal(), msg);
        boolean anySuccess = actual.getSuccess() > 0;
        Assertions.assertTrue(anySuccess, "Expected some success for " + msg);
        boolean anyFailure = actual.getFailure() > 0;
        Assertions.assertTrue(anyFailure, "Expected some failure for " + msg);
    }

    /**
     * Picks one of the started Routers uniformly at random.
     *
     * @return a randomly chosen Router
     */
    private Router getRandomRouter() {
        int routerCount = this.routers.size();
        int pick = new Random().nextInt(routerCount);
        return this.routers.get(pick);
    }

    /**
     * Opens a file system against a randomly chosen Router as a freshly created test
     * user (name {@code user-<random UUID>}, group {@code group}).
     *
     * @return a file system client bound to the random Router, created as the new user
     * @throws Exception if the file system cannot be obtained
     */
    private FileSystem getRandomRouterFileSystem() throws Exception {
        UserGroupInformation userUgi = UserGroupInformation.createUserForTesting("user-" + UUID.randomUUID(), new String[]{"group"});
        Router router = this.getRandomRouter();
        // doAs is overloaded for PrivilegedAction and PrivilegedExceptionAction; the
        // explicit cast selects the exception-throwing variant and removes the
        // ambiguity an uncast lambda would have.
        return userUgi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FederationTestUtils.getFileSystem(router));
    }

    /**
     * Verifies reads through a Router when the subcluster holding the file is down.
     *
     * <p>Creates a HASH_ALL mount point over all nameservices, writes one file through
     * a random Router, confirms it can be opened and that a missing file raises
     * {@link FileNotFoundException}, finds the single nameservice that holds the file,
     * stops it, and then expects opening the file to fail with an exception that
     * {@code RouterRpcClient.isUnavailableException} recognizes.
     *
     * @throws Exception if a file system operation fails unexpectedly
     */
    @Test
    public void testReadWithFailedSubcluster() throws Exception {
        DestinationOrder order = DestinationOrder.HASH_ALL;
        String mountPoint = "/" + order + "-testread";
        Path mountPath = new Path(mountPoint);
        LOG.info("Setup {} with order {}", mountPoint, order);
        FederationTestUtils.createMountTableEntry(this.routers, mountPoint, order, this.namenodes.keySet());

        FileSystem fs = this.getRandomRouterFileSystem();
        Path fileexisting = new Path(mountPath, "fileexisting");
        Path filenotexisting = new Path(mountPath, "filenotexisting");
        FSDataOutputStream os = fs.create(fileexisting);
        Assertions.assertNotNull(os);
        os.close();

        // Close the input stream once the check is done so it is not leaked.
        try (FSDataInputStream fsdis = fs.open(fileexisting)) {
            Assertions.assertNotNull(fsdis, "We should be able to read the file");
        }
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> fs.open(filenotexisting));

        // Ask each Namenode directly to find the one subcluster that holds the file.
        String nsIdWithFile = null;
        for (Map.Entry<String, MockNamenode> entry : this.namenodes.entrySet()) {
            String nsId = entry.getKey();
            MockNamenode nn = entry.getValue();
            int rpc = nn.getRPCPort();
            FileSystem nnfs = FederationTestUtils.getFileSystem(rpc);
            try {
                FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
                Assertions.assertNotNull(fileStatus);
                Assertions.assertNull(nsIdWithFile, "The file cannot be in two subclusters");
                nsIdWithFile = nsId;
            } catch (FileNotFoundException fnfe) {
                LOG.debug("File not found in {}", nsId);
            }
        }
        Assertions.assertNotNull(nsIdWithFile, "The file has to be in one subcluster");

        LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
        this.namenodes.get(nsIdWithFile).stop();
        try {
            fs.open(fileexisting);
            Assertions.fail("It should throw an unavailable cluster exception");
        } catch (RemoteException re) {
            IOException ioe = re.unwrapRemoteException();
            Assertions.assertTrue(RouterRpcClient.isUnavailableException(ioe), "Expected an unavailable exception for:" + ioe.getClass());
        }
    }

    /**
     * Tally of task outcomes: how many tasks reported success and how many reported
     * failure. Both counters start at zero and are backed by {@link AtomicInteger}.
     */
    static class TaskResults {
        private final AtomicInteger successCount = new AtomicInteger(0);
        private final AtomicInteger failureCount = new AtomicInteger(0);

        /** Creates an empty tally. */
        TaskResults() {
        }

        /** Records one successful task. */
        public void incrSuccess() {
            this.successCount.incrementAndGet();
        }

        /** Records one failed task. */
        public void incrFailure() {
            this.failureCount.incrementAndGet();
        }

        /** @return the number of successes recorded so far */
        public int getSuccess() {
            return this.successCount.get();
        }

        /** @return the number of failures recorded so far */
        public int getFailure() {
            return this.failureCount.get();
        }

        /** @return successes plus failures */
        public int getTotal() {
            return this.getSuccess() + this.getFailure();
        }

        /** @return the tally as {@code Success=<n> Failure=<m>} */
        @Override
        public String toString() {
            return new StringBuilder("Success=")
                .append(this.getSuccess())
                .append(" Failure=")
                .append(this.getFailure())
                .toString();
        }
    }
}

