/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class TestTransferRbw {
    private static final Logger LOG = LoggerFactory.getLogger(TestTransferRbw.class);
    private static final Random RAN = new Random();
    private static final short REPLICATION = 1;

    public TestTransferRbw() {
        GenericTestUtils.setLogLevel((Logger)DataNode.LOG, (Level)Level.TRACE);
    }

    private static ReplicaBeingWritten getRbw(DataNode datanode, String bpid) throws InterruptedException {
        return (ReplicaBeingWritten)TestTransferRbw.getReplica(datanode, bpid, HdfsServerConstants.ReplicaState.RBW);
    }

    private static LocalReplicaInPipeline getReplica(DataNode datanode, String bpid, HdfsServerConstants.ReplicaState expectedState) throws InterruptedException {
        Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(datanode.getFSDataset(), bpid);
        for (int i = 0; i < 5 && replicas.size() == 0; ++i) {
            LOG.info("wait since replicas.size() == 0; i=" + i);
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)replicas.size());
        ReplicaInfo r = replicas.iterator().next();
        Assertions.assertEquals((Object)expectedState, (Object)r.getState());
        return (LocalReplicaInPipeline)r;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransferRbw() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(1).build();
        try {
            int i;
            int len;
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            Path p = new Path("/foo");
            int size = 65536 + RAN.nextInt(65536);
            LOG.info("size = " + size);
            FSDataOutputStream out = fs.create(p, (short)1);
            byte[] bytes = new byte[1024];
            for (int remaining = size; remaining > 0; remaining -= len) {
                RAN.nextBytes(bytes);
                len = bytes.length < remaining ? bytes.length : remaining;
                out.write(bytes, 0, len);
                out.hflush();
            }
            String bpid = cluster.getNamesystem().getBlockPoolId();
            DataNode oldnode = cluster.getDataNodes().get(0);
            Assertions.assertNull((Object)oldnode.xserver.getWriteThrottler());
            ReplicaBeingWritten oldrbw = TestTransferRbw.getRbw(oldnode, bpid);
            LOG.info("oldrbw = " + oldrbw);
            conf.setLong("dfs.datanode.data.write.bandwidthPerSec", 0x800000L);
            cluster.startDataNodes((Configuration)conf, 1, true, null, null);
            DataNode newnode = cluster.getDataNodes().get(1);
            Assertions.assertEquals((long)0x800000L, (long)newnode.xserver.getWriteThrottler().getBandwidth());
            DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc().getDatanodeReport(HdfsConstants.DatanodeReportType.LIVE);
            Assertions.assertEquals((int)2, (int)datatnodeinfos.length);
            DatanodeRegistration dnReg = newnode.getDNRegistrationForBP(bpid);
            for (i = 0; i < datatnodeinfos.length && !datatnodeinfos[i].equals((Object)dnReg); ++i) {
            }
            Assertions.assertTrue((i < datatnodeinfos.length ? 1 : 0) != 0);
            DatanodeInfo newnodeinfo = datatnodeinfos[i];
            DatanodeInfo oldnodeinfo = datatnodeinfos[1 - i];
            ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(), oldrbw.getGenerationStamp());
            DataTransferProtos.BlockOpResponseProto s = DFSTestUtil.transferRbw(b, DFSClientAdapter.getDFSClient(fs), oldnodeinfo, newnodeinfo);
            Assertions.assertEquals((Object)DataTransferProtos.Status.SUCCESS, (Object)s.getStatus());
            ReplicaBeingWritten newrbw = TestTransferRbw.getRbw(newnode, bpid);
            LOG.info("newrbw = " + newrbw);
            Assertions.assertEquals((long)oldrbw.getBlockId(), (long)newrbw.getBlockId());
            Assertions.assertEquals((long)oldrbw.getGenerationStamp(), (long)newrbw.getGenerationStamp());
            Assertions.assertEquals((long)oldrbw.getVisibleLength(), (long)newrbw.getVisibleLength());
            LOG.info("DONE");
        }
        finally {
            cluster.shutdown();
        }
    }
}

