/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestConsistentReadsObserver {
    public static final Logger LOG = LoggerFactory.getLogger((String)TestConsistentReadsObserver.class.getName());
    private static Configuration conf;
    private static MiniQJMHACluster qjmhaCluster;
    private static MiniDFSCluster dfsCluster;
    private DistributedFileSystem dfs;
    private final Path testPath = new Path("/TestConsistentReadsObserver");

    @BeforeAll
    public static void startUpCluster() throws Exception {
        conf = new Configuration();
        conf.setBoolean("dfs.namenode.state.context.enabled", true);
        qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false);
        dfsCluster = qjmhaCluster.getDfsCluster();
    }

    @BeforeEach
    public void setUp() throws Exception {
        this.dfs = this.setObserverRead(true);
    }

    @AfterEach
    public void cleanUp() throws IOException {
        this.dfs.delete(this.testPath, true);
    }

    @AfterAll
    public static void shutDownCluster() throws IOException {
        if (qjmhaCluster != null) {
            qjmhaCluster.shutdown();
        }
    }

    @Test
    public void testRequeueCall() throws Exception {
        int observerIdx = 2;
        NameNode nn = dfsCluster.getNameNode(2);
        int port = nn.getNameNodeAddress().getPort();
        Configuration originalConf = dfsCluster.getConfiguration(2);
        Configuration configuration = new Configuration(originalConf);
        String prefix = "ipc." + port + ".";
        configuration.set(prefix + "scheduler.impl", TestRpcScheduler.class.getName());
        configuration.setBoolean(prefix + "backoff.enable", true);
        NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
        Assertions.assertThat((long)NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0L);
        this.dfs.create(this.testPath, (short)1).close();
        this.assertSentTo(0);
        this.dfs.getFileStatus(this.testPath);
        this.assertSentTo(0);
        Assertions.assertThat((long)NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1L);
        NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
    }

    @Test
    public void testMsyncSimple() throws Exception {
        AtomicInteger readStatus = new AtomicInteger(0);
        this.dfs.getClient().getHAServiceState();
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        this.assertSentTo(0);
        Thread reader = new Thread(() -> {
            try {
                this.dfs.getFileStatus(this.testPath);
                readStatus.set(1);
            }
            catch (IOException e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
        dfsCluster.rollEditLogAndTail(0);
        GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)10000L);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)readStatus.get());
    }

    private void testMsync(boolean autoMsync, long autoMsyncPeriodMs) throws Exception {
        AtomicInteger readStatus = new AtomicInteger(0);
        Configuration conf2 = new Configuration(conf);
        conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
        if (autoMsync) {
            conf2.setTimeDuration("dfs.client.failover.observer.auto-msync-period." + this.dfs.getUri().getHost(), autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
        }
        DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get((Configuration)conf2);
        this.dfs.getClient().getHAServiceState();
        dfs2.getClient().getHAServiceState();
        this.dfs.mkdir(new Path("/test"), FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        this.assertSentTo(0);
        Thread reader = new Thread(() -> {
            try {
                if (!autoMsync) {
                    dfs2.msync();
                } else if (autoMsyncPeriodMs > 0L) {
                    Thread.sleep(autoMsyncPeriodMs);
                }
                dfs2.getFileStatus(this.testPath);
                if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
                    readStatus.set(1);
                } else {
                    readStatus.set(-1);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        Thread.sleep(100L);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
        dfsCluster.rollEditLogAndTail(0);
        GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)3000L);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)readStatus.get());
    }

    @Test
    public void testExplicitMsync() throws Exception {
        this.testMsync(false, -1L);
    }

    @Test
    public void testAutoMsyncPeriod0() throws Exception {
        this.testMsync(true, 0L);
    }

    @Test
    public void testAutoMsyncPeriod5() throws Exception {
        this.testMsync(true, 5L);
    }

    @Test
    public void testAutoMsyncLongPeriod() throws Exception {
        org.junit.jupiter.api.Assertions.assertThrows(TimeoutException.class, () -> this.testMsync(true, Long.MAX_VALUE));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCallFromNewClient() throws Exception {
        dfsCluster.transitionToStandby(0);
        dfsCluster.transitionToObserver(0);
        dfsCluster.transitionToStandby(2);
        dfsCluster.transitionToActive(2);
        try {
            AtomicInteger readStatus = new AtomicInteger(0);
            this.dfs.getClient().getHAServiceState();
            this.dfs.mkdir(new Path("/test"), FsPermission.getDefault());
            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();
            this.dfs.mkdir(this.testPath, FsPermission.getDefault());
            this.assertSentTo(2);
            Configuration conf2 = new Configuration(conf);
            conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
            DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get((Configuration)conf2);
            dfs2.getClient().getHAServiceState();
            Thread reader = new Thread(() -> {
                try {
                    dfs2.getFileStatus(this.testPath);
                    readStatus.set(1);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    readStatus.set(-1);
                }
            });
            reader.start();
            Thread.sleep(100L);
            org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();
            GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)10000L);
            org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)readStatus.get());
        }
        finally {
            dfsCluster.transitionToStandby(2);
            dfsCluster.transitionToObserver(2);
            dfsCluster.transitionToStandby(0);
            dfsCluster.transitionToActive(0);
        }
    }

    @Test
    public void testUncoordinatedCall() throws Exception {
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        AtomicInteger readStatus = new AtomicInteger(0);
        Thread reader = new Thread(() -> {
            try {
                this.dfs.getClient().getFileInfo("/");
                readStatus.set(1);
                org.junit.jupiter.api.Assertions.fail((String)"Should have been interrupted before getting here.");
            }
            catch (IOException e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        long before = Time.now();
        this.dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
        long after = Time.now();
        org.junit.jupiter.api.Assertions.assertTrue((after - before < 200L ? 1 : 0) != 0);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
        Thread.sleep(5000L);
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
        org.junit.jupiter.api.Assertions.assertEquals((Object)((Object)Thread.State.WAITING), (Object)((Object)reader.getState()));
        reader.interrupt();
    }

    @Test
    public void testRequestFromNonObserverProxyProvider() throws Exception {
        Configuration conf2 = new Configuration(conf);
        HATestUtil.setFailoverConfigurations(conf2, HATestUtil.getLogicalHostname(dfsCluster), Collections.singletonList(dfsCluster.getNameNode(2).getNameNodeAddress()), ConfiguredFailoverProxyProvider.class);
        conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
        conf2.setInt("dfs.client.retry.max.attempts", 1);
        conf2.setInt("dfs.client.failover.max.attempts", 1);
        FileSystem dfs2 = FileSystem.get((Configuration)conf2);
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        try {
            dfs2.listStatus(this.testPath);
            org.junit.jupiter.api.Assertions.fail((String)"listStatus should have thrown exception");
        }
        catch (RemoteException re) {
            IOException e = re.unwrapRemoteException();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)(e instanceof StandbyException), (String)("should have thrown StandbyException but got " + e.getClass().getSimpleName()));
        }
    }

    @Test
    @Timeout(value=10L)
    public void testMsyncFileContext() throws Exception {
        NameNode nn0 = dfsCluster.getNameNode(0);
        NameNode nn2 = dfsCluster.getNameNode(2);
        HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
        org.junit.jupiter.api.Assertions.assertEquals((Object)HAServiceProtocol.HAServiceState.ACTIVE, (Object)st.getState(), (String)"nn0 is not active");
        st = nn2.getRpcServer().getServiceStatus();
        org.junit.jupiter.api.Assertions.assertEquals((Object)HAServiceProtocol.HAServiceState.OBSERVER, (Object)st.getState(), (String)"nn2 is not observer");
        FileContext fc = FileContext.getFileContext((Configuration)conf);
        fc.getFsStatus(this.testPath);
        Path p = new Path(this.testPath, "testMsyncFileContext");
        fc.mkdir(p, FsPermission.getDefault(), true);
        fc.msync();
        dfsCluster.rollEditLogAndTail(0);
        LOG.info("State id active = {}, Stat id observer = {}", (Object)nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(), (Object)nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
        try {
            fc.getFileStatus(p);
        }
        catch (FileNotFoundException e) {
            org.junit.jupiter.api.Assertions.fail((String)"File should exist on Observer after msync");
        }
    }

    @Test
    public void testRpcQueueTimeNumOpsMetrics() throws Exception {
        final AtomicInteger readStatus = new AtomicInteger(0);
        this.dfs.getClient().getHAServiceState();
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        this.assertSentTo(0);
        Thread reader = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    TestConsistentReadsObserver.this.dfs.getFileStatus(TestConsistentReadsObserver.this.testPath);
                    readStatus.set(1);
                }
                catch (IOException e) {
                    e.printStackTrace();
                    readStatus.set(-1);
                }
            }
        });
        reader.start();
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)readStatus.get());
        dfsCluster.rollEditLogAndTail(0);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                return readStatus.get() != 0;
            }
        }, (long)100L, (long)10000L);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)readStatus.get());
        int observerIdx = 2;
        NameNode observerNN = dfsCluster.getNameNode(2);
        MetricsRecordBuilder rpcMetrics = MetricsAsserts.getMetrics((String)("RpcActivityForPort" + observerNN.getNameNodeAddress().getPort()));
        long rpcQueueTimeNumOps = MetricsAsserts.getLongCounter((String)"RpcQueueTimeNumOps", (MetricsRecordBuilder)rpcMetrics);
        long rpcProcessingTimeNumOps = MetricsAsserts.getLongCounter((String)"RpcProcessingTimeNumOps", (MetricsRecordBuilder)rpcMetrics);
        org.junit.jupiter.api.Assertions.assertEquals((long)rpcQueueTimeNumOps, (long)rpcProcessingTimeNumOps);
    }

    private void assertSentTo(int nnIdx) throws IOException {
        org.junit.jupiter.api.Assertions.assertTrue((boolean)HATestUtil.isSentToAnyOfNameNodes(this.dfs, dfsCluster, nnIdx), (String)("Request was not sent to the expected namenode " + nnIdx));
    }

    private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
        return HATestUtil.configureObserverReadFs(dfsCluster, conf, ObserverReadProxyProvider.class, flag);
    }

    public static class TestRpcScheduler
    implements RpcScheduler {
        private int allowed = 10;

        public int getPriorityLevel(Schedulable obj) {
            return 0;
        }

        public boolean shouldBackOff(Schedulable obj) {
            return --this.allowed < 0;
        }

        public void stop() {
        }
    }
}

