/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.client.impl;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=180L)
public class TestBlockReaderFactory {
    static final Logger LOG = LoggerFactory.getLogger(TestBlockReaderFactory.class);

    @BeforeEach
    public void init() {
        DomainSocket.disableBindPathValidation();
        Assumptions.assumeTrue((DomainSocket.getLoadingFailureReason() == null ? 1 : 0) != 0);
    }

    @AfterEach
    public void cleanup() {
        DFSInputStream.tcpReadsDisabledForTesting = false;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
    }

    public static Configuration createShortCircuitConf(String testName, TemporarySocketDirectory sockDir) {
        Configuration conf = new Configuration();
        conf.set("dfs.client.context", testName);
        conf.setLong("dfs.blocksize", 4096L);
        conf.set("dfs.domain.socket.path", new File(sockDir.getDir(), testName + "._PORT").getAbsolutePath());
        conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
        conf.setBoolean("dfs.client.domain.socket.data.traffic", false);
        return conf;
    }

    @Test
    @Timeout(value=60L)
    public void testFallbackFromShortCircuitToUnixDomainTraffic() throws Exception {
        DFSInputStream.tcpReadsDisabledForTesting = true;
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration clientConf = TestBlockReaderFactory.createShortCircuitConf("testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
        clientConf.set("dfs.client.context", "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
        clientConf.setBoolean("dfs.client.domain.socket.data.traffic", true);
        Configuration serverConf = new Configuration(clientConf);
        serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
        cluster.waitActive();
        FileSystem dfs = FileSystem.get((URI)cluster.getURI(0), (Configuration)clientConf);
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 8193;
        int SEED = 1027565;
        DFSTestUtil.createFile(dfs, new Path(TEST_FILE), 8193L, (short)1, 1027565L);
        byte[] contents = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
        byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027565L, 8193);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(contents, expected));
        cluster.shutdown();
        sockDir.close();
    }

    @Test
    @Timeout(value=60L)
    public void testGetPathInfoWithUnresolvedHost() throws Exception {
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration conf = TestBlockReaderFactory.createShortCircuitConf("testGetPathInfoWithUnresolvedHost", sockDir);
        conf.set("dfs.client.context", "testGetPathInfoWithUnresolvedHost_Context");
        conf.setBoolean("dfs.client.domain.socket.data.traffic", true);
        DfsClientConf.ShortCircuitConf shortCircuitConf = new DfsClientConf.ShortCircuitConf(conf);
        DomainSocketFactory domainSocketFactory = new DomainSocketFactory(shortCircuitConf);
        InetSocketAddress targetAddr = InetSocketAddress.createUnresolved("random", 32456);
        IOException exception = (IOException)org.junit.jupiter.api.Assertions.assertThrows(IOException.class, () -> {
            domainSocketFactory.getPathInfo(targetAddr, shortCircuitConf);
            sockDir.close();
        });
        org.junit.jupiter.api.Assertions.assertTrue((boolean)exception.getMessage().contains("Unresolved host: " + targetAddr));
    }

    @Test
    @Timeout(value=60L)
    public void testMultipleWaitersOnShortCircuitCache() throws Exception {
        int i;
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
        final AtomicBoolean testFailed = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator(){

            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                Uninterruptibles.awaitUninterruptibly((CountDownLatch)latch);
                if (!creationIsBlocked.compareAndSet(true, false)) {
                    org.junit.jupiter.api.Assertions.fail((String)"there were multiple calls to createShortCircuitReplicaInfo.  Only one was expected.");
                }
                return null;
            }
        };
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration conf = TestBlockReaderFactory.createShortCircuitConf("testMultipleWaitersOnShortCircuitCache", sockDir);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        final DistributedFileSystem dfs = cluster.getFileSystem();
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4000;
        int SEED = 1027565;
        int NUM_THREADS = 10;
        DFSTestUtil.createFile((FileSystem)dfs, new Path("/test_file"), 4000L, (short)1, 1027565L);
        Runnable readerRunnable = new Runnable(){

            @Override
            public void run() {
                try {
                    byte[] contents = DFSTestUtil.readFileBuffer((FileSystem)dfs, new Path("/test_file"));
                    org.junit.jupiter.api.Assertions.assertFalse((boolean)creationIsBlocked.get());
                    byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027565L, 4000);
                    org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(contents, expected));
                }
                catch (Throwable e) {
                    LOG.error("readerRunnable error", e);
                    testFailed.set(true);
                }
            }
        };
        Thread[] threads = new Thread[10];
        for (i = 0; i < 10; ++i) {
            threads[i] = new Thread(readerRunnable);
            threads[i].start();
        }
        Thread.sleep(500L);
        latch.countDown();
        for (i = 0; i < 10; ++i) {
            Uninterruptibles.joinUninterruptibly((Thread)threads[i]);
        }
        cluster.shutdown();
        sockDir.close();
        org.junit.jupiter.api.Assertions.assertFalse((boolean)testFailed.get());
    }

    @Test
    public void testShortCircuitCacheTemporaryFailure() throws Exception {
        int i;
        BlockReaderTestUtil.enableBlockReaderFactoryTracing();
        final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true);
        final AtomicBoolean testFailed = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator(){

            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                if (replicaCreationShouldFail.get()) {
                    Uninterruptibles.sleepUninterruptibly((long)2L, (TimeUnit)TimeUnit.SECONDS);
                    return new ShortCircuitReplicaInfo();
                }
                return null;
            }
        };
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration conf = TestBlockReaderFactory.createShortCircuitConf("testShortCircuitCacheTemporaryFailure", sockDir);
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        DistributedFileSystem dfs = cluster.getFileSystem();
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4000;
        int NUM_THREADS = 2;
        int SEED = 1027565;
        final CountDownLatch gotFailureLatch = new CountDownLatch(2);
        final CountDownLatch shouldRetryLatch = new CountDownLatch(1);
        DFSTestUtil.createFile((FileSystem)dfs, new Path("/test_file"), 4000L, (short)1, 1027565L);
        Runnable readerRunnable = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    List locatedBlocks = cluster.getNameNode().getRpcServer().getBlockLocations("/test_file", 0L, 4000L).getLocatedBlocks();
                    LocatedBlock lblock = (LocatedBlock)locatedBlocks.get(0);
                    try (BlockReader blockReader = null;){
                        blockReader = BlockReaderTestUtil.getBlockReader(cluster.getFileSystem(), lblock, 0, 4000L);
                        org.junit.jupiter.api.Assertions.fail((String)"expected getBlockReader to fail the first time.");
                    }
                    gotFailureLatch.countDown();
                    shouldRetryLatch.await();
                    try {
                        blockReader = BlockReaderTestUtil.getBlockReader(cluster.getFileSystem(), lblock, 0, 4000L);
                    }
                    catch (Throwable t) {
                        LOG.error("error trying to retrieve a block reader the second time.", t);
                        throw t;
                    }
                    finally {
                        if (blockReader != null) {
                            blockReader.close();
                        }
                    }
                }
                catch (Throwable t) {
                    LOG.error("getBlockReader failure", t);
                    testFailed.set(true);
                }
            }
        };
        Thread[] threads = new Thread[2];
        for (i = 0; i < 2; ++i) {
            threads[i] = new Thread(readerRunnable);
            threads[i].start();
        }
        gotFailureLatch.await();
        replicaCreationShouldFail.set(false);
        shouldRetryLatch.countDown();
        for (i = 0; i < 2; ++i) {
            Uninterruptibles.joinUninterruptibly((Thread)threads[i]);
        }
        cluster.shutdown();
        sockDir.close();
        org.junit.jupiter.api.Assertions.assertFalse((boolean)testFailed.get());
    }

    @Test
    public void testShortCircuitCacheUnbufferDefault() throws Exception {
        this.testShortCircuitCacheUnbufferWithDisableInterval(600L, true);
    }

    @Test
    public void testShortCircuitCacheUnbufferDisabled() throws Exception {
        this.testShortCircuitCacheUnbufferWithDisableInterval(0L, false);
    }

    private void testShortCircuitCacheUnbufferWithDisableInterval(long interval, boolean disabled) throws Exception {
        String testName = GenericTestUtils.getMethodName();
        BlockReaderTestUtil.enableBlockReaderFactoryTracing();
        try (TemporarySocketDirectory sockDir = new TemporarySocketDirectory();){
            Configuration conf = TestBlockReaderFactory.createShortCircuitConf(testName, sockDir);
            conf.set("dfs.client.context", testName + interval + disabled);
            conf.setLong("dfs.domain.socket.disable.interval.seconds", interval);
            Configuration serverConf = new Configuration(conf);
            MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(serverConf).numDataNodes(1);
            try (MiniDFSCluster cluster = builder.build();
                 DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)conf);){
                byte[] buf;
                cluster.waitActive();
                Path testFile = new Path("/test_file");
                int testFileLen = 4000;
                int seed = 1027565;
                DFSTestUtil.createFile((FileSystem)dfs, testFile, 4000L, (short)1, 1027565L);
                byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027565L, 4000);
                try (FSDataInputStream in = dfs.open(testFile);){
                    org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)dfs.getClient().getClientContext().getShortCircuitCache(0L).getReplicaInfoMapSize());
                    buf = new byte[4000];
                    IOUtils.readFully((InputStream)in, (byte[])buf, (int)0, (int)4000);
                    this.validateReadResult(dfs, expected, buf, 1);
                    dfs.getClient().getClientContext().getShortCircuitCache(0L).setMaxTotalSize(0);
                    LOG.info("Unbuffering");
                    in.unbuffer();
                    org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)dfs.getClient().getClientContext().getShortCircuitCache(0L).getReplicaInfoMapSize());
                    DFSTestUtil.appendFile((FileSystem)dfs, testFile, "append more data");
                    Arrays.fill(buf, (byte)0);
                    in.seek(0L);
                    IOUtils.readFully((InputStream)in, (byte[])buf, (int)0, (int)4000);
                    this.validateReadResult(dfs, expected, buf, 0);
                }
                LOG.info("Reading {} again.", (Object)testFile);
                in = dfs.open(testFile);
                var19_24 = null;
                try {
                    buf = new byte[4000];
                    Arrays.fill(buf, (byte)0);
                    IOUtils.readFully((InputStream)in, (byte[])buf, (int)0, (int)4000);
                    int expectedMapSize = disabled ? 0 : 1;
                    this.validateReadResult(dfs, expected, buf, expectedMapSize);
                }
                catch (Throwable throwable) {
                    var19_24 = throwable;
                    throw throwable;
                }
                finally {
                    if (in != null) {
                        if (var19_24 != null) {
                            try {
                                in.close();
                            }
                            catch (Throwable throwable) {
                                var19_24.addSuppressed(throwable);
                            }
                        } else {
                            in.close();
                        }
                    }
                }
            }
        }
    }

    private void validateReadResult(DistributedFileSystem dfs, byte[] expected, byte[] actual, int expectedScrRepMapSize) {
        Assertions.assertThat((byte[])expected).isEqualTo((Object)actual);
        org.junit.jupiter.api.Assertions.assertEquals((int)expectedScrRepMapSize, (int)dfs.getClient().getClientContext().getShortCircuitCache(0L).getReplicaInfoMapSize());
    }

    @Test
    public void testShortCircuitReadFromServerWithoutShm() throws Exception {
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration clientConf = TestBlockReaderFactory.createShortCircuitConf("testShortCircuitReadFromServerWithoutShm", sockDir);
        Configuration serverConf = new Configuration(clientConf);
        serverConf.setInt("dfs.short.circuit.shared.memory.watcher.interrupt.check.ms", 0);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
        cluster.waitActive();
        clientConf.set("dfs.client.context", "testShortCircuitReadFromServerWithoutShm_clientContext");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)clientConf);
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4000;
        int SEED = 1027564;
        DFSTestUtil.createFile((FileSystem)fs, new Path("/test_file"), 4000L, (short)1, 1027564L);
        byte[] contents = DFSTestUtil.readFileBuffer((FileSystem)fs, new Path("/test_file"));
        byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(contents, expected));
        ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache(0L);
        final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder().setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
        cache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor(){

            public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> info) throws IOException {
                org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)info.size());
                DfsClientShmManager.PerDatanodeVisitorInfo vinfo = info.get(datanode);
                org.junit.jupiter.api.Assertions.assertTrue((boolean)vinfo.disabled);
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)vinfo.full.size());
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)vinfo.notFull.size());
            }
        });
        cluster.shutdown();
        sockDir.close();
    }

    @Test
    public void testShortCircuitReadFromClientWithoutShm() throws Exception {
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration clientConf = TestBlockReaderFactory.createShortCircuitConf("testShortCircuitReadWithoutShm", sockDir);
        Configuration serverConf = new Configuration(clientConf);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
        cluster.waitActive();
        clientConf.setInt("dfs.short.circuit.shared.memory.watcher.interrupt.check.ms", 0);
        clientConf.set("dfs.client.context", "testShortCircuitReadFromClientWithoutShm_clientContext");
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)clientConf);
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4000;
        int SEED = 1027564;
        DFSTestUtil.createFile((FileSystem)fs, new Path("/test_file"), 4000L, (short)1, 1027564L);
        byte[] contents = DFSTestUtil.readFileBuffer((FileSystem)fs, new Path("/test_file"));
        byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(contents, expected));
        ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache(0L);
        org.junit.jupiter.api.Assertions.assertEquals(null, (Object)cache.getDfsClientShmManager());
        cluster.shutdown();
        sockDir.close();
    }

    @Test
    public void testShortCircuitCacheShutdown() throws Exception {
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration conf = TestBlockReaderFactory.createShortCircuitConf("testShortCircuitCacheShutdown", sockDir);
        conf.set("dfs.client.context", "testShortCircuitCacheShutdown");
        Configuration serverConf = new Configuration(conf);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
        cluster.waitActive();
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)conf);
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4000;
        int SEED = 1027564;
        DFSTestUtil.createFile((FileSystem)fs, new Path("/test_file"), 4000L, (short)1, 1027564L);
        byte[] contents = DFSTestUtil.readFileBuffer((FileSystem)fs, new Path("/test_file"));
        byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(contents, expected));
        ShortCircuitCache cache = fs.getClient().getClientContext().getShortCircuitCache(0L);
        cache.close();
        org.junit.jupiter.api.Assertions.assertTrue((boolean)cache.getDfsClientShmManager().getDomainSocketWatcher().isClosed());
        cluster.shutdown();
        sockDir.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=120L)
    public void testPurgingClosedReplicas() throws Exception {
        BlockReaderTestUtil.enableBlockReaderFactoryTracing();
        final AtomicInteger replicasCreated = new AtomicInteger(0);
        final AtomicBoolean testFailed = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator(){

            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                replicasCreated.incrementAndGet();
                return null;
            }
        };
        TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
        Configuration conf = TestBlockReaderFactory.createShortCircuitConf("testPurgingClosedReplicas", sockDir);
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        DistributedFileSystem dfs = cluster.getFileSystem();
        String TEST_FILE = "/test_file";
        int TEST_FILE_LEN = 4095;
        int SEED = 1027552;
        DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get((URI)cluster.getURI(0), (Configuration)conf);
        DFSTestUtil.createFile((FileSystem)fs, new Path("/test_file"), 4095L, (short)1, 1027552L);
        final Semaphore sem = new Semaphore(0);
        List locatedBlocks = cluster.getNameNode().getRpcServer().getBlockLocations("/test_file", 0L, 4095L).getLocatedBlocks();
        final LocatedBlock lblock = (LocatedBlock)locatedBlocks.get(0);
        final byte[] buf = new byte[4095];
        Runnable readerRunnable = new Runnable(){

            @Override
            public void run() {
                try {
                    while (true) {
                        try (BlockReader blockReader = null;){
                            blockReader = BlockReaderTestUtil.getBlockReader(cluster.getFileSystem(), lblock, 0, 4095L);
                            sem.release();
                            try {
                                blockReader.readAll(buf, 0, 4095);
                            }
                            finally {
                                sem.acquireUninterruptibly();
                            }
                        }
                        LOG.info("read another 4095 bytes.");
                    }
                }
                catch (Throwable t) {
                    LOG.error("getBlockReader failure", t);
                    testFailed.set(true);
                    sem.release();
                }
            }
        };
        Thread thread = new Thread(readerRunnable);
        thread.start();
        while (thread.isAlive()) {
            sem.acquireUninterruptibly();
            thread.interrupt();
            sem.release();
        }
        org.junit.jupiter.api.Assertions.assertFalse((boolean)testFailed.get());
        try (BlockReader blockReader = null;){
            blockReader = BlockReaderTestUtil.getBlockReader(cluster.getFileSystem(), lblock, 0, 4095L);
            blockReader.readFully(buf, 0, 4095);
        }
        byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(1027552L, 4095);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)Arrays.equals(buf, expected));
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)replicasCreated.get());
        dfs.close();
        cluster.shutdown();
        sockDir.close();
    }
}

