/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;

@InterfaceAudience.Private
public class BlockRecoveryWorker {
    public static final Logger LOG = DataNode.LOG;
    private final DataNode datanode;
    private final Configuration conf;
    private final DNConf dnConf;

    /**
     * Creates a recovery worker for the given DataNode and caches the
     * {@link Configuration} and {@link DNConf} objects obtained from it.
     *
     * @param datanode the DataNode this worker operates on behalf of
     */
    BlockRecoveryWorker(DataNode datanode) {
        this.datanode = datanode;
        // The parameter shadows the field only for "datanode"; the other two
        // fields can be assigned by simple name.
        conf = datanode.getConf();
        dnConf = datanode.getDnConf();
    }

    /**
     * Builds a {@link DatanodeID} from this DataNode's registration with the
     * given block pool.
     *
     * @param bpid block pool ID to look up
     * @return a new {@link DatanodeID} constructed from the offer service's
     *         {@code bpRegistration}
     * @throws IOException if the DataNode has no offer service for {@code bpid}
     */
    private DatanodeID getDatanodeID(String bpid) throws IOException {
        final BPOfferService offerService = datanode.getBPOfferService(bpid);
        if (offerService != null) {
            return new DatanodeID((DatanodeID) offerService.bpRegistration);
        }
        throw new IOException("No block pool offer service for bpid=" + bpid);
    }

    /**
     * Logs, at INFO level, that {@code who} requested recovery of a block,
     * together with the block's targets and recovery parameters.
     *
     * @param who identifier of the caller that requested the recovery
     * @param rb  the block being recovered
     */
    private static void logRecoverBlock(String who, BlockRecoveryCommand.RecoveringBlock rb) {
        final ExtendedBlock recovering = rb.getBlock();
        final Object[] locations = rb.getLocations();
        final String targetList = Joiner.on(", ").join(locations);
        LOG.info("BlockRecoveryWorker: {} calls recoverBlock({}, targets=[{}], newGenerationStamp={}, newBlock={}, isStriped={})",
            who, recovering, targetList, rb.getNewGenerationStamp(), rb.getNewBlock(), rb.isStriped());
    }

    /**
     * Invokes {@code initReplicaRecovery} on the given datanode and rethrows a
     * {@link RemoteException} as the exception it wraps.
     *
     * @param datanode the datanode (local or proxy) to ask
     * @param rBlock   the block under recovery
     * @return whatever {@code initReplicaRecovery} returns; callers in this
     *         class treat {@code null} as "no replica on that node"
     * @throws IOException the unwrapped remote exception, or any other
     *                     I/O failure raised by the call
     */
    private static ReplicaRecoveryInfo callInitReplicaRecovery(InterDatanodeProtocol datanode, BlockRecoveryCommand.RecoveringBlock rBlock) throws IOException {
        final ReplicaRecoveryInfo recoveryInfo;
        try {
            recoveryInfo = datanode.initReplicaRecovery(rBlock);
        } catch (RemoteException remote) {
            throw remote.unwrapRemoteException();
        }
        return recoveryInfo;
    }

    /**
     * Looks up the active NameNode proxy for a block pool.
     *
     * @param bpid block pool ID
     * @return the active NameNode translator reported by the pool's offer service
     * @throws IOException if there is no offer service for {@code bpid}, or the
     *                     offer service reports no active NameNode
     */
    DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid) throws IOException {
        final BPOfferService offerService = datanode.getBPOfferService(bpid);
        if (offerService == null) {
            throw new IOException("No block pool offer service for bpid=" + bpid);
        }
        final DatanodeProtocolClientSideTranslatorPB active = offerService.getActiveNN();
        if (active != null) {
            return active;
        }
        throw new IOException("Block pool " + bpid + " has not recognized an active NN");
    }

    /**
     * Starts a daemon thread that recovers the given blocks one after another.
     *
     * <p>The thread bumps the DataNode's block-recovery-worker metric while it
     * runs and always decrements it on exit. An {@link IOException} from one
     * block is logged and does not stop the remaining blocks from being
     * processed.
     *
     * @param who    identifier of the requester, used only for logging
     * @param blocks the blocks to recover
     * @return the already-started daemon thread
     */
    public Daemon recoverBlocks(final String who, final Collection<BlockRecoveryCommand.RecoveringBlock> blocks) {
        final Runnable recoveryJob = new Runnable() {
            @Override
            public void run() {
                datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
                try {
                    for (BlockRecoveryCommand.RecoveringBlock candidate : blocks) {
                        try {
                            logRecoverBlock(who, candidate);
                            if (candidate.isStriped()) {
                                new RecoveryTaskStriped((BlockRecoveryCommand.RecoveringStripedBlock) candidate).recover();
                            } else {
                                new RecoveryTaskContiguous(candidate).recover();
                            }
                        } catch (IOException e) {
                            // Best effort: report this block and move on to the next one.
                            LOG.warn("recover Block: {} FAILED: ", candidate, e);
                        }
                    }
                } finally {
                    datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
                }
            }
        };
        final Daemon worker = new Daemon(datanode.threadGroup, recoveryJob);
        worker.start();
        return worker;
    }

    public class RecoveryTaskStriped {
        private final BlockRecoveryCommand.RecoveringBlock rBlock;
        private final ExtendedBlock block;
        private final String bpid;
        private final DatanodeInfo[] locs;
        private final long recoveryId;
        private final byte[] blockIndices;
        private final ErasureCodingPolicy ecPolicy;

        /**
         * Captures the recovery parameters of a striped block.
         *
         * <p>The block must not carry a new block ({@code getNewBlock()} must be
         * {@code null}); otherwise {@code Preconditions.checkArgument} rejects it.
         *
         * @param rBlock the striped block to recover
         */
        RecoveryTaskStriped(BlockRecoveryCommand.RecoveringStripedBlock rBlock) {
            this.rBlock = rBlock;
            Preconditions.checkArgument(rBlock.getNewBlock() == null);
            block = rBlock.getBlock();
            bpid = block.getBlockPoolId();
            locs = rBlock.getLocations();
            recoveryId = rBlock.getNewGenerationStamp();
            blockIndices = rBlock.getBlockIndices();
            ecPolicy = rBlock.getErasureCodingPolicy();
        }

        /**
         * Recovers the striped block group described by this task.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>Fail fast if fewer locations than data units were supplied.</li>
         *   <li>Ask every location to start replica recovery for its internal
         *       block (group block ID plus the location's block index). Replicas
         *       with an old generation stamp or zero length are ignored; when two
         *       locations report the same internal block, the longer replica wins.</li>
         *   <li>If enough locations hold usable replicas, compute the safe length;
         *       otherwise use length 0.</li>
         *   <li>Update every replica that is at least as long as its share of the
         *       safe length, then report the result to the active NameNode.</li>
         * </ol>
         *
         * <p>If any location reports that a recovery is already in progress, this
         * recovery attempt is abandoned silently (a warning is logged).
         *
         * @throws IOException if there are too few locations or usable replicas,
         *                     if updating a replica fails, or if the NameNode
         *                     cannot be reached
         */
        protected void recover() throws IOException {
            // Mask used to extract the index-in-group from an internal block ID.
            // NOTE(review): assumes the low 4 bits of the ID hold the index,
            // matching how internal IDs are built below (group ID + index).
            final long indexMask = 0xFL;

            checkLocations(locs.length);

            final Map<Long, BlockRecord> syncBlocks = new HashMap<Long, BlockRecord>(locs.length);
            final int dataBlkNum = ecPolicy.getNumDataUnits();
            final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
            int zeroLenReplicaCnt = 0;
            int dnNotHaveReplicaCnt = 0;

            for (int i = 0; i < locs.length; ++i) {
                final DatanodeInfo id = locs[i];
                // Declared outside the try so the catch blocks can log it.
                ExtendedBlock internalBlk = null;
                try {
                    final DatanodeID bpReg = getDatanodeID(bpid);
                    internalBlk = new ExtendedBlock(block);
                    final long blockId = block.getBlockId() + blockIndices[i];
                    internalBlk.setBlockId(blockId);

                    // Talk to ourselves directly; use an RPC proxy for every other node.
                    // The variable must be typed as the protocol interface because the
                    // two branches only share that type.
                    final InterDatanodeProtocol proxyDN = bpReg.equals(id)
                        ? datanode
                        : DataNode.createInterDataNodeProtocolProxy(id, conf, dnConf.socketTimeout, dnConf.connectToDnViaHostname);

                    final ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
                        new BlockRecoveryCommand.RecoveringBlock(internalBlk, null, recoveryId));

                    if (info == null) {
                        LOG.debug("Block recovery: DataNode: {} does not have replica for block: (block={}, internalBlk={})", id, block, internalBlk);
                        ++dnNotHaveReplicaCnt;
                    } else if (info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0L) {
                        final BlockRecord existing = syncBlocks.get(blockId);
                        if (existing == null || info.getNumBytes() > existing.rInfo.getNumBytes()) {
                            // First replica for this internal block, or a longer duplicate.
                            syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
                        } else {
                            LOG.debug("Block recovery: Ignored replica with invalid original state: {} from DataNode: {} by block: {}", info, id, block);
                        }
                    } else {
                        LOG.debug("Block recovery: Ignored replica with invalid generation stamp or length: {} from DataNode: {} by block: {}", info, id, block);
                        if (info.getNumBytes() == 0L) {
                            ++zeroLenReplicaCnt;
                        }
                    }
                } catch (RecoveryInProgressException ripE) {
                    // Must be caught before IOException: another recovery owns this block.
                    InterDatanodeProtocol.LOG.warn("Recovery for replica (block={}, internalBlk={}) on data-node {} is already in progress. Recovery id = {} is aborted.", block, internalBlk, id, rBlock.getNewGenerationStamp(), ripE);
                    return;
                } catch (IOException e) {
                    // A single unreachable location does not abort the recovery.
                    InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, internalBlk={}, datanode={})", block, internalBlk, id, e);
                }
            }

            final long safeLength;
            if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - dataBlkNum) {
                checkLocations(syncBlocks.size());
                safeLength = getSafeLength(syncBlocks);
            } else {
                // Too many locations are missing or empty: treat the block as empty.
                safeLength = 0L;
                LOG.warn("Block recovery: {} datanodes do not have the replica of block {}. {} datanodes have zero-length replica. Will remove this block.", dnNotHaveReplicaCnt, block, zeroLenReplicaCnt);
            }

            LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block, block.getNumBytes(), safeLength, syncBlocks);

            // Keep only replicas long enough to hold their share of the safe length.
            final List<BlockRecord> rurList = new ArrayList<BlockRecord>(locs.length);
            for (BlockRecord r : syncBlocks.values()) {
                final int blockIndex = (int) (r.rInfo.getBlockId() & indexMask);
                final long newSize = StripedBlockUtil.getInternalBlockLength(safeLength, ecPolicy.getCellSize(), dataBlkNum, blockIndex);
                if (r.rInfo.getNumBytes() >= newSize) {
                    rurList.add(r);
                }
            }

            if (safeLength > 0L) {
                Preconditions.checkArgument(rurList.size() >= dataBlkNum, (Object) "incorrect safe length");
                truncatePartialBlock(rurList, safeLength);
            }

            // One slot per internal block; slots without a recovered replica stay empty.
            final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
            final String[] newStorages = new String[totalBlkNum];
            for (int i = 0; i < newLocs.length; ++i) {
                newLocs[i] = DatanodeID.EMPTY_DATANODE_ID;
                newStorages[i] = "";
            }
            for (BlockRecord r : rurList) {
                final int index = (int) (r.rInfo.getBlockId() & indexMask);
                newLocs[index] = r.id;
                if (r.storageID != null) {
                    newStorages[index] = r.storageID;
                }
            }

            final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(), safeLength, recoveryId);
            final DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);

            // NOTE(review): the two boolean arguments are presumably closeFile and
            // deleteBlock; the second one is set only when the recovered length is 0.
            final boolean removeBlock = safeLength == 0L;
            nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, removeBlock, newLocs, newStorages);
            if (removeBlock) {
                LOG.info("After block recovery, the length of new block is 0. Will remove this block: {} from file.", (Object) newBlock);
            }
        }

        /**
         * Updates every given replica to the length its internal block should
         * have for the supplied safe length.
         *
         * <p>All replicas are attempted even if some fail; the failures are
         * collected and reported together.
         *
         * @param rurList    replicas to update
         * @param safeLength safe length of the whole block group
         * @throws IOException if at least one replica update failed
         */
        private void truncatePartialBlock(List<BlockRecord> rurList, long safeLength) throws IOException {
            final int cellSize = ecPolicy.getCellSize();
            final int dataUnits = ecPolicy.getNumDataUnits();
            final List<DatanodeID> failures = new ArrayList<DatanodeID>();

            for (BlockRecord record : rurList) {
                final long internalId = record.rInfo.getBlockId();
                // Index of the internal block inside its group (low 4 bits of the ID).
                final int indexInGroup = (int) (internalId & 0xFL);
                final long targetLength = StripedBlockUtil.getInternalBlockLength(safeLength, cellSize, dataUnits, indexInGroup);
                try {
                    record.updateReplicaUnderRecovery(bpid, recoveryId, internalId, targetLength);
                } catch (IOException e) {
                    InterDatanodeProtocol.LOG.warn("Failed to updateBlock (block={}, internalBlk={}, datanode={})", block, record.rInfo, record.id, e);
                    failures.add(record.id);
                }
            }

            if (failures.isEmpty()) {
                return;
            }
            throw new IOException("Cannot recover " + block + ", the following datanodes failed: " + failures);
        }

        /**
         * Computes the safe length of the block group from the lengths of the
         * collected internal-block replicas.
         *
         * @param syncBlocks replicas keyed by internal block ID; must contain at
         *                   least as many entries as the policy has data units
         * @return the value of {@code StripedBlockUtil.getSafeLength} for the
         *         replica lengths
         */
        @VisibleForTesting
        long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
            final int replicaCount = syncBlocks.size();
            Preconditions.checkArgument(replicaCount >= ecPolicy.getNumDataUnits());

            final long[] lengths = new long[replicaCount];
            int next = 0;
            for (BlockRecord record : syncBlocks.values()) {
                lengths[next] = record.getReplicaRecoveryInfo().getNumBytes();
                next++;
            }
            return StripedBlockUtil.getSafeLength(ecPolicy, lengths);
        }

        /**
         * Verifies that enough internal blocks are available to start recovery.
         *
         * @param locationCount number of available internal blocks
         * @throws IOException if {@code locationCount} is below the policy's
         *                     number of data units
         */
        private void checkLocations(int locationCount) throws IOException {
            if (locationCount >= ecPolicy.getNumDataUnits()) {
                return;
            }
            throw new IOException(block + " has no enough internal blocks(current: " + locationCount + "), unable to start recovery. Locations=" + Arrays.asList(locs));
        }
    }

    class RecoveryTaskContiguous {
        private final BlockRecoveryCommand.RecoveringBlock rBlock;
        private final ExtendedBlock block;
        private final String bpid;
        private final DatanodeInfo[] locs;
        private final long recoveryId;

        /**
         * Captures the recovery parameters of a contiguous (non-striped) block.
         *
         * @param rBlock the block to recover
         */
        RecoveryTaskContiguous(BlockRecoveryCommand.RecoveringBlock rBlock) {
            this.rBlock = rBlock;
            block = rBlock.getBlock();
            bpid = block.getBlockPoolId();
            locs = rBlock.getLocations();
            recoveryId = rBlock.getNewGenerationStamp();
        }

        /**
         * Recovers the contiguous block described by this task.
         *
         * <p>Every location is asked to start replica recovery. Replicas with a
         * current generation stamp and non-zero length count as candidates; those
         * whose original state is RWR or better are collected and handed to
         * {@link #syncBlock(List)}.
         *
         * <p>If any location reports that a recovery is already in progress, this
         * recovery attempt is abandoned silently (a warning is logged).
         *
         * @throws IOException if every location failed, if candidates exist but
         *                     none is in an acceptable state, or if
         *                     synchronization fails
         */
        protected void recover() throws IOException {
            final List<BlockRecord> syncList = new ArrayList<BlockRecord>(locs.length);
            int errorCount = 0;
            int candidateReplicaCnt = 0;

            // Test hook; see DataNodeFaultInjector.
            DataNodeFaultInjector.get().delay();

            for (DatanodeInfo id : locs) {
                try {
                    final DatanodeID bpReg = getDatanodeID(bpid);

                    // Talk to ourselves directly; use an RPC proxy for every other node.
                    // The variable must be typed as the protocol interface because the
                    // two branches only share that type.
                    final InterDatanodeProtocol proxyDN = bpReg.equals(id)
                        ? datanode
                        : DataNode.createInterDataNodeProtocolProxy(id, conf, dnConf.socketTimeout, dnConf.connectToDnViaHostname);

                    final ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock);

                    if (info == null) {
                        LOG.debug("Block recovery: DataNode: {} does not have replica for block: {}", id, block);
                    } else if (info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0L) {
                        ++candidateReplicaCnt;
                        if (info.getOriginalReplicaState().getValue() <= HdfsServerConstants.ReplicaState.RWR.getValue()) {
                            syncList.add(new BlockRecord(id, proxyDN, info));
                        } else {
                            LOG.debug("Block recovery: Ignored replica with invalid original state: {} from DataNode: {}", info, id);
                        }
                    } else {
                        LOG.debug("Block recovery: Ignored replica with invalid generation stamp or length: {} from DataNode: {}", info, id);
                    }
                } catch (RecoveryInProgressException ripE) {
                    // Must be caught before IOException: another recovery owns this block.
                    InterDatanodeProtocol.LOG.warn("Recovery for replica {} on data-node {} is already in progress. Recovery id = {} is aborted.", block, id, rBlock.getNewGenerationStamp(), ripE);
                    return;
                } catch (IOException e) {
                    ++errorCount;
                    InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, datanode={})", block, id, e);
                }
            }

            if (errorCount == locs.length) {
                throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(locs));
            }
            if (candidateReplicaCnt > 0 && syncList.isEmpty()) {
                throw new IOException("Found " + candidateReplicaCnt + " replica(s) for block " + block + " but none is in " + HdfsServerConstants.ReplicaState.RWR.name() + " or better state. datanodeids=" + Arrays.asList(locs));
            }
            syncBlock(syncList);
        }

        /**
         * Synchronizes the collected replicas to a common length and reports the
         * outcome to the active NameNode.
         *
         * <p>With an empty list, the NameNode is told the block has length 0 and
         * no locations. Otherwise the best replica state in the list decides which
         * replicas participate and what the new length is:
         * <ul>
         *   <li>FINALIZED: finalized replicas, plus RBW replicas of the same
         *       length, participate; the new length is the finalized length.</li>
         *   <li>RBW / RWR: only replicas in the best state participate; the new
         *       length is the shortest of them.</li>
         * </ul>
         * When the recovering block carries a new block (truncate recovery), that
         * block's ID and length are used instead.
         *
         * @param syncList replicas eligible for synchronization
         * @throws IOException if finalized replicas disagree on length, no
         *                     participating replica could be updated, or the
         *                     NameNode cannot be reached
         */
        void syncBlock(List<BlockRecord> syncList) throws IOException {
            final DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(block.getBlockPoolId());
            final boolean isTruncateRecovery = rBlock.getNewBlock() != null;
            final long blockId = isTruncateRecovery ? rBlock.getNewBlock().getBlockId() : block.getBlockId();

            LOG.info("BlockRecoveryWorker: block={} (length={}), isTruncateRecovery={}, syncList={}", block, block.getNumBytes(), isTruncateRecovery, syncList);

            if (syncList.isEmpty()) {
                LOG.debug("syncBlock for block {}, all datanodes don't have the block or their replicas have 0 length. The block can be deleted.", (Object) block);
                nn.commitBlockSynchronization(block, recoveryId, 0L, true, true, DatanodeID.EMPTY_ARRAY, null);
                return;
            }

            // Find the best (lowest-valued) replica state and check that all
            // finalized replicas agree on their length.
            HdfsServerConstants.ReplicaState bestState = HdfsServerConstants.ReplicaState.RWR;
            long finalizedLength = -1L;
            for (BlockRecord r : syncList) {
                assert (r.rInfo.getNumBytes() > 0L) : "zero length replica";
                final HdfsServerConstants.ReplicaState rState = r.rInfo.getOriginalReplicaState();
                if (rState.getValue() < bestState.getValue()) {
                    bestState = rState;
                }
                if (rState == HdfsServerConstants.ReplicaState.FINALIZED) {
                    if (finalizedLength > 0L && finalizedLength != r.rInfo.getNumBytes()) {
                        throw new IOException("Inconsistent size of finalized replicas. Replica " + r.rInfo + " expected size: " + finalizedLength);
                    }
                    finalizedLength = r.rInfo.getNumBytes();
                }
            }

            final List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
            final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, -1L, recoveryId);
            switch (bestState) {
                case FINALIZED: {
                    assert (finalizedLength > 0L) : "finalizedLength is not positive";
                    for (BlockRecord r : syncList) {
                        final HdfsServerConstants.ReplicaState rState = r.rInfo.getOriginalReplicaState();
                        if (rState == HdfsServerConstants.ReplicaState.FINALIZED
                            || (rState == HdfsServerConstants.ReplicaState.RBW && r.rInfo.getNumBytes() == finalizedLength)) {
                            participatingList.add(r);
                        }
                        LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, receivedLength={}, bestState=FINALIZED, finalizedLength={}", block, r.id, rState.name(), r.rInfo.getNumBytes(), finalizedLength);
                    }
                    newBlock.setNumBytes(finalizedLength);
                    break;
                }
                case RBW:
                case RWR: {
                    long minLength = Long.MAX_VALUE;
                    for (BlockRecord r : syncList) {
                        final HdfsServerConstants.ReplicaState rState = r.rInfo.getOriginalReplicaState();
                        if (rState == bestState) {
                            minLength = Math.min(minLength, r.rInfo.getNumBytes());
                            participatingList.add(r);
                        }
                        LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, receivedLength={}, bestState={}", block, r.id, rState.name(), r.rInfo.getNumBytes(), bestState.name());
                    }
                    if (minLength == Long.MAX_VALUE) {
                        throw new IOException("Incorrect block size");
                    }
                    newBlock.setNumBytes(minLength);
                    break;
                }
                case RUR:
                case TEMPORARY: {
                    assert (false) : "bad replica state: " + bestState;
                    break;
                }
                default:
                    break;
            }

            if (isTruncateRecovery) {
                // Truncate recovery dictates the final length.
                newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
            }

            LOG.info("BlockRecoveryWorker: block={} (length={}), bestState={}, newBlock={} (length={}), participatingList={}", block, block.getNumBytes(), bestState.name(), newBlock, newBlock.getNumBytes(), participatingList);

            // Update each participating replica; tolerate partial failure.
            final List<DatanodeID> failedList = new ArrayList<DatanodeID>();
            final List<BlockRecord> successList = new ArrayList<BlockRecord>();
            for (BlockRecord r : participatingList) {
                try {
                    r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, newBlock.getNumBytes());
                    successList.add(r);
                } catch (IOException e) {
                    InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + newBlock + ", datanode=" + r.id + ")", (Throwable) e);
                    failedList.add(r.id);
                }
            }
            if (successList.isEmpty()) {
                throw new IOException("Cannot recover " + block + ", the following datanodes failed: " + failedList);
            }

            // Report only the replicas that were updated successfully.
            final DatanodeID[] datanodes = new DatanodeID[successList.size()];
            final String[] storages = new String[datanodes.length];
            for (int i = 0; i < datanodes.length; ++i) {
                final BlockRecord r = successList.get(i);
                datanodes[i] = r.id;
                storages[i] = r.storageID;
            }

            LOG.debug("Datanode triggering commitBlockSynchronization, block={}, newGs={}, newLength={}", block, newBlock.getGenerationStamp(), newBlock.getNumBytes());
            nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, datanodes, storages);
        }
    }

    /**
     * One replica taking part in a block recovery: the datanode that holds it,
     * a handle for talking to that datanode, and the recovery info it reported.
     */
    static class BlockRecord {
        /** Datanode holding the replica. */
        private final DatanodeID id;
        /** Handle (local object or proxy) used to call the datanode. */
        private final InterDatanodeProtocol datanode;
        /** Recovery info the datanode reported for the replica. */
        private final ReplicaRecoveryInfo rInfo;
        /** Storage ID returned by the last replica update; unset until then. */
        private String storageID;

        /**
         * @param id       datanode holding the replica
         * @param datanode handle used to call that datanode
         * @param rInfo    recovery info reported for the replica
         */
        BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, ReplicaRecoveryInfo rInfo) {
            this.id = id;
            this.datanode = datanode;
            this.rInfo = rInfo;
        }

        /**
         * Asks the datanode to update this replica and remembers the storage ID
         * it returns.
         *
         * @param bpid       block pool ID of the replica
         * @param recoveryId recovery ID passed through to the datanode
         * @param newBlockId block ID passed through to the datanode
         * @param newLength  length passed through to the datanode
         * @throws IOException if the datanode call fails
         */
        private void updateReplicaUnderRecovery(String bpid, long recoveryId, long newBlockId, long newLength) throws IOException {
            final ExtendedBlock replica = new ExtendedBlock(bpid, (Block) rInfo);
            storageID = datanode.updateReplicaUnderRecovery(replica, recoveryId, newBlockId, newLength);
        }

        /** @return the recovery info reported for this replica */
        public ReplicaRecoveryInfo getReplicaRecoveryInfo() {
            return rInfo;
        }

        @Override
        public String toString() {
            return new StringBuilder("block:").append(rInfo).append(" node:").append(id).toString();
        }
    }
}

