/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import io.hops.common.INodeUtil;
import io.hops.exception.StorageException;
import io.hops.exception.TransactionContextException;
import io.hops.exception.TransientStorageException;
import io.hops.leader_election.node.ActiveNode;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.HdfsVariables;
import io.hops.metadata.blockmanagement.ExcessReplicasMap;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.common.entity.Variable;
import io.hops.metadata.hdfs.dal.MisReplicatedRangeQueueDataAccess;
import io.hops.metadata.hdfs.entity.EncodingStatus;
import io.hops.metadata.hdfs.entity.HashBucket;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.metadata.hdfs.entity.MisReplicatedRange;
import io.hops.metadata.security.token.block.NameNodeBlockTokenSecretManager;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import io.hops.util.Slicer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.BlocksMap;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HashBuckets;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReplicationBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeSymlink;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.Bucket;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.primitives.Longs;

@InterfaceAudience.Private
public class BlockManager {
    public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
    public static final Logger blockLog = NameNode.blockStateChangeLog;
    private final Namesystem namesystem;
    private final DatanodeManager datanodeManager;
    private final HeartbeatManager heartbeatManager;
    private final NameNodeBlockTokenSecretManager blockTokenSecretManager;
    private volatile long pendingReplicationBlocksCount = 0L;
    private volatile long corruptReplicaBlocksCount = 0L;
    private volatile long underReplicatedBlocksCount = 0L;
    private volatile long scheduledReplicationBlocksCount = 0L;
    private AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
    private final long startupDelayBlockDeletionInMs;
    private ExecutorService datanodeRemover = Executors.newSingleThreadExecutor();
    private final long replicationRecheckInterval;
    BlocksMap blocksMap;
    final Daemon replicationThread = new Daemon((Runnable)new ReplicationMonitor());
    final CorruptReplicasMap corruptReplicas;
    private final InvalidateBlocks invalidateBlocks;
    private final Set<Block> postponedMisreplicatedBlocks = Collections.newSetFromMap(new ConcurrentHashMap());
    public final ExcessReplicasMap excessReplicateMap;
    public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
    @VisibleForTesting
    final PendingReplicationBlocks pendingReplications;
    public final short maxReplication;
    int maxReplicationStreams;
    int replicationStreamsHardLimit;
    public final short minReplication;
    public final int defaultReplication;
    final int maxCorruptFilesReturned;
    final float blocksInvalidateWorkPct;
    final int blocksReplWorkMultiplier;
    final boolean shouldCheckForEnoughRacks;
    final boolean encryptDataTransfer;
    private final long maxNumBlocksToLog;
    private Daemon replicationQueuesInitializer = null;
    private double replicationQueuesInitProgress = 0.0;
    private BlockPlacementPolicy blockplacement;
    private final BlockStoragePolicySuite storagePolicySuite;
    private boolean checkNSRunning = true;
    private final int slicerBatchSize;
    private final int processMisReplicatedNoOfBatchs;
    private final int slicerNbThreads;
    private final int numBuckets;
    private final int blockFetcherNBThreads;
    private final int blockFetcherBucketsPerThread;

    public long getPendingReplicationBlocksCount() {
        return this.pendingReplicationBlocksCount;
    }

    public long getUnderReplicatedBlocksCount() {
        return this.underReplicatedBlocksCount;
    }

    public long getCorruptReplicaBlocksCount() {
        return this.corruptReplicaBlocksCount;
    }

    public long getScheduledReplicationBlocksCount() {
        return this.scheduledReplicationBlocksCount;
    }

    public long getPendingDeletionBlocksCount() throws IOException {
        return this.invalidateBlocks.numBlocks();
    }

    public long getStartupDelayBlockDeletionInMs() {
        return this.startupDelayBlockDeletionInMs;
    }

    public long getExcessBlocksCount() throws IOException {
        return this.excessReplicateMap.size();
    }

    public long getPostponedMisreplicatedBlocksCount() {
        return this.postponedMisreplicatedBlocksCount.get();
    }

    public BlockManager(Namesystem namesystem, Configuration conf) throws IOException {
        this.namesystem = namesystem;
        this.numBuckets = conf.getInt("dfs.blockreport.numbuckets", 1000);
        HashBuckets.initialize(this.numBuckets);
        this.blockFetcherNBThreads = conf.getInt("dfs.block.fetcher.nb.threads", 10);
        this.blockFetcherBucketsPerThread = conf.getInt("dfs.block.fetcher.buckets.per.thread", 1);
        this.datanodeManager = new DatanodeManager(this, namesystem, conf);
        this.corruptReplicas = new CorruptReplicasMap(this.datanodeManager);
        this.heartbeatManager = this.datanodeManager.getHeartbeatManager();
        this.startupDelayBlockDeletionInMs = conf.getLong("dfs.namenode.startup.delay.block.deletion.sec", 0L) * 1000L;
        this.invalidateBlocks = new InvalidateBlocks(this.datanodeManager.blockInvalidateLimit, this.startupDelayBlockDeletionInMs);
        this.excessReplicateMap = new ExcessReplicasMap(this.datanodeManager);
        this.blocksMap = new BlocksMap(this.datanodeManager);
        this.blockplacement = BlockPlacementPolicy.getInstance(conf, this.datanodeManager.getFSClusterStats(), this.datanodeManager.getNetworkTopology(), this.datanodeManager.getHost2DatanodeMap());
        this.storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
        this.pendingReplications = new PendingReplicationBlocks((long)conf.getInt("dfs.namenode.replication.pending.timeout-sec", -1) * 1000L);
        this.blockTokenSecretManager = this.createBlockTokenSecretManager(conf);
        this.maxCorruptFilesReturned = conf.getInt("dfs.corruptfilesreturned.max", 500);
        this.defaultReplication = conf.getInt("dfs.replication", 3);
        int maxR = conf.getInt("dfs.replication.max", 512);
        int minR = conf.getInt("dfs.namenode.replication.min", 1);
        if (minR <= 0) {
            throw new IOException("Unexpected configuration parameters: dfs.namenode.replication.min = " + minR + " <= 0");
        }
        if (maxR > Short.MAX_VALUE) {
            throw new IOException("Unexpected configuration parameters: dfs.replication.max = " + maxR + " > " + Short.MAX_VALUE);
        }
        if (minR > maxR) {
            throw new IOException("Unexpected configuration parameters: dfs.namenode.replication.min = " + minR + " > " + "dfs.replication.max" + " = " + maxR);
        }
        this.minReplication = (short)minR;
        this.maxReplication = (short)maxR;
        this.maxReplicationStreams = conf.getInt("dfs.namenode.replication.max-streams", 2);
        this.replicationStreamsHardLimit = conf.getInt("dfs.namenode.replication.max-streams-hard-limit", 4);
        this.shouldCheckForEnoughRacks = conf.get("net.topology.script.file.name") != null;
        this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
        this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
        this.replicationRecheckInterval = (long)conf.getInt("dfs.namenode.replication.interval", 3) * 1000L;
        this.encryptDataTransfer = conf.getBoolean("dfs.encrypt.data.transfer", false);
        this.maxNumBlocksToLog = conf.getLong("dfs.namenode.max-num-blocks-to-log", 1000L);
        this.slicerBatchSize = conf.getInt("dfs.namenode.slicer.batchsize", 500);
        this.processMisReplicatedNoOfBatchs = conf.getInt("dfs.namenode.misreplicated.noofbatches", 100);
        this.slicerNbThreads = conf.getInt("dfs.namenode.slicer.nbofthreads", 20);
        LOG.info("defaultReplication         = " + this.defaultReplication);
        LOG.info("maxReplication             = " + this.maxReplication);
        LOG.info("minReplication             = " + this.minReplication);
        LOG.info("maxReplicationStreams      = " + this.maxReplicationStreams);
        LOG.info("shouldCheckForEnoughRacks  = " + this.shouldCheckForEnoughRacks);
        LOG.info("replicationRecheckInterval = " + this.replicationRecheckInterval);
        LOG.info("encryptDataTransfer        = " + this.encryptDataTransfer);
        LOG.info("maxNumBlocksToLog          = " + this.maxNumBlocksToLog);
        LOG.info("slicerBatchSize            = " + this.slicerBatchSize);
        LOG.info("misReplicatedNoOfBatchs    = " + this.processMisReplicatedNoOfBatchs);
        LOG.info("slicerNbOfBatchs           = " + this.processMisReplicatedNoOfBatchs);
    }

    private NameNodeBlockTokenSecretManager createBlockTokenSecretManager(Configuration conf) throws IOException {
        boolean isEnabled = conf.getBoolean("dfs.block.access.token.enable", false);
        LOG.info("dfs.block.access.token.enable=" + isEnabled);
        if (!isEnabled) {
            if (UserGroupInformation.isSecurityEnabled()) {
                String errMessage = "Security is enabled but block access tokens (via dfs.block.access.token.enable) aren't enabled. This may cause issues when clients attempt to connect to a DataNode. Aborting NameNode";
                throw new IOException(errMessage);
            }
            return null;
        }
        long updateMin = conf.getLong("dfs.block.access.key.update.interval", 450L);
        long lifetimeMin = conf.getLong("dfs.block.access.token.lifetime", 600L);
        String encryptionAlgorithm = conf.get("dfs.encrypt.data.transfer.algorithm");
        LOG.info("dfs.block.access.key.update.interval=" + updateMin + " min(s), " + "dfs.block.access.token.lifetime" + "=" + lifetimeMin + " min(s), " + "dfs.encrypt.data.transfer.algorithm" + "=" + encryptionAlgorithm);
        return new NameNodeBlockTokenSecretManager(updateMin * 60L * 1000L, lifetimeMin * 60L * 1000L, null, encryptionAlgorithm, this.namesystem);
    }

    public BlockStoragePolicy getDefaultStoragePolicy() {
        return this.storagePolicySuite.getDefaultPolicy();
    }

    public BlockStoragePolicy getStoragePolicy(String policyName) {
        return this.storagePolicySuite.getPolicy(policyName);
    }

    public BlockStoragePolicy getStoragePolicy(byte policyId) {
        return this.storagePolicySuite.getPolicy(policyId);
    }

    public BlockStoragePolicy[] getStoragePolicies() {
        return this.storagePolicySuite.getAllPolicies();
    }

    public void setBlockPoolId(String blockPoolId) {
        if (this.isBlockTokenEnabled()) {
            this.blockTokenSecretManager.setBlockPoolId(blockPoolId);
        }
    }

    public BlockStoragePolicySuite getStoragePolicySuite() {
        return this.storagePolicySuite;
    }

    @VisibleForTesting
    public BlockTokenSecretManager getBlockTokenSecretManager() {
        return this.blockTokenSecretManager;
    }

    @VisibleForTesting
    void enableRMTerminationForTesting() {
        this.checkNSRunning = false;
    }

    private boolean isBlockTokenEnabled() {
        return this.blockTokenSecretManager != null;
    }

    boolean shouldUpdateBlockKey(long updateTime) throws IOException {
        return this.isBlockTokenEnabled() ? this.blockTokenSecretManager.updateKeys(updateTime) : false;
    }

    public void activate(Configuration conf) throws IOException {
        this.pendingReplications.start();
        this.datanodeManager.activate(conf);
        this.replicationThread.start();
        if (this.isBlockTokenEnabled()) {
            this.blockTokenSecretManager.initKeys();
        }
    }

    public void close() {
        try {
            this.replicationThread.interrupt();
            this.replicationThread.join(3000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.datanodeManager.close();
        this.pendingReplications.stop();
        this.blocksMap.close();
    }

    public DatanodeManager getDatanodeManager() {
        return this.datanodeManager;
    }

    @VisibleForTesting
    public BlockPlacementPolicy getBlockPlacementPolicy() {
        return this.blockplacement;
    }

    public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
        if (newpolicy == null) {
            throw new HadoopIllegalArgumentException("newpolicy == null");
        }
        this.blockplacement = newpolicy;
    }

    public int getMaxReplicationStreams() {
        return this.maxReplicationStreams;
    }

    public boolean checkMinReplication(BlockInfoContiguous block) throws IOException {
        return this.countNodes(block).liveReplicas() >= this.minReplication;
    }

    private static boolean commitBlock(BlockInfoContiguousUnderConstruction block, Block commitBlock, DatanodeManager datanodeMgr) throws IOException {
        if (block.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED) {
            return false;
        }
        assert (block.getNumBytes() <= commitBlock.getNumBytes()) : "commitBlock length is less than the stored one " + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
        block.commitBlock(commitBlock, datanodeMgr);
        return true;
    }

    public boolean commitOrCompleteLastBlock(BlockCollection bc, Block commitBlock) throws IOException, StorageException {
        if (commitBlock == null) {
            return false;
        }
        BlockInfoContiguous lastBlock = bc.getLastBlock();
        if (lastBlock == null) {
            return false;
        }
        if (lastBlock.isComplete()) {
            return false;
        }
        boolean b = BlockManager.commitBlock((BlockInfoContiguousUnderConstruction)lastBlock, commitBlock, this.getDatanodeManager());
        int numReplicas = this.countNodes(lastBlock).liveReplicas();
        if (numReplicas >= this.minReplication) {
            this.completeBlock(bc, lastBlock.getBlockIndex(), false);
        }
        return b;
    }

    private BlockInfoContiguous completeBlock(BlockCollection bc, int blkIndex, boolean force) throws IOException, StorageException {
        if (blkIndex < 0) {
            return null;
        }
        BlockInfoContiguous curBlock = bc.getBlock(blkIndex);
        if (curBlock.isComplete()) {
            return curBlock;
        }
        BlockInfoContiguousUnderConstruction ucBlock = (BlockInfoContiguousUnderConstruction)curBlock;
        int numNodes = ucBlock.numNodes(this.datanodeManager);
        if (!force && numNodes < this.minReplication) {
            throw new IOException("Cannot complete block: block does not satisfy minimal replication requirement.");
        }
        if (!force && ucBlock.getBlockUCState() != HdfsServerConstants.BlockUCState.COMMITTED) {
            throw new IOException("Cannot complete block: block has not been COMMITTED by the client");
        }
        BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock();
        bc.setBlock(blkIndex, completeBlock);
        this.namesystem.adjustSafeModeBlockTotals(null, 1);
        this.namesystem.incrementSafeBlockCount(Math.min(numNodes, this.minReplication), curBlock);
        return completeBlock;
    }

    private BlockInfoContiguous completeBlock(BlockCollection bc, BlockInfoContiguous block, boolean force) throws IOException, StorageException {
        BlockInfoContiguous blk = bc.getBlock(block.getBlockIndex());
        if (blk == block) {
            return this.completeBlock(bc, blk.getBlockIndex(), force);
        }
        return block;
    }

    public BlockInfoContiguous forceCompleteBlock(BlockCollection bc, BlockInfoContiguousUnderConstruction block) throws IOException {
        block.commitBlock(block, this.getDatanodeManager());
        return this.completeBlock(bc, block, true);
    }

    public LocatedBlock convertLastBlockToUnderConstruction(BlockCollection bc, long bytesToRemove) throws IOException {
        BlockInfoContiguous oldBlock = bc.getLastBlock();
        if (oldBlock == null || bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove) {
            return null;
        }
        assert (oldBlock == this.getStoredBlock(oldBlock)) : "last block of the file is not in blocksMap";
        DatanodeStorageInfo[] targets = this.getStorages(oldBlock);
        BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
        NumberReplicas replicas = this.countNodes(ucBlock);
        this.neededReplications.remove(ucBlock, replicas.liveReplicas(), replicas.decommissionedAndDecommissioning(), this.getReplication(ucBlock));
        this.pendingReplications.remove(ucBlock);
        for (DatanodeStorageInfo target : targets) {
            this.invalidateBlocks.remove(target, oldBlock);
        }
        ArrayList<Block> deltaSafe = new ArrayList<Block>();
        if (targets.length >= this.minReplication) {
            deltaSafe.add(oldBlock);
        }
        this.namesystem.adjustSafeModeBlockTotals(deltaSafe, -1);
        long fileLength = bc.computeContentSummary(this.getStoragePolicySuite()).getLength();
        long pos = fileLength - ucBlock.getNumBytes();
        return this.createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
    }

    private List<DatanodeStorageInfo> getValidLocations(BlockInfoContiguous block) throws StorageException, TransactionContextException {
        ArrayList<DatanodeStorageInfo> storageSet = new ArrayList<DatanodeStorageInfo>();
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(block)) {
            if (this.invalidateBlocks.contains(storage, block)) continue;
            storageSet.add(storage);
        }
        return storageSet;
    }

    private List<LocatedBlock> createLocatedBlockList(BlockInfoContiguous[] blocks, long offset, long length, int nrBlocksToReturn, BlockTokenIdentifier.AccessMode mode) throws IOException, StorageException {
        int curBlk = 0;
        long curPos = 0L;
        long blkSize = 0L;
        int nrBlocks = blocks[0].getNumBytes() == 0L ? 0 : blocks.length;
        for (curBlk = 0; curBlk < nrBlocks; ++curBlk) {
            blkSize = blocks[curBlk].getNumBytes();
            assert (blkSize > 0L) : "Block of size 0";
            if (curPos + blkSize > offset) break;
            curPos += blkSize;
        }
        if (nrBlocks > 0 && curBlk == nrBlocks) {
            return Collections.emptyList();
        }
        long endOff = offset + length;
        ArrayList<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
        do {
            results.add(this.createLocatedBlock(blocks[curBlk], curPos, mode));
        } while ((curPos += blocks[++curBlk].getNumBytes()) < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn);
        return results;
    }

    private LocatedBlock createLocatedBlock(BlockInfoContiguous[] blocks, long endPos, BlockTokenIdentifier.AccessMode mode) throws IOException {
        long blkSize;
        int curBlk = 0;
        long curPos = 0L;
        int nrBlocks = blocks[0].getNumBytes() == 0L ? 0 : blocks.length;
        for (curBlk = 0; curBlk < nrBlocks && curPos + (blkSize = blocks[curBlk].getNumBytes()) < endPos; ++curBlk) {
            curPos += blkSize;
        }
        return this.createLocatedBlock(blocks[curBlk], curPos, mode);
    }

    private List<LocatedBlock> createPhantomLocatedBlockList(INodeFile file, byte[] data, BlockTokenIdentifier.AccessMode mode) throws IOException, StorageException {
        ArrayList<LocatedBlock> results = new ArrayList<LocatedBlock>(1);
        BlockInfoContiguous fakeBlk = new BlockInfoContiguous();
        fakeBlk.setBlockIdNoPersistance(-file.getId());
        fakeBlk.setINodeIdNoPersistance(-file.getId());
        fakeBlk.setBlockIndexNoPersistance(0);
        fakeBlk.setNumBytesNoPersistance(file.getSize());
        fakeBlk.setTimestampNoPersistance(file.getModificationTime());
        ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), fakeBlk);
        List<DatanodeDescriptor> dnds = this.datanodeManager.getRandomDN(file.getBlockReplication());
        ArrayList<DatanodeDescriptor> dnInfos = new ArrayList<DatanodeDescriptor>(dnds);
        if (dnInfos.size() == 0) {
            for (int i = 0; i < file.getBlockReplication(); ++i) {
                DatanodeID phantomDatanodID = new DatanodeID(this.namesystem.getNameNode().getServiceRpcAddress().getAddress().getHostAddress(), this.namesystem.getNameNode().getServiceRpcAddress().getAddress().getCanonicalHostName(), this.namesystem.getBlockPoolId(), 50010, 50075, 50475, 50020);
                DatanodeInfo phantomDatanode = new DatanodeInfo(phantomDatanodID);
                dnInfos.add((DatanodeDescriptor)phantomDatanode);
            }
        }
        LocatedBlock locatedBlock = new LocatedBlock(eb, dnInfos.toArray(new DatanodeInfo[dnInfos.size()]), 0L, false);
        locatedBlock.setData(data);
        results.add(locatedBlock);
        return results;
    }

    private LocatedBlock createLocatedBlock(BlockInfoContiguous blk, long pos, BlockTokenIdentifier.AccessMode mode) throws IOException {
        LocatedBlock lb = this.createLocatedBlock(blk, pos);
        if (mode != null) {
            this.setBlockToken(lb, mode);
        }
        return lb;
    }

    private LocatedBlock createLocatedBlock(BlockInfoContiguous blk, long pos) throws IOException {
        int numNodes;
        int numCorruptReplicas;
        if (blk instanceof BlockInfoContiguousUnderConstruction) {
            if (blk.isComplete()) {
                throw new IOException("blk instanceof BlockInfoUnderConstruction && blk.isComplete(), blk=" + blk);
            }
            BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)blk;
            DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(this.datanodeManager);
            ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), blk);
            return BlockManager.newLocatedBlock(eb, storages, pos, false);
        }
        int numCorruptNodes = this.countNodes(blk).corruptReplicas();
        if (numCorruptNodes != (numCorruptReplicas = this.corruptReplicas.numCorruptReplicas(blk))) {
            LOG.warn("Inconsistent number of corrupt replicas for " + blk + " blockMap has " + numCorruptNodes + " but corrupt replicas map has " + numCorruptReplicas);
        }
        boolean isCorrupt = numCorruptNodes == (numNodes = this.blocksMap.numNodes(blk));
        int numMachines = isCorrupt ? numNodes : numNodes - numCorruptNodes;
        DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numMachines];
        int j = 0;
        if (numMachines > 0) {
            for (DatanodeStorageInfo storage : this.blocksMap.storageList(blk)) {
                boolean replicaCorrupt = this.corruptReplicas.isReplicaCorrupt(blk, storage.getDatanodeDescriptor());
                if (!isCorrupt && replicaCorrupt) continue;
                storages[j++] = storage;
            }
        }
        assert (j == storages.length) : "isCorrupt: " + isCorrupt + " numStorages: " + numMachines + " numNodes: " + numNodes + " numCorrupt: " + numCorruptNodes + " numCorruptRepls: " + numCorruptReplicas;
        ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), blk);
        return BlockManager.newLocatedBlock(eb, storages, pos, isCorrupt);
    }

    public LocatedBlocks createPhantomLocatedBlocks(INodeFile file, byte[] data, boolean isFileUnderConstruction, boolean needBlockToken, FileEncryptionInfo feInfo) throws IOException, StorageException {
        if (needBlockToken) {
            new IOException("Block Tokens are not currently supported for files stored in the database");
        }
        BlockTokenIdentifier.AccessMode mode = needBlockToken ? BlockTokenIdentifier.AccessMode.READ : null;
        List<LocatedBlock> locatedblocks = this.createPhantomLocatedBlockList(file, data, mode);
        return new LocatedBlocks(file.getSize(), isFileUnderConstruction, locatedblocks, null, false, feInfo);
    }

    public LocatedBlocks createLocatedBlocks(BlockInfoContiguous[] blocks, long fileSizeExcludeBlocksUnderConstruction, boolean isFileUnderConstruction, long offset, long length, boolean needBlockToken, FileEncryptionInfo feInfo) throws IOException, StorageException {
        if (blocks == null) {
            return null;
        }
        if (blocks.length == 0) {
            return new LocatedBlocks(0L, isFileUnderConstruction, Collections.emptyList(), null, false, feInfo);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("blocks = " + Arrays.asList(blocks));
        }
        BlockTokenIdentifier.AccessMode mode = needBlockToken ? BlockTokenIdentifier.AccessMode.READ : null;
        List<LocatedBlock> locatedblocks = this.createLocatedBlockList(blocks, offset, length, Integer.MAX_VALUE, mode);
        BlockInfoContiguous last = blocks[blocks.length - 1];
        long lastPos = last.isComplete() ? fileSizeExcludeBlocksUnderConstruction - last.getNumBytes() : fileSizeExcludeBlocksUnderConstruction;
        LocatedBlock lastlb = this.createLocatedBlock(last, lastPos, mode);
        boolean isComplete = last.isComplete();
        return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo);
    }

    public ExportedBlockKeys getBlockKeys() throws IOException {
        return this.isBlockTokenEnabled() ? this.blockTokenSecretManager.exportKeys() : ExportedBlockKeys.DUMMY_KEYS;
    }

    public void setBlockToken(LocatedBlock b, BlockTokenIdentifier.AccessMode mode) throws IOException {
        if (this.isBlockTokenEnabled()) {
            b.setBlockToken(this.blockTokenSecretManager.generateToken(NameNode.getRemoteUser().getShortUserName(), b.getBlock(), EnumSet.of(mode)));
        }
    }

    void addKeyUpdateCommand(List<DatanodeCommand> cmds, DatanodeDescriptor nodeinfo) throws IOException {
        if (this.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
            cmds.add(new KeyUpdateCommand(this.blockTokenSecretManager.exportKeys()));
            nodeinfo.needKeyUpdate = false;
        }
    }

    public DataEncryptionKey generateDataEncryptionKey() throws IOException {
        if (this.isBlockTokenEnabled() && this.encryptDataTransfer) {
            return this.blockTokenSecretManager.generateDataEncryptionKey();
        }
        return null;
    }

    public short adjustReplication(short replication) {
        return replication < this.minReplication ? this.minReplication : (replication > this.maxReplication ? this.maxReplication : replication);
    }

    public void verifyReplication(String src, short replication, String clientName) throws IOException {
        if (replication >= this.minReplication && replication <= this.maxReplication) {
            return;
        }
        String text = "file " + src + (clientName != null ? " on client " + clientName : "") + ".\nRequested replication " + replication;
        if (replication > this.maxReplication) {
            throw new IOException(text + " exceeds maximum " + this.maxReplication);
        }
        if (replication < this.minReplication) {
            throw new IOException(text + " is less than the required minimum " + this.minReplication);
        }
    }

    public boolean isSufficientlyReplicated(BlockInfoContiguous b) throws IOException {
        int replication = Math.min(this.minReplication, this.getDatanodeManager().getNumLiveDataNodes());
        return this.countLiveNodes(b) >= replication;
    }

    public BlocksWithLocations getBlocks(DatanodeID datanode, long size) throws IOException {
        this.namesystem.checkSuperuserPrivilege();
        return this.getBlocksWithLocations(datanode, size);
    }

    private BlocksWithLocations getBlocksWithLocations(DatanodeID datanode, long size) throws UnregisteredNodeException, IOException {
        BlockInfoContiguous curBlock;
        long totalSize;
        ArrayList<Block> toAdd;
        DatanodeDescriptor node = this.getDatanodeManager().getDatanode(datanode);
        if (node == null) {
            blockLog.warn("BLOCK* getBlocks: Asking for blocks from an unrecorded node {}", (Object)datanode);
            throw new HadoopIllegalArgumentException("Datanode " + datanode + " not found.");
        }
        int numBlocks = node.numBlocks();
        if (numBlocks == 0) {
            return new BlocksWithLocations(new BlocksWithLocations.BlockWithLocations[0]);
        }
        int startBlock = DFSUtil.getRandom().nextInt(numBlocks);
        Iterator<BlockInfoContiguous> iter = node.getBlockIterator(startBlock);
        ArrayList<BlocksWithLocations.BlockWithLocations> results = new ArrayList<BlocksWithLocations.BlockWithLocations>();
        for (totalSize = 0L; totalSize < size && iter.hasNext(); totalSize += this.addBlocks(toAdd, results)) {
            toAdd = new ArrayList<Block>();
            long estimatedSize = 0L;
            while (totalSize + estimatedSize < size && iter.hasNext()) {
                curBlock = iter.next();
                if (!curBlock.isComplete()) continue;
                toAdd.add(curBlock);
                estimatedSize += curBlock.getNumBytes();
            }
        }
        if (totalSize < size) {
            iter = node.getBlockIterator();
            int i = 0;
            while (i < startBlock && totalSize < size) {
                ArrayList<Block> toAdd2 = new ArrayList<Block>();
                long estimatedSize = 0L;
                while (totalSize + estimatedSize < size && i < startBlock) {
                    curBlock = iter.next();
                    ++i;
                    if (!curBlock.isComplete()) continue;
                    toAdd2.add(curBlock);
                    estimatedSize += curBlock.getNumBytes();
                }
                totalSize += this.addBlocks(toAdd2, results);
            }
        }
        return new BlocksWithLocations(results.toArray(new BlocksWithLocations.BlockWithLocations[results.size()]));
    }

    void datanodeRemoved(final DatanodeDescriptor node, boolean async) throws IOException {
        Future<Object> future = this.datanodeRemover.submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                try {
                    DatanodeStorageInfo[] storageInfos;
                    Map<Long, Long> allBlocksAndInodesIds = node.getAllStorageReplicas(BlockManager.this.numBuckets, BlockManager.this.blockFetcherNBThreads, BlockManager.this.blockFetcherBucketsPerThread, ((FSNamesystem)BlockManager.this.namesystem).getFSOperationsExecutor());
                    BlockManager.this.removeBlocks(allBlocksAndInodesIds, node);
                    for (DatanodeStorageInfo storageInfo : storageInfos = node.getStorageInfos()) {
                        HashBuckets.getInstance().resetBuckets(storageInfo.getSid());
                    }
                    return null;
                }
                catch (Throwable t) {
                    LOG.error(t.getMessage(), t);
                    throw t;
                }
            }
        });
        node.resetBlocks();
        List<Integer> sids = this.datanodeManager.getSidsOnDatanode(node.getDatanodeUuid());
        this.invalidateBlocks.remove(sids);
        if (!async) {
            try {
                future.get();
            }
            catch (Exception e) {
                if (e instanceof IOException) {
                    throw (IOException)e;
                }
                throw new IOException(e);
            }
        }
    }

    void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) throws IOException {
        this.datanodeRemover.submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                try {
                    Map<Long, Long> allBlocksAndInodesIds = storageInfo.getAllStorageReplicas(BlockManager.this.numBuckets, BlockManager.this.blockFetcherNBThreads, BlockManager.this.blockFetcherBucketsPerThread, ((FSNamesystem)BlockManager.this.namesystem).getFSOperationsExecutor());
                    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
                    BlockManager.this.removeBlocks(allBlocksAndInodesIds, node);
                    HashBuckets.getInstance().resetBuckets(storageInfo.getSid());
                    BlockManager.this.namesystem.checkSafeMode();
                    return null;
                }
                catch (Throwable t) {
                    LOG.error(t.getMessage(), t);
                    throw t;
                }
            }
        });
        this.invalidateBlocks.remove(storageInfo.getSid());
    }

    void removeBlocksAssociatedTo(final int sid) throws IOException {
        this.datanodeRemover.submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                try {
                    Map<Long, Long> allBlocksAndInodesIds = DatanodeStorageInfo.getAllStorageReplicas(BlockManager.this.numBuckets, sid, BlockManager.this.blockFetcherNBThreads, BlockManager.this.blockFetcherBucketsPerThread, ((FSNamesystem)BlockManager.this.namesystem).getFSOperationsExecutor());
                    BlockManager.this.removeBlocks((Map<Long, Long>)allBlocksAndInodesIds, sid);
                    HashBuckets.getInstance().resetBuckets(sid);
                    BlockManager.this.namesystem.checkSafeMode();
                    return null;
                }
                catch (Throwable t) {
                    LOG.error(t.getMessage(), t);
                    throw t;
                }
            }
        });
        this.invalidateBlocks.remove(sid);
    }

    void addToInvalidates(BlockInfoContiguous block, DatanodeInfo datanode) throws StorageException, TransactionContextException, UnregisteredNodeException, IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        DatanodeDescriptor dn = this.datanodeManager.getDatanode(datanode);
        DatanodeStorageInfo storage = block.getStorageOnNode(dn);
        if (storage != null) {
            this.addToInvalidates(block, storage);
        }
    }

    void addToInvalidates(BlockInfoContiguous block, DatanodeStorageInfo storage) throws TransactionContextException, StorageException, IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        this.invalidateBlocks.add(block, storage, true);
    }

    public void addToInvalidates(Block block) throws StorageException, TransactionContextException, IOException {
        DatanodeStorageInfo[] storages;
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        StringBuilder datanodes = new StringBuilder();
        BlockInfoContiguous storedBlock = this.getStoredBlock(block);
        if (storedBlock == null) {
            LOG.error("This fucntion should not be called with a block that does not exist anymore, blockId:" + block.getBlockId());
            throw new StorageException("This fucntion should not be called with a block that does not exist anymore, blockId:" + block.getBlockId());
        }
        for (DatanodeStorageInfo storage : storages = storedBlock.getStorages(this.datanodeManager, DatanodeStorage.State.NORMAL)) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            this.invalidateBlocks.add(block, storage, false, storedBlock.inodeId);
            datanodes.append(node).append(" ");
        }
        if (datanodes.length() != 0) {
            blockLog.info("BLOCK* addToInvalidates: {} {}", (Object)block, (Object)datanodes.toString());
        }
    }

    void removeFromInvalidates(DatanodeDescriptor datanode) throws IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        for (int sid : datanode.getSidsOnNode()) {
            this.invalidateBlocks.remove(sid);
        }
    }

    public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, DatanodeInfo dn, String storageID, final String reason) throws IOException {
        final DatanodeDescriptor node = this.getDatanodeManager().getDatanode(dn);
        final DatanodeStorageInfo storage = storageID == null ? null : node.getStorageInfo(storageID);
        new HopsTransactionalRequestHandler(HDFSOperationType.FIND_AND_MARK_BLOCKS_AS_CORRUPT){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                Block b = blk.getLocalBlock();
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(blk.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.UC, LockFactory.BLK.IV));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws StorageException, IOException {
                BlockInfoContiguous storedBlock = BlockManager.this.getStoredBlock(blk.getLocalBlock());
                if (storedBlock == null) {
                    blockLog.info("BLOCK* findAndMarkBlockAsCorrupt: " + blk + " not found");
                    return null;
                }
                BlockToMarkCorrupt b = new BlockToMarkCorrupt(storedBlock, blk.getGenerationStamp(), reason, CorruptReplicasMap.Reason.CORRUPTION_REPORTED);
                BlockManager.this.markBlockAsCorrupt(b, storage, node);
                return null;
            }
        }.handle(this.namesystem);
    }

    private void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException, StorageException {
        boolean corruptedDuringWrite;
        if (b.corrupted.isDeleted()) {
            blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as corrupt as it does not belong to any file", (Object)b);
            this.addToInvalidates(b.corrupted, node);
            return;
        }
        BlockCollection bc = b.corrupted.getBlockCollection();
        short expectedReplicas = bc.getBlockReplication();
        if (storageInfo == null) {
            storageInfo = b.corrupted.getStorageOnNode(node);
        }
        if (storageInfo != null) {
            storageInfo.addBlock(b.stored);
        }
        this.corruptReplicas.addToCorruptReplicasMap(b.corrupted, storageInfo, b.reason, b.reasonCode);
        NumberReplicas numberOfReplicas = this.countNodes(b.stored);
        boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas;
        boolean minReplicationSatisfied = numberOfReplicas.liveReplicas() >= this.minReplication;
        boolean hasMoreCorruptReplicas = minReplicationSatisfied && numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas() > expectedReplicas;
        boolean bl = corruptedDuringWrite = minReplicationSatisfied && b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp();
        if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) {
            this.invalidateBlock(b, node);
        } else if (this.namesystem.isPopulatingReplQueues()) {
            this.updateNeededReplications(b.stored, -1, 0);
        }
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (!fsNamesystem.isErasureCodingEnabled()) {
            return;
        }
        if (numberOfReplicas.liveReplicas() == 0) {
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{bc.getId()});
            if (status != null) {
                if (!status.isCorrupt()) {
                    status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                    status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                }
                status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                EntityManager.update((Object)status);
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{bc.getId()});
                if (status != null) {
                    if (!status.isParityCorrupt()) {
                        status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                        status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                    EntityManager.update((Object)status);
                    LOG.info("markBlockAsCorrupt updated parity status to repair requested");
                }
            }
        }
    }

    private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn) throws IOException {
        blockLog.info("BLOCK* invalidateBlock: {} on {}", (Object)b, (Object)dn);
        DatanodeDescriptor node = this.getDatanodeManager().getDatanode(dn);
        if (node == null) {
            throw new IOException("Cannot invalidate " + b + " because datanode " + dn + " does not exist.");
        }
        NumberReplicas nr = this.countNodes(b.stored);
        if (nr.replicasOnStaleNodes() > 0) {
            blockLog.info("BLOCK* invalidateBlocks: postponing invalidation of {} on {} because {} replica(s) are located on nodes with potentially out-of-date block reports", new Object[]{b, dn, nr.replicasOnStaleNodes()});
            this.postponeBlock(b.corrupted);
            return false;
        }
        if (nr.liveReplicas() >= 1) {
            this.addToInvalidates(b.corrupted, dn);
            this.removeStoredBlock((Block)b.stored, node);
            if (blockLog.isDebugEnabled()) {
                blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", (Object)b, (Object)dn);
            }
            return true;
        }
        blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and was not deleted", (Object)b, (Object)dn);
        return false;
    }

    private void postponeBlock(Block blk) {
        if (this.postponedMisreplicatedBlocks.add(blk)) {
            this.postponedMisreplicatedBlocksCount.incrementAndGet();
        }
    }

    void updateState() throws IOException {
        this.pendingReplicationBlocksCount = this.pendingReplications.size();
        this.underReplicatedBlocksCount = this.neededReplications.size();
        this.corruptReplicaBlocksCount = this.corruptReplicas.size();
    }

    public int getUnderReplicatedNotMissingBlocks() throws IOException {
        return this.neededReplications.getUnderReplicatedBlockCount();
    }

    int computeInvalidateWork(int nodesToProcess) throws IOException {
        Map<DatanodeInfo, Set<Integer>> nodesToSids = this.invalidateBlocks.getDatanodes(this.datanodeManager);
        ArrayList<Map.Entry<DatanodeInfo, Set<Integer>>> nodes = new ArrayList<Map.Entry<DatanodeInfo, Set<Integer>>>(nodesToSids.entrySet());
        Collections.shuffle(nodes);
        nodesToProcess = Math.min(nodes.size(), nodesToProcess);
        int blockCnt = 0;
        for (Map.Entry entry : nodes) {
            int blocks = this.invalidateWorkForOneNode(entry);
            if (blocks <= 0) continue;
            blockCnt += blocks;
            if (--nodesToProcess != 0) continue;
            break;
        }
        return blockCnt;
    }

    int computeReplicationWork(int blocksToProcess) throws IOException {
        List<List<Block>> blocksToReplicate = this.neededReplications.chooseUnderReplicatedBlocks(blocksToProcess);
        return this.computeReplicationWorkForBlocks(blocksToReplicate);
    }

    @VisibleForTesting
    int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) throws IOException {
        int scheduledWork = 0;
        for (int priority = 0; priority < blocksToReplicate.size(); ++priority) {
            for (Block block : blocksToReplicate.get(priority)) {
                scheduledWork += this.computeReplicationWorkForBlock(block, priority);
            }
        }
        return scheduledWork;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int computeReplicationWorkForBlockInternal(Block blk, int priority1) throws StorageException, IOException {
        DatanodeStorageInfo[] targets;
        int numEffectiveReplicas;
        short requiredReplication;
        BlockCollection bc = null;
        int scheduledWork = 0;
        LinkedList<ReplicationWork> work = new LinkedList<ReplicationWork>();
        BlockInfoContiguous storedBlock = this.getStoredBlock(blk);
        UnderReplicatedBlocks underReplicatedBlocks = this.neededReplications;
        synchronized (underReplicatedBlocks) {
            bc = this.blocksMap.getBlockCollection(blk);
            if (storedBlock == null || bc == null || bc.isUnderConstruction() && storedBlock.equals(bc.getLastBlock())) {
                if (storedBlock == null) {
                    storedBlock = new BlockInfoContiguous();
                    storedBlock.setBlockIdNoPersistance(blk.getBlockId());
                    storedBlock.setINodeIdNoPersistance(BlockInfoContiguous.NON_EXISTING_ID);
                }
                if (this.neededReplications.remove(storedBlock)) {
                    this.neededReplications.decrementReplicationIndex(priority1);
                }
                return scheduledWork;
            }
            requiredReplication = bc.getBlockReplication();
            ArrayList<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
            ArrayList<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
            NumberReplicas numReplicas = new NumberReplicas();
            DatanodeDescriptor srcNode = this.chooseSourceDatanode(storedBlock, containingNodes, liveReplicaNodes, numReplicas, priority1);
            if (srcNode == null) {
                LOG.debug("Block " + storedBlock + " cannot be repl from any storage");
                return scheduledWork;
            }
            assert (liveReplicaNodes.size() >= numReplicas.liveReplicas());
            numEffectiveReplicas = numReplicas.liveReplicas() + this.pendingReplications.getNumReplicas(storedBlock);
            if (numEffectiveReplicas >= requiredReplication && (this.pendingReplications.getNumReplicas(storedBlock) > 0 || this.blockHasEnoughRacks(storedBlock))) {
                if (this.neededReplications.remove(storedBlock)) {
                    this.neededReplications.decrementReplicationIndex(priority1);
                    blockLog.info("BLOCK* Removing " + storedBlock + " from neededReplications as it has enough replicas");
                }
                return scheduledWork;
            }
            int additionalReplRequired = numReplicas.liveReplicas() < requiredReplication ? requiredReplication - numEffectiveReplicas : 1;
            work.add(new ReplicationWork(storedBlock, bc, srcNode, containingNodes, liveReplicaNodes, additionalReplRequired, priority1));
        }
        HashSet<DatanodeDescriptor> excludedNodes = new HashSet<DatanodeDescriptor>();
        for (ReplicationWork rw : work) {
            excludedNodes.clear();
            for (Object dn : rw.containingNodes) {
                excludedNodes.add((DatanodeDescriptor)dn);
            }
            rw.chooseTargets(this.blockplacement, this.storagePolicySuite, excludedNodes);
        }
        for (ReplicationWork rw : work) {
            Object dn;
            targets = rw.targets;
            if (targets == null || targets.length == 0) {
                ReplicationWork.access$902(rw, null);
                continue;
            }
            dn = this.neededReplications;
            synchronized (dn) {
                BlockInfoContiguous block = rw.block;
                int priority = rw.priority;
                bc = this.blocksMap.getBlockCollection(block);
                requiredReplication = bc.getBlockReplication();
                NumberReplicas numReplicas = this.countNodes(block);
                numEffectiveReplicas = numReplicas.liveReplicas() + this.pendingReplications.getNumReplicas(storedBlock);
                if (numEffectiveReplicas >= requiredReplication && (this.pendingReplications.getNumReplicas(storedBlock) > 0 || this.blockHasEnoughRacks(block))) {
                    blockLog.info("BLOCK* Removing {} from neededReplications as it has enough replicas", (Object)block);
                    if (this.neededReplications.remove(storedBlock)) {
                        this.neededReplications.decrementReplicationIndex(priority);
                    }
                    ReplicationWork.access$902(rw, null);
                    continue;
                }
                if (numReplicas.liveReplicas() >= requiredReplication && !this.blockHasEnoughRacks(block) && rw.srcNode.getNetworkLocation().equals(targets[0].getDatanodeDescriptor().getNetworkLocation())) {
                    continue;
                }
                rw.srcNode.addBlockToBeReplicated(block, targets);
                ++scheduledWork;
                DatanodeStorageInfo.incrementBlocksScheduled(targets);
                this.pendingReplications.increment(storedBlock, DatanodeStorageInfo.toDatanodeDescriptors(targets));
                if (blockLog.isDebugEnabled()) {
                    blockLog.debug("BLOCK* block {} is moved from neededReplications to pendingReplications", (Object)block);
                }
                if (numEffectiveReplicas + targets.length >= requiredReplication && this.neededReplications.remove(storedBlock)) {
                    this.neededReplications.decrementReplicationIndex(priority);
                }
            }
        }
        if (blockLog.isInfoEnabled()) {
            for (ReplicationWork rw : work) {
                targets = rw.targets;
                if (targets == null || targets.length == 0) continue;
                StringBuilder targetList = new StringBuilder("datanode(s)");
                for (DatanodeStorageInfo target : targets) {
                    targetList.append(' ');
                    targetList.append(target);
                }
                blockLog.info("BLOCK* ask {} to replicate {} to {}", new Object[]{rw.srcNode, rw.block, targetList});
            }
        }
        if (blockLog.isDebugEnabled()) {
            blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}", (Object)this.neededReplications.size(), (Object)this.pendingReplications.size());
        }
        return scheduledWork;
    }

    public DatanodeStorageInfo[] chooseTarget4NewBlock(String src, int numOfReplicas, Node client, Set<Node> excludedNodes, long blocksize, List<String> favoredNodes, byte storagePolicyID) throws IOException {
        BlockStoragePolicy storagePolicy;
        List<DatanodeDescriptor> favoredDatanodeDescriptors = this.getDatanodeDescriptors(favoredNodes);
        DatanodeStorageInfo[] targets = this.blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, favoredDatanodeDescriptors, storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID));
        if (targets.length < this.minReplication) {
            throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" + this.minReplication + ").  There are " + this.getDatanodeManager().getNetworkTopology().getNumOfLeaves() + " datanode(s) running and " + (excludedNodes == null ? "no" : Integer.valueOf(excludedNodes.size())) + " node(s) are excluded in this operation. " + (excludedNodes != null ? Arrays.toString(excludedNodes.toArray(new Node[excludedNodes.size()])) : "[]"));
        }
        return targets;
    }

    public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
        return this.blockplacement.chooseTarget(src, 1, clientnode, Collections.emptyList(), false, excludes, blocksize, this.storagePolicySuite.getDefaultPolicy());
    }

    public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src, int numAdditionalNodes, Node clientnode, List<DatanodeStorageInfo> chosen, Set<Node> excludes, long blocksize, byte storagePolicyID) {
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID);
        return this.blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, chosen, true, excludes, blocksize, storagePolicy);
    }

    public DatanodeStorageInfo[] chooseTarget4ParityRepair(String src, int numOfReplicas, Node clientnode, List<DatanodeStorageInfo> chosen, Set<Node> excludes, long blocksize, byte storagePolicyID) {
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID);
        return this.blockplacement.chooseTarget(src, numOfReplicas, clientnode, chosen, false, excludes, blocksize, storagePolicy);
    }

    List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
        ArrayList<DatanodeDescriptor> datanodeDescriptors = null;
        if (nodes != null) {
            datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
            for (int i = 0; i < nodes.size(); ++i) {
                DatanodeDescriptor node = this.datanodeManager.getDatanodeDescriptor(nodes.get(i));
                if (node == null) continue;
                datanodeDescriptors.add(node);
            }
        }
        return datanodeDescriptors;
    }

    @VisibleForTesting
    DatanodeDescriptor chooseSourceDatanode(BlockInfoContiguous block, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> nodesContainingLiveReplicas, NumberReplicas numReplicas, int priority) throws IOException {
        containingNodes.clear();
        nodesContainingLiveReplicas.clear();
        DatanodeDescriptor srcNode = null;
        int live = 0;
        int decommissioned = 0;
        int decommissioning = 0;
        int corrupt = 0;
        int excess = 0;
        Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(block);
        for (DatanodeStorageInfo storage : block.getStorages(this.datanodeManager)) {
            int countableReplica;
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            int n = countableReplica = storage.getState() == DatanodeStorage.State.NORMAL ? 1 : 0;
            if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
                corrupt += countableReplica;
            } else if (node.isDecommissionInProgress()) {
                decommissioning += countableReplica;
            } else if (node.isDecommissioned()) {
                decommissioned += countableReplica;
            } else if (this.excessReplicateMap.contains(storage, block)) {
                excess += countableReplica;
            } else {
                nodesContainingLiveReplicas.add(storage);
                live += countableReplica;
            }
            if (!containingNodes.contains(node)) {
                containingNodes.add(node);
            }
            if (nodesCorrupt != null && nodesCorrupt.contains(node) || priority != 0 && !node.isDecommissionInProgress() && node.getNumberOfBlocksToBeReplicated() >= this.maxReplicationStreams || node.getNumberOfBlocksToBeReplicated() >= this.replicationStreamsHardLimit || this.excessReplicateMap.contains(storage, block) || node.isDecommissioned()) continue;
            if (srcNode == null) {
                srcNode = node;
                continue;
            }
            if (!DFSUtil.getRandom().nextBoolean()) continue;
            srcNode = node;
        }
        if (numReplicas != null) {
            numReplicas.initialize(live, decommissioned, decommissioning, corrupt, excess, 0);
        }
        return srcNode;
    }

    @VisibleForTesting
    void processPendingReplications() throws IOException {
        long[] timedOutItems = this.pendingReplications.getTimedOutBlocks();
        if (timedOutItems != null) {
            for (long timedOutItem : timedOutItems) {
                this.processTimedOutPendingBlock(timedOutItem);
            }
        }
    }

    public List<Integer> checkHashes(DatanodeID nodeID, DatanodeStorage storage, BlockReport newReport) throws IOException {
        boolean firstBlockReport;
        DatanodeDescriptor node = this.datanodeManager.getDatanode(nodeID);
        if (node == null || !node.isAlive) {
            throw new IOException("ReportHashes from dead or unregistered node: " + nodeID);
        }
        DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
        if (storageInfo == null) {
            storageInfo = node.updateStorage(storage);
        }
        boolean bl = firstBlockReport = this.namesystem.isInStartupSafeMode() || storageInfo.getBlockReportCount() == 0;
        if (storageInfo.getBlockReportCount() == 0) {
            HashBuckets.getInstance().createBucketsForStorage(storageInfo);
        }
        HashMatchingResult matchingResult = this.calculateMismatchedHashes(storageInfo, newReport, firstBlockReport);
        blockLog.debug("BLOCK* checkHashes: Number of mismatches buckets for storage: " + storageInfo.getStorageID() + " are: " + matchingResult.mismatchedBuckets);
        return matchingResult.mismatchedBuckets;
    }

    public boolean processReport(DatanodeID nodeID, DatanodeStorage storage, BlockReport newReport, BlockReportContext context, boolean lastStorageInRpc) throws IOException {
        long startTime = Time.monotonicNow();
        DatanodeDescriptor node = this.datanodeManager.getDatanode(nodeID);
        if (node == null || !node.isAlive) {
            throw new IOException("ProcessReport from dead or unregistered node: " + nodeID);
        }
        DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
        if (storageInfo == null) {
            storageInfo = node.updateStorage(storage);
        }
        if (this.namesystem.isInStartupSafeMode() && storageInfo.getBlockReportCount() > 0) {
            blockLog.info("BLOCK* processReport: discarded non-initial block report from {} because namenode still in startup phase", (Object)nodeID);
            return !node.hasStaleStorages();
        }
        ReportStatistics reportStatistics = null;
        try {
            reportStatistics = this.processReport(storageInfo, newReport);
            if (context != null) {
                storageInfo.setLastBlockReportId(context.getReportId());
                if (lastStorageInRpc) {
                    int rpcsSeen = node.updateBlockReportContext(context);
                    if (rpcsSeen >= context.getTotalRpcs()) {
                        List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
                        if (zombies.isEmpty()) {
                            LOG.debug("processReport 0x{}: no zombie storages found.", (Object)Long.toHexString(context.getReportId()));
                        } else {
                            for (DatanodeStorageInfo zombie : zombies) {
                                this.removeZombieReplicas(context, zombie);
                            }
                        }
                        node.clearBlockReportContext();
                    } else {
                        LOG.debug("processReport 0x{}: {} more RPCs remaining in this report.", (Object)Long.toHexString(context.getReportId()), (Object)(context.getTotalRpcs() - rpcsSeen));
                    }
                }
            }
            long endTime = Time.monotonicNow();
            NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
            if (metrics != null) {
                metrics.addBlockReport((int)(endTime - startTime));
            }
            blockLog.info("BLOCK* processReport success: from " + nodeID + " storage: " + storage + ", blocks: " + newReport.getNumberOfBlocks() + ", hasStaleStorages: " + node.hasStaleStorages() + ", processing time: " + (endTime - startTime) + " ms. " + reportStatistics);
            return !node.hasStaleStorages();
        }
        catch (Throwable t) {
            long endTime = Time.monotonicNow();
            blockLog.error("BLOCK* processReport fail: from " + nodeID + " storage: " + storage + ", blocks: " + newReport.getNumberOfBlocks() + ", processing time: " + (endTime - startTime) + " ms. " + reportStatistics, t);
            throw t;
        }
    }

    private void removeZombieReplicas(BlockReportContext context, DatanodeStorageInfo zombie) throws IOException {
        LOG.warn("processReport 0x{}: removing zombie storage {}, which no longer exists on the DataNode.", (Object)Long.toHexString(context.getReportId()), (Object)zombie.getStorageID());
        int prevBlocks = zombie.numBlocks();
        this.removeBlocksAssociatedTo(zombie);
        assert (zombie.numBlocks() == 0);
        LOG.warn("processReport 0x{}: removed {} replicas from storage {}, which no longer exists on the DataNode.", new Object[]{Long.toHexString(context.getReportId()), prevBlocks, zombie.getStorageID()});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void rescanPostponedMisreplicatedBlocks() throws IOException {
        if (this.getPostponedMisreplicatedBlocksCount() == 0L) {
            return;
        }
        long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
        long startPostponedMisReplicatedBlocksCount = this.getPostponedMisreplicatedBlocksCount();
        try {
            int i = 0;
            long startIndex = 0L;
            long blocksPerRescan = this.datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
            long base = this.getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
            if (base > 0L && (startIndex = DFSUtil.getRandom().nextLong() % (base + 1L)) < 0L) {
                startIndex += base + 1L;
            }
            Iterator<Block> it = this.postponedMisreplicatedBlocks.iterator();
            int tmp = 0;
            while ((long)tmp < startIndex) {
                it.next();
                ++tmp;
            }
            HashSet toRemove = new HashSet();
            while (it.hasNext()) {
                Block b = it.next();
                if ((long)i >= blocksPerRescan) break;
                HopsTransactionalRequestHandler rescanPostponedMisreplicatedBlocksHandler = new HopsTransactionalRequestHandler(HDFSOperationType.RESCAN_MISREPLICATED_BLOCKS){
                    INodeIdentifier inodeIdentifier;

                    @Override
                    public void setUp() throws StorageException {
                        Block b = (Block)this.getParams()[0];
                        this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
                    }

                    public void acquireLock(TransactionLocks locks) throws IOException {
                        LockFactory lf = LockFactory.getInstance();
                        Block b = (Block)this.getParams()[0];
                        locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(b.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                    }

                    public Object performTask() throws IOException {
                        Block b = (Block)this.getParams()[0];
                        BlockInfoContiguous bi = BlockManager.this.blocksMap.getStoredBlock(b);
                        Set toRemoveSet = (Set)this.getParams()[1];
                        if (bi == null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: Postponed mis-replicated block " + b + " no longer found in block map.");
                            }
                            toRemoveSet.add(b);
                            BlockManager.this.postponedMisreplicatedBlocksCount.decrementAndGet();
                            return null;
                        }
                        MisReplicationResult res = BlockManager.this.processMisReplicatedBlock(bi);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: Re-scanned block " + b + ", result is " + (Object)((Object)res));
                        }
                        if (res != MisReplicationResult.POSTPONE) {
                            toRemoveSet.add(b);
                            BlockManager.this.postponedMisreplicatedBlocksCount.decrementAndGet();
                        }
                        return null;
                    }
                };
                rescanPostponedMisreplicatedBlocksHandler.setParams(new Object[]{b, toRemove});
                rescanPostponedMisreplicatedBlocksHandler.handle(this.namesystem);
                ++i;
            }
            this.postponedMisreplicatedBlocks.removeAll(toRemove);
        }
        finally {
            long endPostponedMisReplicatedBlocksCount = this.getPostponedMisreplicatedBlocksCount();
            LOG.info("Rescan of postponedMisreplicatedBlocks completed in " + (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) + " msecs. " + endPostponedMisReplicatedBlocksCount + " blocks are left. " + (startPostponedMisReplicatedBlocksCount - endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
        }
    }

    public void markBlockReplicasAsCorrupt(BlockInfoContiguous block, long oldGenerationStamp, long oldNumBytes, DatanodeStorageInfo[] newStorages) throws IOException {
        BlockToMarkCorrupt b = null;
        if (block.getGenerationStamp() != oldGenerationStamp) {
            b = new BlockToMarkCorrupt(block, oldGenerationStamp, "genstamp does not match " + oldGenerationStamp + " : " + block.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
        } else if (block.getNumBytes() != oldNumBytes) {
            b = new BlockToMarkCorrupt(block, "length does not match " + oldNumBytes + " : " + block.getNumBytes(), CorruptReplicasMap.Reason.SIZE_MISMATCH);
        } else {
            return;
        }
        for (DatanodeStorageInfo storage : this.getStorages(block)) {
            boolean isCorrupt = true;
            if (newStorages != null) {
                for (DatanodeStorageInfo newStorage : newStorages) {
                    if (newStorage == null || !storage.equals(newStorage)) continue;
                    isCorrupt = false;
                    break;
                }
            }
            if (!isCorrupt) continue;
            blockLog.info("BLOCK* markBlockReplicasAsCorrupt: mark block replica {} on {} as corrupt because the dn is not in the new committed storage list.", (Object)b, (Object)storage.getDatanodeDescriptor());
            this.markBlockAsCorrupt(b, storage, storage.getDatanodeDescriptor());
        }
    }

    @VisibleForTesting
    public ReportStatistics processReport(final DatanodeStorageInfo storage, BlockReport report) throws IOException {
        boolean firstBlockReport;
        ConcurrentHashMap mapToAdd = new ConcurrentHashMap();
        ConcurrentHashMap mapToRemove = new ConcurrentHashMap();
        ConcurrentHashMap mapToInvalidate = new ConcurrentHashMap();
        ConcurrentHashMap mapToCorrupt = new ConcurrentHashMap();
        ConcurrentHashMap mapToUC = new ConcurrentHashMap();
        Set<BlockInfoContiguous> toAdd = Collections.newSetFromMap(mapToAdd);
        Set<Long> toRemove = Collections.newSetFromMap(mapToRemove);
        Set<Block> toInvalidate = Collections.newSetFromMap(mapToInvalidate);
        Set<BlockToMarkCorrupt> toCorrupt = Collections.newSetFromMap(mapToCorrupt);
        Set<StatefulBlockInfo> toUC = Collections.newSetFromMap(mapToUC);
        boolean bl = firstBlockReport = this.namesystem.isInStartupSafeMode() || storage.getBlockReportCount() == 0;
        if (storage.getBlockReportCount() == 0) {
            HashBuckets.getInstance().createBucketsForStorage(storage);
        }
        ReportStatistics reportStatistics = this.reportDiff(storage, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC, firstBlockReport);
        for (StatefulBlockInfo b : toUC) {
            if (firstBlockReport) {
                this.addStoredBlockUnderConstructionImmediateTx(b.storedBlock, storage, b.reportedState);
                continue;
            }
            this.addStoredBlockUnderConstructionTx(b, storage);
        }
        ArrayList<Callable<Object>> addTasks = new ArrayList<Callable<Object>>();
        int numBlocksLogged = 0;
        HashMap<Long, ArrayList<BlockInfoContiguous>> blocksToAddPerInodeId = new HashMap<Long, ArrayList<BlockInfoContiguous>>();
        for (BlockInfoContiguous b : toAdd) {
            ArrayList<BlockInfoContiguous> blocksToAddList = (ArrayList<BlockInfoContiguous>)blocksToAddPerInodeId.get(b.getInodeId());
            if (blocksToAddList == null) {
                blocksToAddList = new ArrayList<BlockInfoContiguous>();
                blocksToAddPerInodeId.put(b.getInodeId(), blocksToAddList);
            }
            blocksToAddList.add(b);
        }
        final HashMap blocksToAdd = new HashMap();
        final HashMap blockIdsToAdd = new HashMap();
        final HashMap inodeIdsToAdd = new HashMap();
        int index = 0;
        for (List list : blocksToAddPerInodeId.values()) {
            ArrayList<BlockInfoContiguous> blocksToAddList = (ArrayList<BlockInfoContiguous>)blocksToAdd.get(index);
            ArrayList<Long> blockIdsToAddList = (ArrayList<Long>)blockIdsToAdd.get(index);
            ArrayList<Long> inodeIdsToAddList = (ArrayList<Long>)inodeIdsToAdd.get(index);
            if (blocksToAddList == null) {
                blocksToAddList = new ArrayList<BlockInfoContiguous>();
                blockIdsToAddList = new ArrayList<Long>();
                inodeIdsToAddList = new ArrayList<Long>();
                blocksToAdd.put(index, blocksToAddList);
                blockIdsToAdd.put(index, blockIdsToAddList);
                inodeIdsToAdd.put(index, inodeIdsToAddList);
            }
            for (BlockInfoContiguous b : list) {
                blocksToAddList.add(b);
                blockIdsToAddList.add(b.getBlockId());
                inodeIdsToAddList.add(b.getInodeId());
            }
            if (blocksToAddList.size() < this.slicerBatchSize) continue;
            ++index;
        }
        Iterator<Object> iterator = blocksToAdd.keySet().iterator();
        while (iterator.hasNext()) {
            boolean logIt;
            final int n = (Integer)iterator.next();
            if (firstBlockReport) {
                logIt = (long)numBlocksLogged < this.maxNumBlocksToLog;
                addTasks.add(new Callable<Object>(){

                    @Override
                    public Object call() throws Exception {
                        BlockManager.this.addStoredBlockImmediateTx((List)blocksToAdd.get(n), (List)blockIdsToAdd.get(n), (List)inodeIdsToAdd.get(n), storage, logIt);
                        return null;
                    }
                });
            } else {
                logIt = (long)numBlocksLogged < this.maxNumBlocksToLog;
                addTasks.add(new Callable<Object>(){

                    @Override
                    public Object call() throws Exception {
                        List l = (List)blocksToAdd.get(n);
                        List list = (List)blockIdsToAdd.get(n);
                        BlockManager.this.addStoredBlockTx((List)blocksToAdd.get(n), (List)blockIdsToAdd.get(n), (List)inodeIdsToAdd.get(n), storage, null, logIt);
                        return null;
                    }
                });
            }
            ++numBlocksLogged;
        }
        if ((long)numBlocksLogged > this.maxNumBlocksToLog) {
            blockLog.info("BLOCK* processReport: logged info for {} of {} reported.", (Object)this.maxNumBlocksToLog, (Object)numBlocksLogged);
        }
        try {
            List futures = ((FSNamesystem)this.namesystem).getFSOperationsExecutor().invokeAll(addTasks);
            for (Future maybeException : futures) {
                maybeException.get();
            }
        }
        catch (InterruptedException e) {
            LOG.error(e.getMessage(), (Throwable)e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            throw new IOException(e.getCause());
        }
        for (BlockToMarkCorrupt blockToMarkCorrupt : toCorrupt) {
            this.markBlockAsCorruptTx(blockToMarkCorrupt, storage);
        }
        for (Block block : toInvalidate) {
            blockLog.info("BLOCK* processReport: " + block + " on " + storage + " " + storage + " size " + block.getNumBytes() + " does not belong to any file");
        }
        this.addToInvalidates(toInvalidate, storage);
        this.removeBlocks(new ArrayList<Long>(toRemove), storage.getDatanodeDescriptor());
        return reportStatistics;
    }

    @VisibleForTesting
    public void removeBlocks(List<Long> allBlockIds, final DatanodeDescriptor node) throws IOException {
        final Map<Long, List<Long>> inodeIdsToBlockMap = INodeUtil.getINodeIdsForBlockIds(allBlockIds, this.slicerBatchSize, this.slicerNbThreads, ((FSNamesystem)this.namesystem).getFSOperationsExecutor());
        final ArrayList<Long> inodeIds = new ArrayList<Long>(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List ids = inodeIds.subList(startIndex, endIndex);
                    BlockManager.this.removeStoredBlocksTx((List<Long>)ids, (Map<Long, List<Long>>)inodeIdsToBlockMap, node);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    public void removeBlocks(Map<Long, Long> allBlocksAndInodesIds, final DatanodeDescriptor node) throws IOException {
        final HashMap<Long, ArrayList<Long>> inodeIdsToBlockMap = new HashMap<Long, ArrayList<Long>>();
        for (Map.Entry<Long, Long> entry : allBlocksAndInodesIds.entrySet()) {
            ArrayList<Long> list = (ArrayList<Long>)inodeIdsToBlockMap.get(entry.getValue());
            if (list == null) {
                list = new ArrayList<Long>();
                inodeIdsToBlockMap.put(entry.getValue(), list);
            }
            list.add(entry.getKey());
        }
        final ArrayList inodeIds = new ArrayList(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List ids = inodeIds.subList(startIndex, endIndex);
                    BlockManager.this.removeStoredBlocksTx((List<Long>)ids, (Map<Long, List<Long>>)inodeIdsToBlockMap, node);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    private void removeBlocks(Map<Long, Long> allBlocksAndInodesIds, final int sid) throws IOException {
        final HashMap<Long, ArrayList<Long>> inodeIdsToBlockMap = new HashMap<Long, ArrayList<Long>>();
        for (Map.Entry<Long, Long> entry : allBlocksAndInodesIds.entrySet()) {
            ArrayList<Long> list = (ArrayList<Long>)inodeIdsToBlockMap.get(entry.getValue());
            if (list == null) {
                list = new ArrayList<Long>();
                inodeIdsToBlockMap.put(entry.getValue(), list);
            }
            list.add(entry.getKey());
        }
        final ArrayList inodeIds = new ArrayList(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List ids = inodeIds.subList(startIndex, endIndex);
                    BlockManager.this.removeStoredBlocksTx((List<Long>)ids, (Map<Long, List<Long>>)inodeIdsToBlockMap, sid);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    Map<Long, Long> replicasInBucketsMT(final DatanodeStorageInfo storage, List<Integer> mismatchedBuckets) throws IOException {
        ConcurrentHashMap<Long, Long> mismatchedBlocksAndInodes = new ConcurrentHashMap<Long, Long>();
        ArrayList<11> subTasks = new ArrayList<11>();
        for (final Integer bucket : mismatchedBuckets) {
            Callable<Map<Long, Long>> subTask = new Callable<Map<Long, Long>>(){

                @Override
                public Map<Long, Long> call() throws IOException {
                    ArrayList<Integer> buckets = new ArrayList<Integer>();
                    buckets.add(bucket);
                    Map<Long, Long> mismatchedBlocksAndInodes = storage.getAllStorageReplicasInBuckets(buckets);
                    return mismatchedBlocksAndInodes;
                }
            };
            subTasks.add(subTask);
        }
        try {
            List futures = ((FSNamesystem)this.namesystem).getFSOperationsExecutor().invokeAll(subTasks);
            for (Future maybeException : futures) {
                mismatchedBlocksAndInodes.putAll((Map)maybeException.get());
            }
        }
        catch (InterruptedException e) {
            LOG.error("Exception was thrown during block report processing", (Throwable)e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            throw (IOException)e.getCause();
        }
        return mismatchedBlocksAndInodes;
    }

    private ReportStatistics reportDiff(DatanodeStorageInfo storage, BlockReport newReport, Collection<BlockInfoContiguous> toAdd, Collection<Long> toRemove, Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC, boolean firstBlockReport) throws IOException {
        if (newReport == null) {
            return null;
        }
        Map<Long, Long> invalidatedReplicas = storage.getAllStorageInvalidatedReplicasWithGenStamp();
        ReportStatistics stats = new ReportStatistics();
        stats.numBuckets = newReport.getBuckets().length;
        stats.numBlocks = newReport.getNumberOfBlocks();
        List<Integer> mismatchedBuckets = this.getReportedBucketList(newReport);
        stats.numBucketsMatching = newReport.getBuckets().length - mismatchedBuckets.size();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("%d/%d reported hashes matched", newReport.getBuckets().length - mismatchedBuckets.size(), newReport.getBuckets().length));
        }
        HashSet<Long> aggregatedSafeBlocks = new HashSet<Long>();
        Map<Long, Long> mismatchedBlocksAndInodes = this.replicasInBucketsMT(storage, mismatchedBuckets);
        aggregatedSafeBlocks.addAll(mismatchedBlocksAndInodes.keySet());
        this.processMisMatchingBuckets(storage, newReport, mismatchedBuckets, toAdd, toInvalidate, toCorrupt, toUC, firstBlockReport, mismatchedBlocksAndInodes, aggregatedSafeBlocks, invalidatedReplicas);
        stats.numToAdd = toAdd.size();
        stats.numToInvalidate = toInvalidate.size();
        stats.numToCorrupt = toCorrupt.size();
        stats.numToUC = toUC.size();
        toRemove.addAll(mismatchedBlocksAndInodes.keySet());
        stats.numToRemove = toRemove.size();
        if (this.namesystem.isInStartupSafeMode()) {
            aggregatedSafeBlocks.removeAll(toRemove);
            LOG.debug("AGGREGATED SAFE BLOCK #: " + aggregatedSafeBlocks.size() + " REPORTED BLOCK #: " + newReport.getNumberOfBlocks());
            this.namesystem.adjustSafeModeBlocks(aggregatedSafeBlocks);
            stats.numConsideredSafeIfInSafemode = aggregatedSafeBlocks.size();
        }
        return stats;
    }

    private void processMisMatchingBuckets(final DatanodeStorageInfo storage, BlockReport newReport, List<Integer> mismatchedBuckets, final Collection<BlockInfoContiguous> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC, final boolean firstBlockReport, final Map<Long, Long> mismatchedBlocksAndInodes, final Set<Long> aggregatedSafeBlocks, final Map<Long, Long> invalidatedReplicas) throws IOException {
        ArrayList<12> subTasks = new ArrayList<12>();
        for (final int bucketId : mismatchedBuckets) {
            Bucket bucket = newReport.getBuckets()[bucketId];
            final BlockListAsLongs bucketBlocks = bucket.getBlocks();
            Callable<Void> subTask = new Callable<Void>(){

                @Override
                public Void call() throws IOException {
                    HopsTransactionalRequestHandler processReportHandler = BlockManager.this.processBucketInternal(storage, bucketId, toAdd, toInvalidate, toCorrupt, toUC, firstBlockReport, mismatchedBlocksAndInodes, aggregatedSafeBlocks, invalidatedReplicas, bucketBlocks);
                    processReportHandler.handle();
                    return null;
                }
            };
            subTasks.add(subTask);
        }
        try {
            List futures = ((FSNamesystem)this.namesystem).getFSOperationsExecutor().invokeAll(subTasks);
            for (Future maybeException : futures) {
                maybeException.get();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Exception was thrown during block report processing", (Throwable)e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            throw (IOException)e.getCause();
        }
    }

    private HopsTransactionalRequestHandler processBucketInternal(final DatanodeStorageInfo storage, final int bucketId, final Collection<BlockInfoContiguous> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC, final boolean firstBlockReport, final Map<Long, Long> mismatchedBlocksAndInodes, final Set<Long> aggregatedSafeBlocks, final Map<Long, Long> invalidatedReplicas, final BlockListAsLongs reportedBlocks) {
        return new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_REPORT){

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                if (reportedBlocks.getNumberOfBlocks() != 0) {
                    ArrayList<Long> resolvedBlockIds = new ArrayList<Long>();
                    ArrayList<Long> inodeIds = new ArrayList<Long>();
                    ArrayList<Long> unResolvedBlockIds = new ArrayList<Long>();
                    for (BlockListAsLongs.BlockReportReplica reportedBlock : reportedBlocks) {
                        Long inodeId = (Long)mismatchedBlocksAndInodes.get(reportedBlock.getBlockId());
                        if (inodeId != null) {
                            resolvedBlockIds.add(reportedBlock.getBlockId());
                            inodeIds.add(inodeId);
                            continue;
                        }
                        unResolvedBlockIds.add(reportedBlock.getBlockId());
                    }
                    locks.add(lf.getBlockReportingLocks(Longs.toArray(resolvedBlockIds), Longs.toArray(inodeIds), Longs.toArray(unResolvedBlockIds), storage.getSid()));
                }
                locks.add(lf.getIndividualHashBucketLock(storage.getSid(), bucketId));
            }

            public Object performTask() throws IOException {
                byte[] hash = HashBuckets.initalizeHash();
                for (BlockListAsLongs.BlockReportReplica brb : reportedBlocks) {
                    Block block = new Block();
                    block.setNoPersistance(brb.getBlockId(), brb.getBytesOnDisk(), brb.getGenerationStamp());
                    BlockInfoContiguous storedBlock = BlockManager.this.processReportedBlock(storage, block, brb.getState(), toAdd, toInvalidate, toCorrupt, toUC, aggregatedSafeBlocks, firstBlockReport, mismatchedBlocksAndInodes.containsKey(brb.getBlockId()), invalidatedReplicas);
                    if (storedBlock == null) continue;
                    mismatchedBlocksAndInodes.remove(storedBlock.getBlockId());
                    if (brb.getState() != HdfsServerConstants.ReplicaState.FINALIZED) continue;
                    HashBuckets.XORHashes(hash, BlockReport.hashAsFinalized(storedBlock));
                }
                HashBucket bucket = HashBuckets.getInstance().getBucket(storage.getSid(), bucketId);
                bucket.setHash(hash);
                return null;
            }
        };
    }

    private List<Integer> getReportedBucketList(BlockReport report) throws IOException {
        ArrayList<Integer> missMatchingBuckets = new ArrayList<Integer>();
        for (int i = 0; i < report.getBuckets().length; ++i) {
            Bucket b = report.getBuckets()[i];
            if (b.isSkip()) continue;
            missMatchingBuckets.add(i);
        }
        return missMatchingBuckets;
    }

    private HashMatchingResult calculateMismatchedHashes(DatanodeStorageInfo storage, BlockReport report, Boolean firstBlockReport) throws IOException {
        List<HashBucket> storedHashes = HashBuckets.getInstance().getBucketsForStorage(storage);
        HashMap<Integer, HashBucket> storedHashesMap = new HashMap<Integer, HashBucket>();
        for (HashBucket allStorageHash : storedHashes) {
            storedHashesMap.put(allStorageHash.getBucketId(), allStorageHash);
        }
        ArrayList<Integer> matchedBuckets = new ArrayList<Integer>();
        ArrayList<Integer> mismatchedBuckets = new ArrayList<Integer>();
        for (int i = 0; i < report.getBuckets().length; ++i) {
            if (!storedHashesMap.containsKey(i)) {
                mismatchedBuckets.add(i);
                continue;
            }
            byte[] storedHash = ((HashBucket)storedHashesMap.get(i)).getHash();
            if (firstBlockReport.booleanValue()) {
                mismatchedBuckets.add(i);
                continue;
            }
            byte[] reportedHash = report.getBuckets()[i].getHash();
            if (HashBuckets.hashEquals(storedHash, reportedHash)) {
                matchedBuckets.add(i);
                continue;
            }
            mismatchedBuckets.add(i);
        }
        assert (matchedBuckets.size() + mismatchedBuckets.size() == report.getBuckets().length);
        return new HashMatchingResult(matchedBuckets, mismatchedBuckets);
    }

    private BlockInfoContiguous processIncrementallyReportedBlock(DatanodeStorageInfo storageInfo, Block block, HdfsServerConstants.ReplicaState reportedState, Collection<BlockInfoContiguous> toAdd, Collection<BlockInfoContiguous> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC) throws IOException {
        BlockInfoContiguous storedBlock;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reported block " + block + " on " + storageInfo.getStorageID() + " size " + block.getNumBytes() + " replicaState = " + (Object)((Object)reportedState));
        }
        if ((storedBlock = this.blocksMap.getStoredBlock(block)) == null) {
            blockLog.info("BLOCK* processReport: " + block + " on " + storageInfo + " size " + block.getNumBytes() + " does not belong to any file");
            storedBlock = new BlockInfoContiguous();
            storedBlock.setINodeIdNoPersistance(BlockInfoContiguous.NON_EXISTING_ID);
            storedBlock.setNoPersistance(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp());
            toInvalidate.add(storedBlock);
            return null;
        }
        HdfsServerConstants.BlockUCState ucState = storedBlock.getBlockUCState();
        if (LOG.isDebugEnabled()) {
            LOG.debug("In memory blockUCState = " + (Object)((Object)ucState) + " bid=" + storedBlock.getBlockIndex());
        }
        if (this.invalidateBlocks.contains(storageInfo, storedBlock)) {
            return storedBlock;
        }
        BlockToMarkCorrupt c = this.checkReplicaCorrupt(block, reportedState, storedBlock, ucState, storageInfo);
        if (c != null) {
            toCorrupt.add(c);
            return storedBlock;
        }
        if (this.isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
            toUC.add(new StatefulBlockInfo((BlockInfoContiguousUnderConstruction)storedBlock, block, reportedState));
            return storedBlock;
        }
        if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED && (!storedBlock.isReplicatedOnStorage(storageInfo) || this.corruptReplicas.isReplicaCorrupt(storedBlock, storageInfo.getDatanodeDescriptor()))) {
            toAdd.add(storedBlock);
        }
        return storedBlock;
    }

    private BlockInfoContiguous processReportedBlock(DatanodeStorageInfo storageInfo, Block block, HdfsServerConstants.ReplicaState reportedState, Collection<BlockInfoContiguous> toAdd, Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC, Set<Long> safeBlocks, boolean firstBlockReport, boolean replicaAlreadyExists, Map<Long, Long> allMachineInvalidatedBlocks) throws IOException {
        BlockInfoContiguous storedBlock;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reported block " + block + " on " + storageInfo.getStorageID() + " size " + block.getNumBytes() + " replicaState = " + (Object)((Object)reportedState));
        }
        if ((storedBlock = this.blocksMap.getStoredBlock(block)) == null) {
            blockLog.info("BLOCK* processReport: " + block + " on " + storageInfo.getStorageID() + " size " + block.getNumBytes() + " does not belong to any file");
            toInvalidate.add(new Block(block));
            safeBlocks.remove(block.getBlockId());
            return null;
        }
        HdfsServerConstants.BlockUCState ucState = storedBlock.getBlockUCState();
        if (LOG.isDebugEnabled()) {
            LOG.debug("In memory blockUCState = " + (Object)((Object)ucState) + " bid=" + storedBlock.getBlockIndex());
        }
        if (!firstBlockReport && allMachineInvalidatedBlocks.containsKey(block.getBlockId()) && allMachineInvalidatedBlocks.get(block.getBlockId()).longValue() == block.getGenerationStamp()) {
            return storedBlock;
        }
        BlockToMarkCorrupt c = this.checkReplicaCorrupt(block, reportedState, storedBlock, ucState, storageInfo);
        if (c != null) {
            toCorrupt.add(c);
            safeBlocks.remove(block.getBlockId());
            return storedBlock;
        }
        if (this.isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
            toUC.add(new StatefulBlockInfo((BlockInfoContiguousUnderConstruction)storedBlock, block, reportedState));
            safeBlocks.remove(block.getBlockId());
            return storedBlock;
        }
        if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED) {
            if (replicaAlreadyExists || storedBlock.isReplicatedOnStorage(storageInfo)) {
                return storedBlock;
            }
            toAdd.add(storedBlock);
            safeBlocks.remove(block.getBlockId());
        }
        return storedBlock;
    }

    private BlockToMarkCorrupt checkReplicaCorrupt(Block reported, HdfsServerConstants.ReplicaState reportedState, BlockInfoContiguous storedBlock, HdfsServerConstants.BlockUCState ucState, DatanodeStorageInfo storage) {
        switch (reportedState) {
            case FINALIZED: {
                switch (ucState) {
                    case COMPLETE: 
                    case COMMITTED: {
                        if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
                            long reportedGS = reported.getGenerationStamp();
                            return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + (Object)((Object)ucState) + " and reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
                        }
                        if (storedBlock.getNumBytes() != reported.getNumBytes()) {
                            return new BlockToMarkCorrupt(storedBlock, "block is " + (Object)((Object)ucState) + " and reported length " + reported.getNumBytes() + " does not match length in block map " + storedBlock.getNumBytes(), CorruptReplicasMap.Reason.SIZE_MISMATCH);
                        }
                        return null;
                    }
                    case UNDER_CONSTRUCTION: {
                        if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
                            long reportedGS = reported.getGenerationStamp();
                            return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + (Object)((Object)ucState) + " and reported state " + (Object)((Object)reportedState) + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
                        }
                        return null;
                    }
                }
                return null;
            }
            case RBW: 
            case RWR: {
                if (!storedBlock.isComplete()) {
                    return null;
                }
                if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
                    long reportedGS = reported.getGenerationStamp();
                    return new BlockToMarkCorrupt(storedBlock, reportedGS, "reported " + (Object)((Object)reportedState) + " replica with genstamp " + reportedGS + " does not match COMPLETE block's genstamp in block map " + storedBlock.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
                }
                if (reportedState == HdfsServerConstants.ReplicaState.RBW) {
                    LOG.info("Received an RBW replica for " + storedBlock + " on " + storage + ": ignoring it, since it is complete with the same genstamp");
                    return null;
                }
                return new BlockToMarkCorrupt(storedBlock, "reported replica has invalid state " + (Object)((Object)reportedState), CorruptReplicasMap.Reason.INVALID_STATE);
            }
        }
        String msg = "Unexpected replica state " + (Object)((Object)reportedState) + " for block: " + storedBlock + " on " + storage + " size " + storedBlock.getNumBytes();
        LOG.warn(msg);
        return new BlockToMarkCorrupt(storedBlock, msg, CorruptReplicasMap.Reason.INVALID_STATE);
    }

    private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock, HdfsServerConstants.BlockUCState ucState, HdfsServerConstants.ReplicaState reportedState) {
        switch (reportedState) {
            case FINALIZED: {
                switch (ucState) {
                    case UNDER_CONSTRUCTION: 
                    case UNDER_RECOVERY: {
                        return true;
                    }
                }
                return false;
            }
            case RBW: 
            case RWR: {
                return !storedBlock.isComplete();
            }
        }
        return false;
    }

    void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, DatanodeStorageInfo storageInfo) throws IOException {
        BlockInfoContiguousUnderConstruction block = (BlockInfoContiguousUnderConstruction)this.blocksMap.getStoredBlock(ucBlock.storedBlock);
        if (block == null) {
            LOG.debug("block " + ucBlock.storedBlock.getBlockId() + " does not exist anymore");
            return;
        }
        block.addReplicaIfNotPresent(storageInfo, ucBlock.reportedState, ucBlock.reportedBlock.getGenerationStamp());
        if (ucBlock.reportedState == HdfsServerConstants.ReplicaState.FINALIZED && !block.isReplicatedOnStorage(storageInfo)) {
            this.addStoredBlock(block, storageInfo, null, true);
        }
    }

    private void addStoredBlockImmediate(BlockInfoContiguous storedBlock, DatanodeStorageInfo storage, boolean logEveryBlock) throws IOException {
        assert (storedBlock != null);
        if (!this.namesystem.isInStartupSafeMode() || this.namesystem.isPopulatingReplQueues()) {
            this.addStoredBlock(storedBlock, storage, null, logEveryBlock);
            return;
        }
        storage.addBlock(storedBlock);
        int numCurrentReplica = this.countLiveNodes(storedBlock);
        if (storedBlock.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED && numCurrentReplica >= this.minReplication) {
            this.completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
        } else if (storedBlock.isComplete()) {
            this.namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
        }
    }

    private Block addStoredBlock(BlockInfoContiguous block, DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException {
        INode iNode;
        int curReplicaDelta;
        DatanodeStorageInfo.AddBlockResult result;
        assert (block != null);
        BlockInfoContiguous storedBlock = this.blocksMap.getStoredBlock(block);
        if (storedBlock == null || storedBlock.isDeleted()) {
            blockLog.info("BLOCK* addStoredBlock: {} on {} size {} but it does not belong to any file", new Object[]{block, storageInfo.getStorageID(), block.getNumBytes()});
            return block;
        }
        BlockCollection bc = storedBlock.getBlockCollection();
        assert (bc != null) : "Block must belong to a file";
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        NumberReplicas numBeforeAdding = null;
        if (fsNamesystem.isErasureCodingEnabled()) {
            numBeforeAdding = this.countNodes(block);
        }
        if ((result = storageInfo.addBlock(storedBlock)) == DatanodeStorageInfo.AddBlockResult.ADDED) {
            curReplicaDelta = 1;
            if (logEveryBlock) {
                this.logAddStoredBlock(storedBlock, storageInfo);
            }
        } else if (result == DatanodeStorageInfo.AddBlockResult.REPLACED) {
            curReplicaDelta = 0;
            blockLog.warn("BLOCK* addStoredBlock: block {} moved to storageType {} on storage {}", new Object[]{storedBlock, storageInfo.getStorageType(), storageInfo.getStorageID()});
        } else {
            this.corruptReplicas.removeFromCorruptReplicasMap(block, storageInfo.getDatanodeDescriptor(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
            curReplicaDelta = 0;
            blockLog.warn("BLOCK* addStoredBlock: Redundant addStoredBlock request received for {} on node {} size {}", new Object[]{storedBlock, storageInfo.getStorageID(), storedBlock.getNumBytes()});
        }
        NumberReplicas num = this.countNodes(storedBlock);
        int numLiveReplicas = num.liveReplicas();
        int numCurrentReplica = numLiveReplicas + this.pendingReplications.getNumReplicas(storedBlock);
        if (storedBlock.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED && numLiveReplicas >= this.minReplication) {
            storedBlock = this.completeBlock(bc, storedBlock, false);
        } else if (storedBlock.isComplete() && result == DatanodeStorageInfo.AddBlockResult.ADDED) {
            this.namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
        }
        if (bc.isUnderConstruction()) {
            return storedBlock;
        }
        if (!this.namesystem.isPopulatingReplQueues()) {
            return storedBlock;
        }
        short fileReplication = bc.getBlockReplication();
        if (!this.isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
            this.neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedAndDecommissioning(), fileReplication);
        } else {
            this.updateNeededReplications(storedBlock, curReplicaDelta, 0);
        }
        if (numCurrentReplica > fileReplication) {
            this.processOverReplicatedBlock(storedBlock, fileReplication, storageInfo.getDatanodeDescriptor(), delNodeHint);
        }
        int corruptReplicasCount = this.corruptReplicas.numCorruptReplicas(storedBlock);
        int numCorruptNodes = num.corruptReplicas();
        if (numCorruptNodes != corruptReplicasCount) {
            LOG.warn("Inconsistent number of corrupt replicas for " + storedBlock + "blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount);
        }
        if (corruptReplicasCount > 0 && numLiveReplicas >= fileReplication) {
            this.invalidateCorruptReplicas(storedBlock);
        }
        if (fsNamesystem.isErasureCodingEnabled() && !(iNode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{bc.getId()})).isUnderConstruction() && numBeforeAdding.liveReplicas() == 0 && numLiveReplicas > 0) {
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{bc.getId()});
            if (status != null && status.isCorrupt()) {
                int lostBlockCount = status.getLostBlocks() - 1;
                status.setLostBlocks(Integer.valueOf(lostBlockCount));
                if (lostBlockCount == 0) {
                    status.setStatus(EncodingStatus.Status.ENCODED);
                    status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                }
                EntityManager.update((Object)status);
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{bc.getId()});
                if (status == null) {
                    LOG.info("addStoredBlock returned null for " + bc.getId());
                } else {
                    LOG.info("addStoredBlock found " + bc.getId() + " with status " + status);
                }
                if (status != null && status.isParityCorrupt()) {
                    int lostParityBlockCount = status.getLostParityBlocks() - 1;
                    status.setLostParityBlocks(Integer.valueOf(lostParityBlockCount));
                    if (lostParityBlockCount == 0) {
                        status.setParityStatus(EncodingStatus.ParityStatus.HEALTHY);
                        status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    EntityManager.update((Object)status);
                    LOG.info("addStoredBlock found set status to potentially fixed");
                }
            }
        }
        return storedBlock;
    }

    private void logAddStoredBlock(BlockInfoContiguous storedBlock, DatanodeStorageInfo storage) {
        if (!blockLog.isInfoEnabled()) {
            return;
        }
        StringBuilder sb = new StringBuilder(500);
        sb.append("BLOCK* addStoredBlock: blockMap updated: ").append(storage).append(" is added to ").append(storedBlock).append(" size ").append(storedBlock.getNumBytes()).append(" byte");
        blockLog.info(sb.toString());
    }

    private void invalidateCorruptReplicas(BlockInfoContiguous blk) throws StorageException, TransactionContextException {
        DatanodeDescriptor[] nodesCopy;
        Collection<DatanodeDescriptor> nodes = this.corruptReplicas.getNodes(blk);
        boolean removedFromBlocksMap = true;
        if (nodes == null) {
            return;
        }
        for (DatanodeDescriptor node : nodesCopy = nodes.toArray(new DatanodeDescriptor[0])) {
            try {
                removedFromBlocksMap = this.invalidateBlock(new BlockToMarkCorrupt(blk, null, CorruptReplicasMap.Reason.ANY), node);
            }
            catch (IOException e) {
                blockLog.info("invalidateCorruptReplicas error in deleting bad block {} on {}", new Object[]{blk, node, e});
                removedFromBlocksMap = false;
            }
        }
        if (removedFromBlocksMap) {
            this.corruptReplicas.removeFromCorruptReplicasMap(blk);
        }
    }

    public synchronized void processMisReplicatedBlocks() throws IOException {
        this.stopReplicationInitializer();
        if (this.namesystem.isLeader()) {
            HdfsVariables.resetMisReplicatedIndex();
            this.neededReplications.clear();
            this.excessReplicateMap.clear();
        }
        this.replicationQueuesInitializer = new Daemon(){

            public void run() {
                try {
                    BlockManager.this.processMisReplicatesAsync();
                }
                catch (InterruptedException ie) {
                    LOG.info("Interrupted while processing replication queues.");
                }
                catch (Exception e) {
                    LOG.error("Error while processing replication queues async", (Throwable)e);
                }
            }
        };
        this.replicationQueuesInitializer.setName("Replication Queue Initializer");
        this.replicationQueuesInitializer.start();
    }

    private void stopReplicationInitializer() {
        if (this.replicationQueuesInitializer != null) {
            this.replicationQueuesInitializer.interrupt();
            try {
                this.replicationQueuesInitializer.join();
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
                return;
            }
            finally {
                this.replicationQueuesInitializer = null;
            }
        }
    }

    private List<MisReplicatedRange> checkMisReplicatedRangeQueue() throws IOException {
        LightWeightRequestHandler cleanMisReplicatedRangeQueueHandler = new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                List ranges = da.getAll();
                HashSet<Long> activeNodes = new HashSet<Long>();
                for (ActiveNode nn : BlockManager.this.namesystem.getNameNode().getLeaderElectionInstance().getActiveNamenodes().getActiveNodes()) {
                    activeNodes.add(nn.getId());
                }
                ArrayList<MisReplicatedRange> toRemove = new ArrayList<MisReplicatedRange>();
                for (MisReplicatedRange range : ranges) {
                    if (activeNodes.contains(range.getNnId())) continue;
                    toRemove.add(range);
                }
                da.remove(toRemove);
                return toRemove;
            }
        };
        ArrayList<MisReplicatedRange> toProcess = new ArrayList<MisReplicatedRange>();
        while (this.namesystem.isLeader() && this.sizeOfMisReplicatedRangeQueue() > 0) {
            toProcess.addAll((List)cleanMisReplicatedRangeQueueHandler.handle());
        }
        return toProcess;
    }

    private void processMisReplicatesAsync() throws InterruptedException, IOException {
        AtomicLong nrInvalid = new AtomicLong(0L);
        AtomicLong nrOverReplicated = new AtomicLong(0L);
        AtomicLong nrUnderReplicated = new AtomicLong(0L);
        AtomicLong nrPostponed = new AtomicLong(0L);
        AtomicLong nrUnderConstruction = new AtomicLong(0L);
        long startTimeMisReplicatedScan = Time.monotonicNow();
        long totalBlocks = this.blocksMap.size();
        this.replicationQueuesInitProgress = 0.0;
        AtomicLong totalProcessed = new AtomicLong(0L);
        int filesToProcess = this.slicerBatchSize * this.processMisReplicatedNoOfBatchs;
        this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
        long maxInodeId = 0L;
        if (LOG.isInfoEnabled()) {
            maxInodeId = this.blocksMap.getMaxInodeId();
        }
        while (this.namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
            boolean haveMore;
            long filesToProcessEndIndex;
            long filesToProcessStartIndex;
            do {
                filesToProcessEndIndex = HdfsVariables.incrementMisReplicatedIndex(filesToProcess);
                filesToProcessStartIndex = filesToProcessEndIndex - (long)filesToProcess;
                haveMore = this.blocksMap.haveFilesWithIdGreaterThan(filesToProcessEndIndex);
            } while (!this.blocksMap.haveFilesWithIdBetween(filesToProcessStartIndex, filesToProcessEndIndex) && haveMore);
            this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), filesToProcessStartIndex));
            this.processMissreplicatedInt(filesToProcessStartIndex, filesToProcessEndIndex, filesToProcess, nrInvalid, nrOverReplicated, nrUnderReplicated, nrPostponed, nrUnderConstruction, totalProcessed, maxInodeId);
            this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
            this.replicationQueuesInitProgress = Math.min((double)totalProcessed.get() / (double)totalBlocks, 1.0);
            if (haveMore) continue;
            this.removeFromMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
            if (this.namesystem.isLeader()) {
                List<MisReplicatedRange> toProcess = this.checkMisReplicatedRangeQueue();
                for (MisReplicatedRange range : toProcess) {
                    long startIndex = range.getStartIndex();
                    if (startIndex <= 0L) continue;
                    this.processMissreplicatedInt(startIndex, startIndex + (long)filesToProcess, filesToProcess, nrInvalid, nrOverReplicated, nrUnderReplicated, nrPostponed, nrUnderConstruction, totalProcessed, maxInodeId);
                }
            }
            LOG.info("Total number of blocks            = " + this.blocksMap.size());
            LOG.info("Number of invalid blocks          = " + nrInvalid.get());
            LOG.info("Number of under-replicated blocks = " + nrUnderReplicated.get());
            LOG.info("Number of  over-replicated blocks = " + nrOverReplicated.get() + (nrPostponed.get() > 0L ? " (" + nrPostponed.get() + " postponed)" : ""));
            LOG.info("Number of blocks being written    = " + nrUnderConstruction.get());
            NameNode.stateChangeLog.info("STATE* Replication Queue initialization scan for invalid, over- and under-replicated blocks completed in " + (Time.monotonicNow() - startTimeMisReplicatedScan) + " msec");
            break;
        }
        if (Thread.currentThread().isInterrupted()) {
            LOG.info("Interrupted while processing replication queues.");
        }
    }

    private void processMissreplicatedInt(long filesToProcessStartIndex, long filesToProcessEndIndex, int filesToProcess, final AtomicLong nrInvalid, final AtomicLong nrOverReplicated, final AtomicLong nrUnderReplicated, final AtomicLong nrPostponed, final AtomicLong nrUnderConstruction, final AtomicLong totalProcessed, long maxInodeId) throws IOException {
        final List<INodeIdentifier> allINodes = this.blocksMap.getAllINodeFiles(filesToProcessStartIndex, filesToProcessEndIndex);
        LOG.info("processMisReplicated read  " + allINodes.size() + "/" + filesToProcess + " in the Ids range [" + filesToProcessStartIndex + " - " + filesToProcessEndIndex + "] (max inodeId when the process started: " + maxInodeId + ")");
        try {
            Slicer.slice((int)allINodes.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    final List inodeIdentifiers = allINodes.subList(startIndex, endIndex);
                    HopsTransactionalRequestHandler processMisReplicatedBlocksHandler = new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_MIS_REPLICATED_BLOCKS_PER_INODE_BATCH){

                        public void acquireLock(TransactionLocks locks) throws IOException {
                            LockFactory lf = LockFactory.getInstance();
                            locks.add(lf.getMultipleINodesLock(inodeIdentifiers, TransactionLockTypes.INodeLockType.WRITE)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                        }

                        public Object performTask() throws IOException {
                            for (INodeIdentifier inodeIdentifier : inodeIdentifiers) {
                                INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{inodeIdentifier.getInodeId()});
                                if (inode == null) {
                                    LOG.info("Process misreplicated blocks File with ID: " + inodeIdentifier.getInodeId() + " not found. File is overritten or deleted");
                                    continue;
                                }
                                if (inode instanceof INodeSymlink) continue;
                                for (BlockInfoContiguous block : ((INodeFile)inode).getBlocks()) {
                                    MisReplicationResult res = BlockManager.this.processMisReplicatedBlock(block);
                                    if (LOG.isTraceEnabled()) {
                                        LOG.trace("block " + block + ": " + (Object)((Object)res));
                                    }
                                    switch (res) {
                                        case UNDER_REPLICATED: {
                                            nrUnderReplicated.incrementAndGet();
                                            break;
                                        }
                                        case OVER_REPLICATED: {
                                            nrOverReplicated.incrementAndGet();
                                            break;
                                        }
                                        case INVALID: {
                                            nrInvalid.incrementAndGet();
                                            break;
                                        }
                                        case POSTPONE: {
                                            nrPostponed.incrementAndGet();
                                            BlockManager.this.postponeBlock(block);
                                            break;
                                        }
                                        case UNDER_CONSTRUCTION: {
                                            nrUnderConstruction.incrementAndGet();
                                            break;
                                        }
                                        case OK: {
                                            break;
                                        }
                                        default: {
                                            throw new AssertionError((Object)("Invalid enum value: " + (Object)((Object)res)));
                                        }
                                    }
                                    totalProcessed.incrementAndGet();
                                }
                            }
                            return null;
                        }
                    };
                    processMisReplicatedBlocksHandler.handle(BlockManager.this.namesystem);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    public double getReplicationQueuesInitProgress() {
        return this.replicationQueuesInitProgress;
    }

    private void addToMisReplicatedRangeQueue(final MisReplicatedRange range) throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                da.insert(range);
                return null;
            }
        }.handle();
    }

    private void removeFromMisReplicatedRangeQueue(final MisReplicatedRange range) throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                da.remove(range);
                return null;
            }
        }.handle();
    }

    private int sizeOfMisReplicatedRangeQueue() throws IOException {
        return (Integer)new LightWeightRequestHandler(HDFSOperationType.COUNT_ALL_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                return da.countAll();
            }
        }.handle();
    }

    private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) throws IOException {
        NumberReplicas num;
        int numCurrentReplica;
        if (block.isDeleted()) {
            this.addToInvalidates(block);
            return MisReplicationResult.INVALID;
        }
        if (!block.isComplete()) {
            return MisReplicationResult.UNDER_CONSTRUCTION;
        }
        short expectedReplication = block.getBlockCollection().getBlockReplication();
        if (this.isNeededReplication(block, expectedReplication, numCurrentReplica = (num = this.countNodes(block)).liveReplicas()) && this.neededReplications.add(block, numCurrentReplica, num.decommissionedAndDecommissioning(), expectedReplication)) {
            return MisReplicationResult.UNDER_REPLICATED;
        }
        if (numCurrentReplica > expectedReplication) {
            if (num.replicasOnStaleNodes() > 0) {
                return MisReplicationResult.POSTPONE;
            }
            this.processOverReplicatedBlock(block, expectedReplication, null, null);
            return MisReplicationResult.OVER_REPLICATED;
        }
        return MisReplicationResult.OK;
    }

    public void setReplication(short oldRepl, short newRepl, String src, BlockInfoContiguous ... blocks) throws IOException {
        if (newRepl == oldRepl) {
            return;
        }
        for (BlockInfoContiguous b : blocks) {
            this.updateNeededReplications(b, 0, newRepl - oldRepl);
        }
        if (oldRepl > newRepl) {
            LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl + " for " + src);
            for (BlockInfoContiguous b : blocks) {
                this.processOverReplicatedBlock(b, newRepl, null, null);
            }
        } else {
            LOG.info("Increasing replication from " + oldRepl + " to " + newRepl + " for " + src);
        }
    }

    private void processOverReplicatedBlock(BlockInfoContiguous block, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) throws IOException {
        if (addedNode == delNodeHint) {
            delNodeHint = null;
        }
        ArrayList<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
        Collection<DatanodeDescriptor> corruptNodes = this.corruptReplicas.getNodes(block);
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(block, DatanodeStorage.State.NORMAL)) {
            DatanodeDescriptor cur = storage.getDatanodeDescriptor();
            if (storage.areBlockContentsStale()) {
                LOG.info("BLOCK* processOverReplicatedBlock: Postponing processing of over-replicated " + block + " since storage " + storage + " does not yet have up-to-date block information.");
                this.postponeBlock(block);
                return;
            }
            if (this.excessReplicateMap.contains(storage, block) || cur.isDecommissionInProgress() || cur.isDecommissioned() || corruptNodes != null && corruptNodes.contains(cur)) continue;
            nonExcess.add(storage);
        }
        this.chooseExcessReplicates(nonExcess, block, replication, addedNode, delNodeHint, this.blockplacement);
    }

    private void chooseExcessReplicates(Collection<DatanodeStorageInfo> nonExcess, BlockInfoContiguous b, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint, BlockPlacementPolicy replicator) throws StorageException, TransactionContextException, IOException {
        BlockCollection bc = this.getBlockCollection(b);
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(bc.getStoragePolicyID());
        List<StorageType> excessTypes = storagePolicy.chooseExcess(replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
        HashMap<String, List<DatanodeStorageInfo>> rackMap = new HashMap<String, List<DatanodeStorageInfo>>();
        ArrayList<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
        ArrayList<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
        replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
        boolean firstOne = true;
        DatanodeStorageInfo delNodeHintStorage = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
        DatanodeStorageInfo addedNodeStorage = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
        while (nonExcess.size() - replication > 0) {
            DatanodeStorageInfo cur = BlockManager.useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, moreThanOne, excessTypes) ? delNodeHintStorage : replicator.chooseReplicaToDelete(bc, b, replication, moreThanOne, exactlyOne, excessTypes);
            firstOne = false;
            replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
            nonExcess.remove(cur);
            this.addToExcessReplicate(cur, b);
            this.addToInvalidates(b, cur.getDatanodeDescriptor());
            blockLog.info("BLOCK* chooseExcessReplicates: ({}, {}) is added to invalidated blocks set", (Object)cur, (Object)b);
        }
    }

    static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, List<StorageType> excessTypes) {
        if (!isFirst) {
            return false;
        }
        if (delHint == null) {
            return false;
        }
        if (!excessTypes.contains(delHint.getStorageType())) {
            return false;
        }
        if (moreThan1Racks.contains(delHint)) {
            return true;
        }
        return added != null && !moreThan1Racks.contains(added);
    }

    private void addToExcessReplicate(DatanodeStorageInfo storage, BlockInfoContiguous block) throws StorageException, TransactionContextException {
        if (this.excessReplicateMap.put(storage.getSid(), block) && blockLog.isDebugEnabled()) {
            blockLog.debug("BLOCK* addToExcessReplicate: (" + storage + ", " + block + ") is added to excessReplicateMap");
        }
    }

    public void removeStoredBlock(Block b, DatanodeDescriptor node) throws IOException {
        if (blockLog.isDebugEnabled()) {
            blockLog.debug("BLOCK* removeStoredBlock: {} from {}", (Object)b, (Object)node);
        }
        if (!this.blocksMap.removeNode(b, node)) {
            if (blockLog.isDebugEnabled()) {
                blockLog.debug("BLOCK* removeStoredBlock: {} has already been removed from node {}", (Object)b, (Object)node);
            }
            return;
        }
        BlockInfoContiguous block = this.getStoredBlock(b);
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        if (bc != null) {
            this.namesystem.decrementSafeBlockCount(block);
            this.updateNeededReplications(block, -1, 0);
        }
        if (this.excessReplicateMap.remove(node, block) && blockLog.isDebugEnabled()) {
            blockLog.debug("BLOCK* removeStoredBlock: " + block + " is removed from excessBlocks");
        }
        this.corruptReplicas.removeFromCorruptReplicasMap(block, node);
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (fsNamesystem.isErasureCodingEnabled()) {
            BlockInfoContiguous blockInfo = this.getStoredBlock(block);
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
            if (status != null) {
                NumberReplicas numberReplicas = this.countNodes(block);
                if (numberReplicas.liveReplicas() == 0) {
                    if (!status.isCorrupt()) {
                        status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                        status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                    EntityManager.update((Object)status);
                }
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
                if (status == null) {
                    LOG.info("removeStoredBlock returned null for " + blockInfo.getInodeId());
                } else {
                    LOG.info("removeStoredBlock found " + blockInfo.getInodeId() + " with status " + status);
                }
                if (status != null) {
                    NumberReplicas numberReplicas = this.countNodes(block);
                    if (numberReplicas.liveReplicas() == 0) {
                        if (!status.isParityCorrupt()) {
                            status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                            status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                        }
                        status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                        EntityManager.update((Object)status);
                        LOG.info("removeStoredBlock updated parity status to repair requested");
                    } else {
                        LOG.info("removeStoredBlock found replicas: " + numberReplicas.liveReplicas());
                    }
                }
            }
        }
    }

    public void removeStoredBlock(BlockInfoContiguous block, int sid) throws IOException {
        if (blockLog.isDebugEnabled()) {
            blockLog.debug("BLOCK* removeStoredBlock: " + block + " from " + sid);
        }
        if (!this.blocksMap.removeNode((Block)block, sid)) {
            if (blockLog.isDebugEnabled()) {
                blockLog.debug("BLOCK* removeStoredBlock: " + block + " has already been removed from node " + sid);
            }
            return;
        }
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        if (bc != null) {
            this.namesystem.decrementSafeBlockCount(block);
            this.updateNeededReplications(block, -1, 0);
        }
        this.corruptReplicas.forceRemoveFromCorruptReplicasMap(block, sid);
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (fsNamesystem.isErasureCodingEnabled()) {
            BlockInfoContiguous blockInfo = this.getStoredBlock(block);
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
            if (status != null) {
                NumberReplicas numberReplicas = this.countNodes(block);
                if (numberReplicas.liveReplicas() == 0) {
                    if (!status.isCorrupt()) {
                        status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                        status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                    EntityManager.update((Object)status);
                }
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
                if (status == null) {
                    LOG.info("removeStoredBlock returned null for " + blockInfo.getInodeId());
                } else {
                    LOG.info("removeStoredBlock found " + blockInfo.getInodeId() + " with status " + status);
                }
                if (status != null) {
                    NumberReplicas numberReplicas = this.countNodes(block);
                    if (numberReplicas.liveReplicas() == 0) {
                        if (!status.isParityCorrupt()) {
                            status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                            status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                        }
                        status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                        EntityManager.update((Object)status);
                        LOG.info("removeStoredBlock updated parity status to repair requested");
                    } else {
                        LOG.info("removeStoredBlock found replicas: " + numberReplicas.liveReplicas());
                    }
                }
            }
        }
    }

    private long addBlocks(List<Block> blocks, List<BlocksWithLocations.BlockWithLocations> results) throws IOException {
        ArrayList<Long> blockIds = new ArrayList<Long>(blocks.size());
        HashMap<Long, Block> blockIdsToBlocks = new HashMap<Long, Block>();
        for (Block block : blocks) {
            blockIds.add(block.getBlockId());
            blockIdsToBlocks.put(block.getBlockId(), block);
        }
        final Map<Long, List<Long>> inodeIdsToBlockMap = INodeUtil.getINodeIdsForBlockIds(blockIds, this.slicerBatchSize, this.slicerNbThreads, ((FSNamesystem)this.namesystem).getFSOperationsExecutor());
        final ArrayList<Long> allInodeIds = new ArrayList<Long>(inodeIdsToBlockMap.keySet());
        final ConcurrentHashMap locationsMap = new ConcurrentHashMap();
        try {
            Slicer.slice((int)allInodeIds.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    final List inodeIds = allInodeIds.subList(startIndex, endIndex);
                    new HopsTransactionalRequestHandler(HDFSOperationType.GET_VALID_BLK_LOCS){
                        List<INodeIdentifier> inodeIdentifiers;

                        @Override
                        public void setUp() throws StorageException, IOException {
                            this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
                        }

                        public void acquireLock(TransactionLocks locks) throws IOException {
                            LockFactory lf = LockFactory.getInstance();
                            locks.add(lf.getMultipleINodesLock(this.inodeIdentifiers, TransactionLockTypes.INodeLockType.READ)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV));
                        }

                        public Object performTask() throws IOException {
                            for (INodeIdentifier identifier : this.inodeIdentifiers) {
                                INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{identifier.getInodeId()});
                                if (inode == null) {
                                    LOG.debug("inode " + identifier.getInodeId() + " does not exist anymore");
                                    continue;
                                }
                                Iterator iterator = ((List)inodeIdsToBlockMap.get(inode.getId())).iterator();
                                while (iterator.hasNext()) {
                                    long blockId = (Long)iterator.next();
                                    BlockInfoContiguous block = (BlockInfoContiguous)EntityManager.find((FinderType)BlockInfoContiguous.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                                    if (block == null) {
                                        LOG.debug("block " + blockId + "does not exist anymore");
                                        continue;
                                    }
                                    List ms = BlockManager.this.getValidLocations(block);
                                    if (ms.isEmpty()) continue;
                                    locationsMap.put(block, ms);
                                }
                            }
                            return null;
                        }
                    }.handle(BlockManager.this.namesystem);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        if (locationsMap.isEmpty()) {
            return 0L;
        }
        long numBytes = 0L;
        for (Block block : locationsMap.keySet()) {
            List locations = (List)locationsMap.get(block);
            String[] datanodeUuids = new String[locations.size()];
            String[] storageIDs = new String[datanodeUuids.length];
            StorageType[] storageTypes = new StorageType[datanodeUuids.length];
            for (int i = 0; i < locations.size(); ++i) {
                DatanodeStorageInfo s = (DatanodeStorageInfo)locations.get(i);
                datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
                storageIDs[i] = s.getStorageID();
                storageTypes[i] = s.getStorageType();
            }
            results.add(new BlocksWithLocations.BlockWithLocations(block, datanodeUuids, storageIDs, storageTypes));
            numBytes += block.getNumBytes();
        }
        return numBytes;
    }

    @VisibleForTesting
    void addBlock(DatanodeStorageInfo storage, Block block, String delHint) throws IOException {
        BlockInfoContiguous storedBlock;
        DatanodeDescriptor node = storage.getDatanodeDescriptor();
        node.decrementBlocksScheduled(storage.getStorageType());
        DatanodeDescriptor delHintNode = null;
        if (delHint != null && delHint.length() != 0 && (delHintNode = this.datanodeManager.getDatanodeByUuid(delHint)) == null) {
            blockLog.warn("BLOCK* blockReceived: {} is expected to be removed from an unrecorded node {}", (Object)block, (Object)delHint);
        }
        if ((storedBlock = this.getStoredBlock(block)) == null) {
            storedBlock = new BlockInfoContiguous();
            storedBlock.setINodeIdNoPersistance(BlockInfoContiguous.NON_EXISTING_ID);
            storedBlock.setNoPersistance(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp());
        }
        this.pendingReplications.decrement(storedBlock, node);
        this.processAndHandleReportedBlock(storage, block, HdfsServerConstants.ReplicaState.FINALIZED, delHintNode);
    }

    private void processAndHandleReportedBlock(DatanodeStorageInfo storageInfo, Block block, HdfsServerConstants.ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException {
        LinkedList<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
        LinkedList<BlockInfoContiguous> toInvalidate = new LinkedList<BlockInfoContiguous>();
        LinkedList<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
        LinkedList<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
        DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
        this.processIncrementallyReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC);
        assert (toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1) : "The block should be only in one of the lists.";
        for (StatefulBlockInfo b : toUC) {
            this.addStoredBlockUnderConstruction(b, storageInfo);
        }
        long numBlocksLogged = 0L;
        for (BlockInfoContiguous blockInfoContiguous : toAdd) {
            this.addStoredBlock(blockInfoContiguous, storageInfo, delHintNode, numBlocksLogged < this.maxNumBlocksToLog);
            ++numBlocksLogged;
        }
        if (numBlocksLogged > this.maxNumBlocksToLog) {
            blockLog.info("BLOCK* addBlock: logged info for {} of {} reported.", (Object)this.maxNumBlocksToLog, (Object)numBlocksLogged);
        }
        for (BlockInfoContiguous blockInfoContiguous : toInvalidate) {
            blockLog.info("BLOCK* addBlock: block {} on node {} size {} does not belong to any file", new Object[]{blockInfoContiguous, storageInfo, blockInfoContiguous.getNumBytes()});
            this.addToInvalidates(blockInfoContiguous, storageInfo.getDatanodeDescriptor());
        }
        for (BlockToMarkCorrupt blockToMarkCorrupt : toCorrupt) {
            this.markBlockAsCorrupt(blockToMarkCorrupt, storageInfo, storageInfo.getDatanodeDescriptor());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processIncrementalBlockReport(DatanodeRegistration nodeID, StorageReceivedDeletedBlocks blockInfos) throws IOException {
        final int[] received = new int[]{0};
        final int[] deleted = new int[]{0};
        int[] receiving = new int[]{0};
        final DatanodeDescriptor node = this.datanodeManager.getDatanode(nodeID);
        if (node == null || !node.isAlive) {
            blockLog.warn("BLOCK* processIncrementalBlockReport is received from dead or unregistered node {}", (Object)nodeID);
            throw new IOException("Got incremental block report from unregistered or dead node");
        }
        DatanodeStorageInfo s = node.getStorageInfo(blockInfos.getStorage().getStorageID());
        if (s == null) {
            s = node.updateStorage(blockInfos.getStorage());
        }
        final DatanodeStorageInfo storage = s;
        HopsTransactionalRequestHandler processIncrementalBlockReportHandler = new HopsTransactionalRequestHandler(HDFSOperationType.BLOCK_RECEIVED_AND_DELETED_INC_BLK_REPORT){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                LOG.debug("reported block id=" + rdbi.getBlock().getBlockId());
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(rdbi.getBlock());
                if (this.inodeIdentifier == null && !rdbi.isDeletedBlock()) {
                    LOG.warn("Invalid State. deleted blk is not recognized. bid=" + rdbi.getBlock().getBlockId());
                }
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(rdbi.getBlock().getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR));
                if (!rdbi.isDeletedBlock()) {
                    locks.add(lf.getBlockRelated(LockFactory.BLK.PE, LockFactory.BLK.UC, LockFactory.BLK.IV));
                }
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
                locks.add(lf.getIndividualHashBucketLock(storage.getSid(), HashBuckets.getInstance().getBucketForBlock(rdbi.getBlock())));
            }

            public Object performTask() throws IOException {
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                LOG.debug("BLOCK_RECEIVED_AND_DELETED_INC_BLK_REPORT " + (Object)((Object)rdbi.getStatus()) + " bid=" + rdbi.getBlock().getBlockId() + " dataNode=" + node.getXferAddr() + " storage=" + storage.getStorageID() + " sid: " + storage.getSid() + " status=" + (Object)((Object)rdbi.getStatus()));
                HashBuckets hashBuckets = HashBuckets.getInstance();
                this.addSubopName(rdbi.getStatus().toString());
                switch (rdbi.getStatus()) {
                    case RECEIVING_BLOCK: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case APPENDING: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case RECOVERING_APPEND: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case RECEIVED_BLOCK: {
                        BlockManager.this.addBlock(storage, rdbi.getBlock(), rdbi.getDelHints());
                        hashBuckets.applyHash(storage.getSid(), HdfsServerConstants.ReplicaState.FINALIZED, rdbi.getBlock());
                        received[0] = received[0] + 1;
                        break;
                    }
                    case UPDATE_RECOVERED: {
                        BlockManager.this.addBlock(storage, rdbi.getBlock(), rdbi.getDelHints());
                        received[0] = received[0] + 1;
                        break;
                    }
                    case DELETED_BLOCK: {
                        BlockManager.this.removeStoredBlock(rdbi.getBlock(), storage.getDatanodeDescriptor());
                        deleted[0] = deleted[0] + 1;
                        break;
                    }
                    default: {
                        String msg = "Unknown block status code reported by " + storage.getStorageID() + ": " + rdbi;
                        blockLog.warn(msg);
                        assert (false) : msg;
                        break;
                    }
                }
                if (blockLog.isDebugEnabled()) {
                    blockLog.debug("BLOCK* block " + (Object)((Object)rdbi.getStatus()) + ": " + rdbi.getBlock() + " is received from " + storage.getStorageID());
                }
                return null;
            }
        };
        try {
            if (node == null || !node.isAlive) {
                blockLog.warn("BLOCK* processIncrementalBlockReport is received from dead or unregistered node " + nodeID);
                throw new IOException("Got incremental block report from unregistered or dead node");
            }
            for (ReceivedDeletedBlockInfo rdbi : blockInfos.getBlocks()) {
                processIncrementalBlockReportHandler.setParams(new Object[]{rdbi});
                processIncrementalBlockReportHandler.handle(this.namesystem);
            }
        }
        finally {
            if (blockLog.isDebugEnabled()) {
                blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from " + nodeID + " receiving: " + receiving[0] + ",  received: " + received[0] + ",  deleted: " + deleted[0]);
            }
        }
    }

    public NumberReplicas countNodes(Block b) throws IOException {
        int decommissioned = 0;
        int decommissioning = 0;
        int live = 0;
        int corrupt = 0;
        int excess = 0;
        int stale = 0;
        BlockInfoContiguous block = this.getStoredBlock(b);
        if (block != null) {
            Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(block);
            for (DatanodeStorageInfo storage : this.blocksMap.storageList(block, DatanodeStorage.State.NORMAL)) {
                DatanodeDescriptor node = storage.getDatanodeDescriptor();
                if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
                    ++corrupt;
                } else if (node.isDecommissionInProgress()) {
                    ++decommissioning;
                } else if (node.isDecommissioned()) {
                    ++decommissioned;
                } else {
                    LightWeightLinkedSet<Block> blocksExcess = this.excessReplicateMap.getExcessReplica(block, storage.getSid());
                    if (blocksExcess != null && blocksExcess.contains(block)) {
                        ++excess;
                    } else {
                        ++live;
                    }
                }
                if (!storage.areBlockContentsStale()) continue;
                ++stale;
            }
        }
        return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
    }

    int countLiveNodes(BlockInfoContiguous b) throws IOException {
        if (!this.namesystem.isInStartupSafeMode()) {
            return this.countNodes(b).liveReplicas();
        }
        int live = 0;
        List<DatanodeStorageInfo> storages = this.blocksMap.storageList(b, DatanodeStorage.State.NORMAL);
        Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(b);
        for (DatanodeStorageInfo storage : storages) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            if (nodesCorrupt != null && nodesCorrupt.contains(node)) continue;
            ++live;
        }
        return live;
    }

    void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) throws IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        final int[] numOverReplicated = new int[]{0};
        Map<Long, Long> blocksOnNode = srcNode.getAllStorageReplicas(this.numBuckets, this.blockFetcherNBThreads, this.blockFetcherBucketsPerThread, ((FSNamesystem)this.namesystem).getFSOperationsExecutor());
        final HashMap<Long, ArrayList<Long>> inodeIdsToBlockMap = new HashMap<Long, ArrayList<Long>>();
        for (Map.Entry<Long, Long> entry : blocksOnNode.entrySet()) {
            ArrayList<Long> list = (ArrayList<Long>)inodeIdsToBlockMap.get(entry.getValue());
            if (list == null) {
                list = new ArrayList<Long>();
                inodeIdsToBlockMap.put(entry.getValue(), list);
            }
            list.add(entry.getKey());
        }
        final ArrayList inodeIds = new ArrayList(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.slicerBatchSize, (int)this.slicerNbThreads, (ExecutorService)((FSNamesystem)this.namesystem).getFSOperationsExecutor(), (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    final List ids = inodeIds.subList(startIndex, endIndex);
                    new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_OVER_REPLICATED_BLOCKS_ON_RECOMMISSION){
                        List<INodeIdentifier> inodeIdentifiers;

                        @Override
                        public void setUp() throws StorageException {
                            this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(ids);
                        }

                        public void acquireLock(TransactionLocks locks) throws IOException {
                            LockFactory lf = LockFactory.getInstance();
                            locks.add(lf.getMultipleINodesLock(this.inodeIdentifiers, TransactionLockTypes.INodeLockType.WRITE)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                        }

                        public Object performTask() throws IOException {
                            for (INodeIdentifier identifier : this.inodeIdentifiers) {
                                INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{identifier.getInodeId()});
                                if (inode == null) {
                                    LOG.debug("inode " + identifier.getInodeId() + " does not exist anymore");
                                    continue;
                                }
                                Iterator iterator = ((List)inodeIdsToBlockMap.get(identifier.getInodeId())).iterator();
                                while (iterator.hasNext()) {
                                    long blockId = (Long)iterator.next();
                                    BlockInfoContiguous block = (BlockInfoContiguous)EntityManager.find((FinderType)BlockInfoContiguous.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                                    if (block == null) {
                                        LOG.debug("block " + blockId + " does not exist anymore");
                                        continue;
                                    }
                                    BlockCollection bc = BlockManager.this.blocksMap.getBlockCollection(block);
                                    short expectedReplication = bc.getBlockReplication();
                                    NumberReplicas num = BlockManager.this.countNodes(block);
                                    int numCurrentReplica = num.liveReplicas();
                                    if (numCurrentReplica <= expectedReplication) continue;
                                    BlockManager.this.processOverReplicatedBlock(block, expectedReplication, null, null);
                                    numOverReplicated[0] = numOverReplicated[0] + 1;
                                }
                            }
                            return null;
                        }
                    }.handle();
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        LOG.info("Invalidated " + numOverReplicated[0] + " over-replicated blocks on " + srcNode + " during recommissioning");
    }

    boolean isNodeHealthyForDecommission(DatanodeDescriptor node) throws IOException {
        if (!node.checkBlockReportReceived()) {
            LOG.info("Node {} hasn't sent its first block report.", (Object)node);
            return false;
        }
        if (node.isAlive) {
            return true;
        }
        this.updateState();
        if (this.pendingReplicationBlocksCount == 0L && this.underReplicatedBlocksCount == 0L) {
            LOG.info("Node {} is dead and there are no under-replicated blocks or blocks pending replication. Safe to decommission.", (Object)node);
            return true;
        }
        LOG.warn("Node {} is dead while decommission is in progress. Cannot be safely decommissioned since there is risk of reduced data durability or data loss. Either restart the failed node or force decommissioning by removing, calling refreshNodes, then re-adding to the excludes files.", (Object)node);
        return false;
    }

    public int getActiveBlockCount() throws IOException {
        return this.blocksMap.size();
    }

    public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) throws TransactionContextException, StorageException {
        return block.getStorages(this.datanodeManager);
    }

    public int getTotalBlocks() throws IOException {
        return this.blocksMap.size();
    }

    public void removeBlock(Block block) throws StorageException, TransactionContextException, IOException {
        BlockInfoContiguous storedBlock = this.getStoredBlock(block);
        if (storedBlock == null) {
            LOG.debug("block " + block.getBlockId() + " does not exist anymore");
            return;
        }
        this.addToInvalidates(storedBlock);
        this.removeBlockFromMap(storedBlock);
        this.pendingReplications.remove(storedBlock);
        this.neededReplications.remove(storedBlock);
        if (this.postponedMisreplicatedBlocks.remove(storedBlock)) {
            this.postponedMisreplicatedBlocksCount.decrementAndGet();
        }
    }

    public BlockInfoContiguous getStoredBlock(Block block) throws StorageException, TransactionContextException {
        return this.blocksMap.getStoredBlock(block);
    }

    private void updateNeededReplications(BlockInfoContiguous block, int curReplicasDelta, int expectedReplicasDelta) throws IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        NumberReplicas repl = this.countNodes(block);
        int curExpectedReplicas = this.getReplication(block);
        if (this.isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
            this.neededReplications.update(block, repl.liveReplicas(), repl.decommissionedAndDecommissioning(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
        } else {
            int oldReplicas = repl.liveReplicas() - curReplicasDelta;
            int oldExpectedReplicas = curExpectedReplicas - expectedReplicasDelta;
            this.neededReplications.remove(block, oldReplicas, repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
        }
    }

    public void checkReplication(BlockCollection bc) throws IOException {
        short expected = bc.getBlockReplication();
        for (BlockInfoContiguous block : bc.getBlocks()) {
            NumberReplicas n = this.countNodes(block);
            if (this.isNeededReplication(block, expected, n.liveReplicas())) {
                this.neededReplications.add(block, n.liveReplicas(), n.decommissionedAndDecommissioning(), expected);
                continue;
            }
            if (n.liveReplicas() <= expected) continue;
            this.processOverReplicatedBlock(block, expected, null, null);
        }
    }

    public boolean checkBlocksProperlyReplicated(String src, BlockInfoContiguous[] blocks) throws StorageException, TransactionContextException {
        for (BlockInfoContiguous b : blocks) {
            if (b.isComplete()) continue;
            BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)b;
            int numNodes = b.getStorages(this.getDatanodeManager()).length;
            LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + (Object)((Object)uc.getBlockUCState()) + ", replication# = " + numNodes + (numNodes < this.minReplication ? " < " : " >= ") + " minimum = " + this.minReplication + ") in file " + src);
            return false;
        }
        return true;
    }

    private int getReplication(Block block) throws StorageException, TransactionContextException {
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        return bc == null ? (short)0 : bc.getBlockReplication();
    }

    private int invalidateWorkForOneNode(Map.Entry<DatanodeInfo, Set<Integer>> entry) throws IOException {
        if (this.namesystem.isInSafeMode()) {
            LOG.debug("In safemode, not computing replication work");
            return 0;
        }
        DatanodeDescriptor dnDescriptor = this.datanodeManager.getDatanode(entry.getKey());
        if (dnDescriptor == null) {
            LOG.warn("DataNode " + entry.getKey() + " cannot be found for sids " + Arrays.toString(entry.getValue().toArray()) + ", removing block invalidation work.");
            this.invalidateBlocks.remove(new ArrayList<Integer>((Collection)entry.getValue()));
            return 0;
        }
        List<Block> toInvalidate = this.invalidateBlocks.invalidateWork(dnDescriptor);
        if (toInvalidate == null) {
            return 0;
        }
        if (blockLog.isInfoEnabled()) {
            blockLog.info("BLOCK* {}: ask {} to delete {}", new Object[]{this.getClass().getSimpleName(), entry.getKey(), toInvalidate});
        }
        return toInvalidate.size();
    }

    boolean blockHasEnoughRacks(BlockInfoContiguous b) throws StorageException, TransactionContextException {
        if (!this.shouldCheckForEnoughRacks) {
            return true;
        }
        boolean enoughRacks = false;
        Collection<DatanodeDescriptor> corruptNodes = this.corruptReplicas.getNodes(b);
        int numExpectedReplicas = this.getReplication(b);
        String rackName = null;
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(b)) {
            DatanodeDescriptor cur = storage.getDatanodeDescriptor();
            if (cur.isDecommissionInProgress() || cur.isDecommissioned() || corruptNodes != null && corruptNodes.contains(cur)) continue;
            if (numExpectedReplicas == 1 || numExpectedReplicas > 1 && !this.datanodeManager.hasClusterEverBeenMultiRack()) {
                enoughRacks = true;
                break;
            }
            String rackNameNew = cur.getNetworkLocation();
            if (rackName == null) {
                rackName = rackNameNew;
                continue;
            }
            if (rackName.equals(rackNameNew)) continue;
            enoughRacks = true;
            break;
        }
        return enoughRacks;
    }

    boolean isNeededReplication(BlockInfoContiguous b, int expected, int current) throws StorageException, TransactionContextException {
        return current < expected || !this.blockHasEnoughRacks(b);
    }

    public long getMissingBlocksCount() throws IOException {
        return this.neededReplications.getCorruptBlockSize();
    }

    public long getMissingReplOneBlocksCount() throws IOException {
        return this.neededReplications.getCorruptReplOneBlockSize();
    }

    public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block, BlockCollection bc) throws StorageException, TransactionContextException {
        return this.blocksMap.addBlockCollection(block, bc);
    }

    public BlockCollection getBlockCollection(Block b) throws StorageException, TransactionContextException {
        return this.blocksMap.getBlockCollection(b);
    }

    public Iterable<DatanodeStorageInfo> getStorages(Block block) throws StorageException, TransactionContextException {
        return this.blocksMap.getStorages(block);
    }

    public List<DatanodeStorageInfo> storageList(Block block) throws StorageException, TransactionContextException {
        return this.blocksMap.storageList(block);
    }

    public int numCorruptReplicas(BlockInfoContiguous block) throws StorageException, TransactionContextException {
        return this.corruptReplicas.numCorruptReplicas(block);
    }

    public void removeBlockFromMap(Block block) throws IOException {
        BlockInfoContiguous storedBlock = this.getStoredBlock(block);
        if (storedBlock == null) {
            LOG.debug("block " + block.getBlockId() + " does not exist anymore");
            return;
        }
        this.removeFromExcessReplicateMap(storedBlock);
        this.corruptReplicas.removeFromCorruptReplicasMap(storedBlock);
        this.blocksMap.removeBlock(block);
    }

    private void removeFromExcessReplicateMap(BlockInfoContiguous blockInfo) throws IOException {
        for (DatanodeStorageInfo info : this.blocksMap.getStorages(blockInfo)) {
            this.excessReplicateMap.remove(info.getDatanodeDescriptor(), blockInfo);
        }
    }

    public int getCapacity() {
        return this.blocksMap.getCapacity();
    }

    public long[] getCorruptReplicaBlockIds(int numExpectedBlocks, Long startingBlockId) throws IOException {
        return this.corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks, startingBlockId);
    }

    public Iterator<Block> getCorruptReplicaBlockIterator() {
        return this.neededReplications.iterator(4);
    }

    public Collection<DatanodeDescriptor> getCorruptReplicas(BlockInfoContiguous block) throws StorageException, TransactionContextException {
        return this.corruptReplicas.getNodes(block);
    }

    public String getCorruptReason(BlockInfoContiguous block, DatanodeDescriptor node) throws IOException {
        return this.corruptReplicas.getCorruptReason(block, node);
    }

    public int numOfUnderReplicatedBlocks() throws IOException {
        return this.neededReplications.size();
    }

    int computeDatanodeWork() throws IOException {
        if (this.namesystem.isInSafeMode()) {
            return 0;
        }
        int numlive = this.heartbeatManager.getLiveDatanodeCount();
        int blocksToProcess = numlive * this.blocksReplWorkMultiplier;
        int nodesToProcess = (int)Math.ceil((float)numlive * this.blocksInvalidateWorkPct);
        int workFound = this.computeReplicationWork(blocksToProcess);
        this.updateState();
        this.scheduledReplicationBlocksCount = workFound;
        return workFound += this.computeInvalidateWork(nodesToProcess);
    }

    public void clearQueues() throws IOException {
        this.neededReplications.clear();
        this.pendingReplications.clear();
        this.excessReplicateMap.clear();
        this.invalidateBlocks.clear();
        this.datanodeManager.clearPendingQueues();
        this.postponedMisreplicatedBlocks.clear();
        this.postponedMisreplicatedBlocksCount.set(0L);
    }

    public static LocatedBlock newLocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages, long startOffset, boolean corrupt) {
        return new LocatedBlock(b, DatanodeStorageInfo.toDatanodeInfos(storages), DatanodeStorageInfo.toStorageIDs(storages), DatanodeStorageInfo.toStorageTypes(storages), startOffset, corrupt, null);
    }

    private void removeStoredBlocksTx(final List<Long> inodeIds, final Map<Long, List<Long>> inodeIdsToBlockMap, final DatanodeDescriptor node) throws IOException {
        final AtomicInteger removedBlocks = new AtomicInteger(0);
        new HopsTransactionalRequestHandler(HDFSOperationType.REMOVE_STORED_BLOCKS){
            List<INodeIdentifier> inodeIdentifiers;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getINodesLocks(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifiers)).add(lf.getBlockLock()).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifiers != null) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (INodeIdentifier identifier : this.inodeIdentifiers) {
                    INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{identifier.getInodeId()});
                    if (inode == null) {
                        LOG.debug("inode " + identifier.getInodeId() + " does not exist anymore");
                        continue;
                    }
                    Iterator iterator = ((List)inodeIdsToBlockMap.get(identifier.getInodeId())).iterator();
                    while (iterator.hasNext()) {
                        long blockId = (Long)iterator.next();
                        BlockInfoContiguous block = (BlockInfoContiguous)EntityManager.find((FinderType)BlockInfoContiguous.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                        if (block == null) {
                            LOG.debug("block " + blockId + " does not exit anymore");
                            continue;
                        }
                        BlockManager.this.removeStoredBlock((Block)block, node);
                        removedBlocks.incrementAndGet();
                    }
                }
                return null;
            }
        }.handle(this.namesystem);
        LOG.debug("removed " + removedBlocks.get() + " replicas from " + node.getName());
    }

    private void removeStoredBlocksTx(final List<Long> inodeIds, final Map<Long, List<Long>> inodeIdsToBlockMap, final int sid) throws IOException {
        final AtomicInteger removedBlocks = new AtomicInteger(0);
        new HopsTransactionalRequestHandler(HDFSOperationType.REMOVE_STORED_BLOCKS){
            List<INodeIdentifier> inodeIdentifiers;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getMultipleINodesLock(this.inodeIdentifiers, TransactionLockTypes.INodeLockType.WRITE)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifiers != null) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (INodeIdentifier identifier : this.inodeIdentifiers) {
                    INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{identifier.getInodeId()});
                    if (inode == null) {
                        LOG.debug("inode " + identifier.getInodeId() + " does not exist anymore");
                        continue;
                    }
                    Iterator iterator = ((List)inodeIdsToBlockMap.get(identifier.getInodeId())).iterator();
                    while (iterator.hasNext()) {
                        long blockId = (Long)iterator.next();
                        BlockInfoContiguous block = (BlockInfoContiguous)EntityManager.find((FinderType)BlockInfoContiguous.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                        if (block == null) {
                            LOG.debug("block " + blockId + " does not exit anymore");
                            continue;
                        }
                        BlockManager.this.removeStoredBlock(block, sid);
                        removedBlocks.incrementAndGet();
                    }
                }
                return null;
            }
        }.handle(this.namesystem);
        LOG.info("removed " + removedBlocks.get() + " replicas from " + sid);
    }

    @VisibleForTesting
    int computeReplicationWorkForBlock(final Block b, final int priority) throws IOException {
        return (Integer)new HopsTransactionalRequestHandler(HDFSOperationType.COMPUTE_REPLICATION_WORK_FOR_BLOCK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getBlockLock(b.getBlockId(), this.inodeIdentifier)).add(lf.getVariableLock(Variable.Finder.ReplicationIndex, TransactionLockTypes.LockType.WRITE)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR, LockFactory.BLK.UC));
            }

            public Object performTask() throws IOException {
                return BlockManager.this.computeReplicationWorkForBlockInternal(b, priority);
            }
        }.handle(this.namesystem);
    }

    @VisibleForTesting
    public void processTimedOutPendingBlock(final long timedOutItemId) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_TIMEDOUT_PENDING_BLOCK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlockID(timedOutItemId);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(timedOutItemId, this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockInfoContiguous timedOutItem = (BlockInfoContiguous)EntityManager.find((FinderType)BlockInfoContiguous.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{timedOutItemId});
                if (timedOutItem == null) {
                    return null;
                }
                NumberReplicas num = BlockManager.this.countNodes(timedOutItem);
                if (BlockManager.this.isNeededReplication(timedOutItem, BlockManager.this.getReplication(timedOutItem), num.liveReplicas())) {
                    BlockManager.this.neededReplications.add(timedOutItem, num.liveReplicas(), num.decommissionedAndDecommissioning(), BlockManager.this.getReplication(timedOutItem));
                }
                BlockManager.this.pendingReplications.remove(timedOutItem);
                return null;
            }
        }.handle(this.namesystem);
    }

    private void addStoredBlockTx(final List<BlockInfoContiguous> blocks, final List<Long> blockIds, final List<Long> inodeIds, final DatanodeStorageInfo storage, final DatanodeDescriptor delNodeHint, final boolean logEveryBlock) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_BLK){
            List<INodeIdentifier> inodeIdentifiers;
            {
                super(opType);
                this.inodeIdentifiers = new ArrayList<INodeIdentifier>();
            }

            @Override
            public void setUp() throws StorageException {
                HashSet addedInodeIds = new HashSet();
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getINodesLocks(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifiers)).add(lf.getBlockReportingLocks(Longs.toArray((Collection)blockIds), Longs.toArray((Collection)inodeIds), new long[0], 0)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.IV, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && !this.inodeIdentifiers.isEmpty()) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (BlockInfoContiguous block : blocks) {
                    Block block2 = BlockManager.this.addStoredBlock(block, storage, delNodeHint, logEveryBlock);
                }
                return null;
            }
        }.handle();
    }

    private void addStoredBlockUnderConstructionTx(final StatefulBlockInfo ucBlock, final DatanodeStorageInfo storage) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_UC_BLK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(ucBlock.reportedBlock);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(ucBlock.reportedBlock.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.UC, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockManager.this.addStoredBlockUnderConstruction(ucBlock, storage);
                return null;
            }
        }.handle();
    }

    private void addToInvalidates(Collection<Block> blocks, DatanodeStorageInfo storage) throws IOException {
        this.invalidateBlocks.add(blocks, storage);
    }

    private void markBlockAsCorruptTx(final BlockToMarkCorrupt b, final DatanodeStorageInfo storage) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_CORRUPT_BLK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b.corrupted);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(b.corrupted.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.UC, LockFactory.BLK.IV));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockManager.this.markBlockAsCorrupt(b, storage, storage.getDatanodeDescriptor());
                return null;
            }
        }.handle();
    }

    public int getTotalCompleteBlocks() throws IOException {
        return this.blocksMap.sizeCompleteOnly();
    }

    private void addStoredBlockUnderConstructionImmediateTx(final BlockInfoContiguousUnderConstruction block, final DatanodeStorageInfo storage, final HdfsServerConstants.ReplicaState reportedState) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_UC_BLK_IMMEDIATE){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.UC, LockFactory.BLK.CR, LockFactory.BLK.ER, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockInfoContiguous storedBlock = BlockManager.this.blocksMap.getStoredBlock(block);
                if (storedBlock == null) {
                    blockLog.debug("BLOCK* addStoredBlockUnderConstructionImmediateTx: {} on {} size {} but it does not belong to any file", new Object[]{block, storage.getStorageID(), block.getNumBytes()});
                    return null;
                }
                HdfsServerConstants.BlockUCState ucState = storedBlock.getBlockUCState();
                if (BlockManager.this.isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
                    BlockInfoContiguousUnderConstruction blk = (BlockInfoContiguousUnderConstruction)storedBlock;
                    blk.addReplicaIfNotPresent(storage, reportedState, blk.getGenerationStamp());
                }
                if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED) {
                    BlockManager.this.addStoredBlockImmediate(storedBlock, storage, false);
                }
                return null;
            }
        }.handle();
    }

    private void addStoredBlockImmediateTx(final List<BlockInfoContiguous> blocks, List<Long> blockIds, final List<Long> inodeIds, final DatanodeStorageInfo storage, final boolean logEveryBlock) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_BLK_IMMEDIATE){
            List<INodeIdentifier> inodeIdentifiers;
            {
                super(opType);
                this.inodeIdentifiers = new ArrayList<INodeIdentifier>();
            }

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getINodesLocks(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifiers)).add(lf.getBlockLock()).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.IV, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && !this.inodeIdentifiers.isEmpty()) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (BlockInfoContiguous block : blocks) {
                    BlockInfoContiguous storedBlock = BlockManager.this.blocksMap.getStoredBlock(block);
                    if (storedBlock == null) {
                        blockLog.debug("BLOCK* addStoredBlockUnderConstructionImmediateTx: {} on {} size {} but it does not belong to any file", new Object[]{block, storage.getStorageID(), block.getNumBytes()});
                        continue;
                    }
                    BlockManager.this.addStoredBlockImmediate(storedBlock, storage, logEveryBlock);
                }
                return null;
            }
        }.handle();
    }

    public void shutdown() {
        if (this.datanodeRemover != null) {
            this.datanodeRemover.shutdown();
        }
        this.stopReplicationInitializer();
    }

    public int getNumBuckets() {
        return this.numBuckets;
    }

    public int getBlockFetcherNBThreads() {
        return this.blockFetcherNBThreads;
    }

    public int getBlockFetcherBucketsPerThread() {
        return this.blockFetcherBucketsPerThread;
    }

    public int getRemovalBatchSize() {
        return this.slicerBatchSize;
    }

    public int getRemovalNoThreads() {
        return this.slicerNbThreads;
    }

    public void blockReportCompleted(DatanodeID nodeID, DatanodeStorage[] storages, boolean success) throws IOException {
        DatanodeDescriptor node;
        if (this.namesystem != null && this.namesystem.getNameNode() != null) {
            this.namesystem.getNameNode().getBRTrackingService().blockReportCompleted(nodeID.getXferAddr());
        }
        if (success && (node = this.datanodeManager.getDatanode(nodeID)) != null) {
            for (DatanodeStorage storage : storages) {
                DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
                storageInfo.receivedBlockReport();
            }
        }
    }

    @VisibleForTesting
    public BlocksMap getBlocksMap() {
        return this.blocksMap;
    }

    @VisibleForTesting
    public void setBlocksMapSpy(BlocksMap bm) {
        this.blocksMap = bm;
    }

    static enum MisReplicationResult {
        INVALID,
        UNDER_REPLICATED,
        OVER_REPLICATED,
        POSTPONE,
        UNDER_CONSTRUCTION,
        OK;

    }

    private static class ReplicationWork {
        private final BlockInfoContiguous block;
        private final BlockCollection bc;
        private final DatanodeDescriptor srcNode;
        private final List<DatanodeDescriptor> containingNodes;
        private final List<DatanodeStorageInfo> liveReplicaStorages;
        private final int additionalReplRequired;
        private DatanodeStorageInfo[] targets;
        private final int priority;

        public ReplicationWork(BlockInfoContiguous block, BlockCollection bc, DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, int priority) {
            this.block = block;
            this.bc = bc;
            this.srcNode = srcNode;
            this.srcNode.incrementPendingReplicationWithoutTargets();
            this.containingNodes = containingNodes;
            this.liveReplicaStorages = liveReplicaStorages;
            this.additionalReplRequired = additionalReplRequired;
            this.priority = priority;
            this.targets = null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set<Node> excludedNodes) throws TransactionContextException, StorageException {
            try {
                this.targets = blockplacement.chooseTarget(null, this.additionalReplRequired, this.srcNode, this.liveReplicaStorages, false, excludedNodes, this.block.getNumBytes(), storagePolicySuite.getPolicy(this.bc.getStoragePolicyID()));
            }
            finally {
                this.srcNode.decrementPendingReplicationWithoutTargets();
            }
        }

        static /* synthetic */ DatanodeStorageInfo[] access$902(ReplicationWork x0, DatanodeStorageInfo[] x1) {
            x0.targets = x1;
            return x1;
        }
    }

    private class ReplicationMonitor
    implements Runnable {
        private ReplicationMonitor() {
        }

        @Override
        public void run() {
            while (BlockManager.this.namesystem.isRunning()) {
                try {
                    if (BlockManager.this.namesystem.isLeader()) {
                        LOG.debug("Running replication monitor");
                        if (BlockManager.this.namesystem.isPopulatingReplQueues()) {
                            BlockManager.this.computeDatanodeWork();
                            BlockManager.this.processPendingReplications();
                            BlockManager.this.rescanPostponedMisreplicatedBlocks();
                        }
                    } else {
                        BlockManager.this.updateState();
                        LOG.debug("Namesystem is not leader: will not run replication monitor");
                    }
                    Thread.sleep(BlockManager.this.replicationRecheckInterval);
                }
                catch (Throwable t) {
                    if (t instanceof TransientStorageException || t instanceof StorageException) continue;
                    if (!BlockManager.this.namesystem.isRunning()) {
                        LOG.info("Stopping ReplicationMonitor.");
                        if (t instanceof InterruptedException) break;
                        LOG.info("ReplicationMonitor received an exception while shutting down.", t);
                        break;
                    }
                    if (!BlockManager.this.checkNSRunning && t instanceof InterruptedException) {
                        LOG.info("Stopping ReplicationMonitor for testing.");
                        break;
                    }
                    LOG.error("ReplicationMonitor thread received Runtime exception. ", t);
                    ExitUtil.terminate((int)1, (Throwable)t);
                }
            }
        }
    }

    public class ReportStatistics {
        int numBuckets;
        public int numBucketsMatching;
        int numBlocks;
        int numToRemove;
        int numToInvalidate;
        int numToCorrupt;
        int numToUC;
        int numToAdd;
        int numConsideredSafeIfInSafemode;

        public String toString() {
            return String.format("(buckets,bucketsMatching,blocks,toRemove,toInvalidate,toCorrupt,toUC,toAdd,safeBlocksIfSafeMode)=(%d,%d,%d,%d,%d,%d,%d,%d,%d)", this.numBuckets, this.numBucketsMatching, this.numBlocks, this.numToRemove, this.numToInvalidate, this.numToCorrupt, this.numToUC, this.numToAdd, this.numConsideredSafeIfInSafemode);
        }
    }

    private static class HashMatchingResult {
        private final List<Integer> matchingBuckets;
        private final List<Integer> mismatchedBuckets;

        HashMatchingResult(List<Integer> matchingBuckets, List<Integer> mismatchedBuckets) {
            this.matchingBuckets = matchingBuckets;
            this.mismatchedBuckets = mismatchedBuckets;
        }
    }

    private static class BlockToMarkCorrupt {
        final BlockInfoContiguous corrupted;
        final BlockInfoContiguous stored;
        final String reason;
        final CorruptReplicasMap.Reason reasonCode;

        BlockToMarkCorrupt(BlockInfoContiguous corrupted, BlockInfoContiguous stored, String reason, CorruptReplicasMap.Reason reasonCode) {
            Preconditions.checkNotNull((Object)corrupted, (Object)"corrupted is null");
            Preconditions.checkNotNull((Object)stored, (Object)"stored is null");
            this.corrupted = corrupted;
            this.stored = stored;
            this.reason = reason;
            this.reasonCode = reasonCode;
        }

        BlockToMarkCorrupt(BlockInfoContiguous stored, String reason, CorruptReplicasMap.Reason reasonCode) {
            this(stored, stored, reason, reasonCode);
        }

        BlockToMarkCorrupt(BlockInfoContiguous stored, long gs, String reason, CorruptReplicasMap.Reason reasonCode) {
            this(new BlockInfoContiguous(stored), stored, reason, reasonCode);
            this.corrupted.setGenerationStampNoPersistance(gs);
        }

        public String toString() {
            return this.corrupted + "(" + (this.corrupted == this.stored ? "same as stored" : "stored=" + this.stored) + ")";
        }
    }

    static class StatefulBlockInfo {
        final BlockInfoContiguousUnderConstruction storedBlock;
        final Block reportedBlock;
        final HdfsServerConstants.ReplicaState reportedState;

        StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock, Block reportedBlock, HdfsServerConstants.ReplicaState reportedState) {
            this.storedBlock = storedBlock;
            this.reportedBlock = reportedBlock;
            this.reportedState = reportedState;
        }
    }
}

