/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestIncrementalBlockReports {
    public static final Logger LOG = LoggerFactory.getLogger(TestIncrementalBlockReports.class);
    private static final short DN_COUNT = 1;
    private static final long DUMMY_BLOCK_ID = 5678L;
    private static final long DUMMY_BLOCK_LENGTH = 0x100000L;
    private static final long DUMMY_BLOCK_GENSTAMP = 1000L;
    private static final String TEST_FILE_DATA = "hello world";
    private static final String TEST_FILE = "/TestStandbyBlockManagement";
    private static final Path TEST_FILE_PATH = new Path("/TestStandbyBlockManagement");
    private MiniDFSCluster cluster = null;
    private Configuration conf;
    private NameNode singletonNn;
    private DataNode singletonDn;
    private BPOfferService bpos;
    private BPServiceActor actor;
    private String storageUuid;

    /**
     * Builds a fresh mini cluster with {@code DN_COUNT} DataNode before every test and
     * caches the objects the tests work with: the NameNode, the first DataNode, that
     * DataNode's first block-pool offer service and its first service actor, and the
     * storage ID of the DataNode's first volume.
     *
     * @throws IOException if the cluster cannot be built or the volume references
     *                     cannot be obtained or released
     */
    @BeforeEach
    public void startCluster() throws IOException {
        conf = new HdfsConfiguration();
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
        singletonNn = cluster.getNameNode();
        singletonDn = cluster.getDataNodes().get(0);
        bpos = singletonDn.getAllBpOs().get(0);
        actor = bpos.getBPServiceActors().get(0);

        // The volume references are closeable; release them as soon as the ID is read.
        try (FsDatasetSpi.FsVolumeReferences volumeRefs = singletonDn.getFSDataset().getFsVolumeReferences()) {
            storageUuid = volumeRefs.get(0).getStorageID();
        }
    }

    /**
     * Creates the placeholder block used by the injection helpers.
     *
     * @return a new {@link Block} built from the dummy block ID, length and
     *         generation stamp constants of this class
     */
    private static Block getDummyBlock() {
        return new Block(DUMMY_BLOCK_ID, DUMMY_BLOCK_LENGTH, DUMMY_BLOCK_GENSTAMP);
    }

    /**
     * Notifies the actor's IBR manager that the dummy block was received on the
     * storage identified by {@code storageUuid}.
     */
    private void injectBlockReceived() {
        DatanodeStorage storage = singletonDn.getFSDataset().getStorage(storageUuid);
        ReceivedDeletedBlockInfo receivedInfo = new ReceivedDeletedBlockInfo(
                getDummyBlock(),
                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
                null);
        actor.getIbrManager().notifyNamenodeBlock(receivedInfo, storage, false);
    }

    /**
     * Adds a DELETED_BLOCK record for the dummy block directly to the actor's IBR
     * manager, against the storage identified by {@code storageUuid}.
     */
    private void injectBlockDeleted() {
        ReceivedDeletedBlockInfo deletedInfo = new ReceivedDeletedBlockInfo(
                getDummyBlock(),
                ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK,
                null);
        DatanodeStorage storage = singletonDn.getFSDataset().getStorage(storageUuid);
        actor.getIbrManager().addRDBI(deletedInfo, storage);
    }

    /**
     * Installs a spy between the single DataNode and the single NameNode.
     *
     * @return the spy returned by {@code InternalDataNodeTestUtils.spyOnBposToNN}
     */
    DatanodeProtocolClientSideTranslatorPB spyOnDnCallsToNn() {
        DatanodeProtocolClientSideTranslatorPB nnSpy =
                InternalDataNodeTestUtils.spyOnBposToNN(singletonDn, singletonNn);
        return nnSpy;
    }

    /**
     * Injects a block-received notification and verifies that, two seconds later,
     * the DataNode has called {@code blockReceivedAndDeleted} on the NameNode spy
     * exactly once. The cluster is always shut down afterwards.
     */
    @Test
    @Timeout(60L)
    public void testReportBlockReceived() throws InterruptedException, IOException {
        try {
            DatanodeProtocolClientSideTranslatorPB spiedNn = spyOnDnCallsToNn();
            injectBlockReceived();

            // Pause before verifying; presumably the report is sent by another
            // thread -- TODO confirm.
            Thread.sleep(2000L);

            Mockito.verify(spiedNn, Mockito.times(1)).blockReceivedAndDeleted(
                    ArgumentMatchers.any(DatanodeRegistration.class),
                    ArgumentMatchers.anyString(),
                    ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));
        } finally {
            cluster.shutdown();
            cluster = null;
        }
    }

    /**
     * Injects a block-deleted record and verifies that no
     * {@code blockReceivedAndDeleted} call reaches the NameNode spy within two
     * seconds, and that exactly one call has been made two seconds after a
     * heartbeat is triggered. The cluster is always shut down afterwards.
     */
    @Test
    @Timeout(60L)
    public void testReportBlockDeleted() throws InterruptedException, IOException {
        try {
            // A full block report is triggered before the spy is installed.
            DataNodeTestUtils.triggerBlockReport(singletonDn);

            DatanodeProtocolClientSideTranslatorPB spiedNn = spyOnDnCallsToNn();
            injectBlockDeleted();
            Thread.sleep(2000L);

            // The deletion alone must not have produced a report yet.
            Mockito.verify(spiedNn, Mockito.times(0)).blockReceivedAndDeleted(
                    ArgumentMatchers.any(DatanodeRegistration.class),
                    ArgumentMatchers.anyString(),
                    ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));

            DataNodeTestUtils.triggerHeartbeat(singletonDn);
            Thread.sleep(2000L);

            // After the heartbeat the deletion must have been reported exactly once.
            Mockito.verify(spiedNn, Mockito.times(1)).blockReceivedAndDeleted(
                    ArgumentMatchers.any(DatanodeRegistration.class),
                    ArgumentMatchers.anyString(),
                    ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));
        } finally {
            cluster.shutdown();
            cluster = null;
        }
    }

    /**
     * Injects the same block-received notification twice and verifies that, two
     * seconds later, at least one {@code blockReceivedAndDeleted} call reached the
     * NameNode spy and the IBR manager's {@code sendImmediately()} returns false.
     * The cluster is always shut down afterwards.
     */
    @Test
    @Timeout(60L)
    public void testReplaceReceivedBlock() throws InterruptedException, IOException {
        try {
            DatanodeProtocolClientSideTranslatorPB spiedNn = spyOnDnCallsToNn();

            // Same dummy block reported twice in a row.
            for (int i = 0; i < 2; i++) {
                injectBlockReceived();
            }
            Thread.sleep(2000L);

            Mockito.verify(spiedNn, Mockito.atLeastOnce()).blockReceivedAndDeleted(
                    ArgumentMatchers.any(DatanodeRegistration.class),
                    ArgumentMatchers.anyString(),
                    ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));

            boolean stillPending = actor.getIbrManager().sendImmediately();
            Assertions.assertFalse(stillPending);
        } finally {
            cluster.shutdown();
            cluster = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the default cluster with a two-NameNode HA cluster of three DataNodes,
     * intercepts every {@code blockReceivedAndDeleted} call the DataNodes make to the
     * second NameNode, and holds back those carrying a RECEIVED_BLOCK entry.
     *
     * <p>Two rounds are run. Round one writes the test file and appends twice, waits
     * for the second NameNode to catch up with the first, then replays the held-back
     * calls. Round two appends twice more and replays again. After each replay the
     * second NameNode must reach zero pending DataNode messages. Finally the roles
     * are swapped and the first block of the file must have no corrupt replicas on
     * the second NameNode.
     *
     * @throws Exception if cluster setup, file I/O, waiting or failover fails
     */
    @Test
    public void testIBRRaceCondition() throws Exception {
        this.cluster.shutdown();
        this.conf = new Configuration();
        HAUtil.setAllowStandbyReads(this.conf, true);
        this.conf.setInt("dfs.ha.tail-edits.period", 1);
        this.cluster = new MiniDFSCluster.Builder(this.conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            NameNode nn1 = this.cluster.getNameNode(0);
            NameNode nn2 = this.cluster.getNameNode(1);
            BlockManager bm2 = nn2.getNamesystem().getBlockManager();
            DistributedFileSystem fs = HATestUtil.configureFailoverFs(this.cluster, this.conf);
            // Intercepted calls to nn2, replayed later via callRealMethod().
            List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
            List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
            // One party for this test thread; one more is registered per expected report.
            Phaser ibrPhaser = new Phaser(1);
            for (DataNode dataNode : this.cluster.getDataNodes()) {
                DatanodeProtocolClientSideTranslatorPB nnSpy = InternalDataNodeTestUtils.spyOnBposToNN(dataNode, nn2);
                Mockito.doAnswer(inv -> {
                    for (StorageReceivedDeletedBlocks srdb : inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
                        for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
                            if (!block.getStatus().equals(ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK)) {
                                continue;
                            }
                            // Capture the call so it can be replayed later; without this
                            // the replay loops below have nothing to send.
                            ibrsToStandby.add(inv);
                            ibrPhaser.arriveAndDeregister();
                        }
                    }
                    return null;
                }).when(nnSpy).blockReceivedAndDeleted(
                        ArgumentMatchers.any(DatanodeRegistration.class),
                        ArgumentMatchers.anyString(),
                        ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));
                spies.add(nnSpy);
            }
            LOG.info("==================================");
            // Round one: one write plus two appends; 9 RECEIVED_BLOCK entries are awaited.
            ibrPhaser.bulkRegister(9);
            DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            int phase = ibrPhaser.arrive();
            ibrPhaser.awaitAdvanceInterruptibly(phase, 60L, TimeUnit.SECONDS);
            for (InvocationOnMock sendIBRs : ibrsToStandby) {
                try {
                    sendIBRs.callRealMethod();
                } catch (Throwable t) {
                    LOG.error("Exception thrown while calling sendIBRs: ", t);
                }
            }
            GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0, 1000L, 30000L, "There should be 0 pending DN messages");
            ibrsToStandby.clear();
            // Round two: two more appends; 6 RECEIVED_BLOCK entries are awaited.
            ibrPhaser.bulkRegister(6);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            phase = ibrPhaser.arrive();
            ibrPhaser.awaitAdvanceInterruptibly(phase, 60L, TimeUnit.SECONDS);
            for (InvocationOnMock sendIBRs : ibrsToStandby) {
                try {
                    sendIBRs.callRealMethod();
                } catch (Throwable t) {
                    LOG.error("Exception thrown while calling sendIBRs: ", t);
                }
            }
            ibrsToStandby.clear();
            ibrPhaser.arriveAndDeregister();
            GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0, 1000L, 30000L, "There should be 0 pending DN messages");
            ExtendedBlock extendedBlock = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            LOG.info("==================================");
            // Swap roles so the second NameNode becomes active before the final check.
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.waitActive(1);
            Assertions.assertEquals(0, nn2.getNamesystem().getBlockManager().numCorruptReplicas(extendedBlock.getLocalBlock()), "There should not be any corrupt replicas");
        } finally {
            this.cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the default cluster with a two-NameNode HA cluster of three DataNodes,
     * intercepts every {@code blockReceivedAndDeleted} call the DataNodes make to the
     * second NameNode, and holds back those carrying a RECEIVED_BLOCK entry.
     *
     * <p>The test writes the test file and appends twice, waits for the second
     * NameNode to catch up with the first, then replays the held-back calls. The
     * second NameNode must reach zero pending DataNode messages. After the roles are
     * swapped, the first block of the file must have no corrupt replicas on the
     * second NameNode.
     *
     * @throws Exception if cluster setup, file I/O, waiting or failover fails
     */
    @Test
    public void testIBRRaceCondition2() throws Exception {
        this.cluster.shutdown();
        Configuration conf = new Configuration();
        HAUtil.setAllowStandbyReads(conf, true);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        this.cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            NameNode nn1 = this.cluster.getNameNode(0);
            NameNode nn2 = this.cluster.getNameNode(1);
            BlockManager bm2 = nn2.getNamesystem().getBlockManager();
            DistributedFileSystem fs = HATestUtil.configureFailoverFs(this.cluster, conf);
            // Typed so the replay loop below can iterate InvocationOnMock elements directly.
            List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
            List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
            // One party for this test thread; one more is registered per expected report.
            Phaser ibrPhaser = new Phaser(1);
            for (DataNode dataNode : this.cluster.getDataNodes()) {
                DatanodeProtocolClientSideTranslatorPB nnSpy = InternalDataNodeTestUtils.spyOnBposToNN(dataNode, nn2);
                Mockito.doAnswer(inv -> {
                    for (StorageReceivedDeletedBlocks srdb : inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
                        for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
                            if (!block.getStatus().equals(ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK)) {
                                continue;
                            }
                            // Hold the call back for later replay and signal its arrival.
                            ibrsToStandby.add(inv);
                            ibrPhaser.arriveAndDeregister();
                        }
                    }
                    return null;
                }).when(nnSpy).blockReceivedAndDeleted(
                        ArgumentMatchers.any(DatanodeRegistration.class),
                        ArgumentMatchers.anyString(),
                        ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));
                spies.add(nnSpy);
            }
            LOG.info("==================================");
            // One write plus two appends; 9 RECEIVED_BLOCK entries are awaited.
            ibrPhaser.bulkRegister(9);
            DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            int phase = ibrPhaser.arrive();
            ibrPhaser.awaitAdvanceInterruptibly(phase, 60L, TimeUnit.SECONDS);
            for (InvocationOnMock sendIBRs : ibrsToStandby) {
                try {
                    sendIBRs.callRealMethod();
                } catch (Throwable t) {
                    LOG.error("Exception thrown while calling sendIBRs: ", t);
                }
            }
            GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0, 1000L, 30000L, "There should be 0 pending DN messages");
            ibrsToStandby.clear();
            ibrPhaser.arriveAndDeregister();
            ExtendedBlock extendedBlock = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            LOG.info("==================================");
            // Swap roles so the second NameNode becomes active before the final check.
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.waitActive(1);
            Assertions.assertEquals(0, nn2.getNamesystem().getBlockManager().numCorruptReplicas(extendedBlock.getLocalBlock()), "There should not be any corrupt replicas");
        } finally {
            this.cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the default cluster with a two-NameNode HA cluster of three DataNodes,
     * intercepts every {@code blockReceivedAndDeleted} call the DataNodes make to the
     * second NameNode, and groups those carrying a RECEIVED_BLOCK entry by the
     * block's generation stamp, tracking the lowest stamp seen.
     *
     * <p>The test writes the test file and appends twice, waits for the second
     * NameNode to catch up with the first, then replays every held-back call except
     * those with the lowest generation stamp. The second NameNode must reach zero
     * pending DataNode messages. The lowest-stamp calls are replayed next, and the
     * second NameNode must then report three pending DataNode messages. After the
     * roles are swapped, the first block of the file must have exactly one corrupt
     * replica on the second NameNode.
     *
     * @throws Exception if cluster setup, file I/O, waiting or failover fails
     */
    @Test
    public void testIBRRaceCondition3() throws Exception {
        this.cluster.shutdown();
        Configuration conf = new Configuration();
        HAUtil.setAllowStandbyReads(conf, true);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        this.cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .build();
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            NameNode nn1 = this.cluster.getNameNode(0);
            NameNode nn2 = this.cluster.getNameNode(1);
            BlockManager bm2 = nn2.getNamesystem().getBlockManager();
            DistributedFileSystem fs = HATestUtil.configureFailoverFs(this.cluster, conf);
            // Intercepted calls to nn2, keyed by the generation stamp of the reported block.
            LinkedHashMap<Long, List<InvocationOnMock>> ibrsToStandby = new LinkedHashMap<>();
            AtomicLong lowestGenStamp = new AtomicLong(Long.MAX_VALUE);
            List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
            // One party for this test thread; one more is registered per expected report.
            Phaser ibrPhaser = new Phaser(1);
            for (DataNode dn : this.cluster.getDataNodes()) {
                DatanodeProtocolClientSideTranslatorPB nnSpy = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
                Mockito.doAnswer(inv -> {
                    for (StorageReceivedDeletedBlocks srdb : inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
                        for (ReceivedDeletedBlockInfo info : srdb.getBlocks()) {
                            if (!info.getStatus().equals(ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK)) {
                                continue;
                            }
                            long genStamp = info.getBlock().getGenerationStamp();
                            ibrsToStandby.computeIfAbsent(genStamp, key -> new ArrayList<>()).add(inv);
                            lowestGenStamp.getAndUpdate(prev -> Math.min(prev, genStamp));
                            ibrPhaser.arriveAndDeregister();
                        }
                    }
                    return null;
                }).when(nnSpy).blockReceivedAndDeleted(
                        ArgumentMatchers.any(DatanodeRegistration.class),
                        ArgumentMatchers.anyString(),
                        ArgumentMatchers.any(StorageReceivedDeletedBlocks[].class));
                spies.add(nnSpy);
            }
            LOG.info("==================================");
            // One write plus two appends; 9 RECEIVED_BLOCK entries are awaited.
            ibrPhaser.bulkRegister(9);
            DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            int phase = ibrPhaser.arrive();
            ibrPhaser.awaitAdvanceInterruptibly(phase, 60L, TimeUnit.SECONDS);
            // Replay and discard everything except the lowest-generation-stamp group.
            ibrsToStandby.forEach((genStamp, ibrs) -> {
                if (lowestGenStamp.get() != genStamp.longValue()) {
                    ibrs.removeIf(inv -> {
                        try {
                            inv.callRealMethod();
                        } catch (Throwable t) {
                            LOG.error("Exception thrown while calling sendIBRs: ", t);
                        }
                        return true;
                    });
                }
            });
            GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0, 1000L, 30000L, "There should be 0 pending DN messages");
            ibrPhaser.arriveAndDeregister();
            ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
            HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
            // Now replay the calls that carry the lowest generation stamp.
            for (InvocationOnMock sendIBR : ibrsToStandby.get(lowestGenStamp.get())) {
                try {
                    sendIBR.callRealMethod();
                } catch (Throwable t) {
                    LOG.error("Exception thrown while calling sendIBRs: ", t);
                }
            }
            // The failure message names the count actually awaited (3).
            GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 3, 1000L, 30000L, "There should be 3 pending DN messages");
            LOG.info("==================================");
            // Swap roles so the second NameNode becomes active before the final check.
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.waitActive(1);
            Assertions.assertEquals(1, nn2.getNamesystem().getBlockManager().numCorruptReplicas(block.getLocalBlock()), "There should be 1 corrupt replica");
        } finally {
            this.cluster.shutdown();
        }
    }
}

