/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.hops.common.INodeUtil;
import io.hops.exception.StorageException;
import io.hops.exception.TransactionContextException;
import io.hops.exception.TransientStorageException;
import io.hops.leader_election.node.ActiveNode;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.HdfsVariables;
import io.hops.metadata.blockmanagement.ExcessReplicasMap;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.common.entity.Variable;
import io.hops.metadata.hdfs.dal.BlockInfoDataAccess;
import io.hops.metadata.hdfs.dal.MisReplicatedRangeQueueDataAccess;
import io.hops.metadata.hdfs.entity.EncodingStatus;
import io.hops.metadata.hdfs.entity.HashBucket;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.metadata.hdfs.entity.MisReplicatedRange;
import io.hops.metadata.security.token.block.NameNodeBlockTokenSecretManager;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import io.hops.util.Slicer;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.BlocksMap;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.HashBuckets;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReplicationBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportBlockState;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.Bucket;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReportedBlock;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
public class BlockManager {
    static final Log LOG = LogFactory.getLog(BlockManager.class);
    public static final Log blockLog = NameNode.blockStateChangeLog;
    private final Namesystem namesystem;
    private final DatanodeManager datanodeManager;
    private final HeartbeatManager heartbeatManager;
    private final NameNodeBlockTokenSecretManager blockTokenSecretManager;
    private volatile long pendingReplicationBlocksCount = 0L;
    private volatile long corruptReplicaBlocksCount = 0L;
    private volatile long underReplicatedBlocksCount = 0L;
    private volatile long scheduledReplicationBlocksCount = 0L;
    private AtomicLong excessBlocksCount = new AtomicLong(0L);
    private AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
    private final long replicationRecheckInterval;
    final BlocksMap blocksMap;
    final Daemon replicationThread = new Daemon((Runnable)new ReplicationMonitor());
    final CorruptReplicasMap corruptReplicas;
    private final InvalidateBlocks invalidateBlocks;
    private final Set<Block> postponedMisreplicatedBlocks = Sets.newConcurrentHashSet();
    public final ExcessReplicasMap excessReplicateMap;
    public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
    @VisibleForTesting
    final PendingReplicationBlocks pendingReplications;
    public final short maxReplication;
    int maxReplicationStreams;
    int replicationStreamsHardLimit;
    public final short minReplication;
    public final int defaultReplication;
    final int maxCorruptFilesReturned;
    final float blocksInvalidateWorkPct;
    final int blocksReplWorkMultiplier;
    final boolean shouldCheckForEnoughRacks;
    final boolean encryptDataTransfer;
    private final long maxNumBlocksToLog;
    private Daemon replicationQueuesInitializer = null;
    private double replicationQueuesInitProgress = 0.0;
    private BlockPlacementPolicy blockplacement;
    private final BlockStoragePolicySuite storagePolicySuite;
    private boolean checkNSRunning = true;
    private final int processReportBatchSize;
    private final int processMisReplicatedBatchSize;
    private final int processMisReplicatedNoOfBatchs;
    private final int processMisReplicatedNoThreads;
    private final int removalBatchSize;
    private final int removalNoThreads;

    public long getPendingReplicationBlocksCount() {
        return this.pendingReplicationBlocksCount;
    }

    public long getUnderReplicatedBlocksCount() {
        return this.underReplicatedBlocksCount;
    }

    public long getCorruptReplicaBlocksCount() {
        return this.corruptReplicaBlocksCount;
    }

    public long getScheduledReplicationBlocksCount() {
        return this.scheduledReplicationBlocksCount;
    }

    public long getPendingDeletionBlocksCount() throws IOException {
        return this.invalidateBlocks.numBlocks();
    }

    public long getExcessBlocksCount() {
        return this.excessBlocksCount.get();
    }

    public long getPostponedMisreplicatedBlocksCount() {
        return this.postponedMisreplicatedBlocksCount.get();
    }

    public BlockManager(Namesystem namesystem, FSClusterStats stats, Configuration conf) throws IOException {
        this.namesystem = namesystem;
        int numBuckets = conf.getInt("dfs.blockreport.numbuckets", 1000);
        HashBuckets.initialize(numBuckets);
        this.datanodeManager = new DatanodeManager(this, namesystem, conf);
        this.corruptReplicas = new CorruptReplicasMap(this.datanodeManager);
        this.heartbeatManager = this.datanodeManager.getHeartbeatManager();
        this.invalidateBlocks = new InvalidateBlocks(this.datanodeManager);
        this.excessReplicateMap = new ExcessReplicasMap(this.datanodeManager);
        this.blocksMap = new BlocksMap(this.datanodeManager);
        this.blockplacement = BlockPlacementPolicy.getInstance(conf, stats, this.datanodeManager.getNetworkTopology(), this.datanodeManager.getHost2DatanodeMap());
        this.storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
        this.pendingReplications = new PendingReplicationBlocks((long)conf.getInt("dfs.namenode.replication.pending.timeout-sec", -1) * 1000L);
        this.blockTokenSecretManager = this.createBlockTokenSecretManager(conf);
        this.maxCorruptFilesReturned = conf.getInt("dfs.corruptfilesreturned.max", 500);
        this.defaultReplication = conf.getInt("dfs.replication", 3);
        int maxR = conf.getInt("dfs.replication.max", 512);
        int minR = conf.getInt("dfs.namenode.replication.min", 1);
        if (minR <= 0) {
            throw new IOException("Unexpected configuration parameters: dfs.namenode.replication.min = " + minR + " <= 0");
        }
        if (maxR > Short.MAX_VALUE) {
            throw new IOException("Unexpected configuration parameters: dfs.replication.max = " + maxR + " > " + Short.MAX_VALUE);
        }
        if (minR > maxR) {
            throw new IOException("Unexpected configuration parameters: dfs.namenode.replication.min = " + minR + " > " + "dfs.replication.max" + " = " + maxR);
        }
        this.minReplication = (short)minR;
        this.maxReplication = (short)maxR;
        this.maxReplicationStreams = conf.getInt("dfs.namenode.replication.max-streams", 2);
        this.replicationStreamsHardLimit = conf.getInt("dfs.namenode.replication.max-streams-hard-limit", 4);
        this.shouldCheckForEnoughRacks = conf.get("net.topology.script.file.name") != null;
        this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
        this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
        this.replicationRecheckInterval = (long)conf.getInt("dfs.namenode.replication.interval", 3) * 1000L;
        this.encryptDataTransfer = conf.getBoolean("dfs.encrypt.data.transfer", false);
        this.maxNumBlocksToLog = conf.getLong("dfs.namenode.max-num-blocks-to-log", 1000L);
        this.processReportBatchSize = conf.getInt("dfs.namenode.processReport.batchsize", 5000);
        this.processMisReplicatedBatchSize = conf.getInt("dfs.namenode.misreplicated.batchsize", 500);
        this.processMisReplicatedNoOfBatchs = conf.getInt("dfs.namenode.misreplicated.noofbatches", 100);
        this.processMisReplicatedNoThreads = conf.getInt("dfs.namenode.misreplicated.noofbatches", 20);
        this.removalBatchSize = conf.getInt("dfs.namenode.removal.batchsize", 500);
        this.removalNoThreads = conf.getInt("dfs.namenode.removal.noofbatches", 20);
        LOG.info((Object)("defaultReplication         = " + this.defaultReplication));
        LOG.info((Object)("maxReplication             = " + this.maxReplication));
        LOG.info((Object)("minReplication             = " + this.minReplication));
        LOG.info((Object)("maxReplicationStreams      = " + this.maxReplicationStreams));
        LOG.info((Object)("shouldCheckForEnoughRacks  = " + this.shouldCheckForEnoughRacks));
        LOG.info((Object)("replicationRecheckInterval = " + this.replicationRecheckInterval));
        LOG.info((Object)("encryptDataTransfer        = " + this.encryptDataTransfer));
        LOG.info((Object)("maxNumBlocksToLog          = " + this.maxNumBlocksToLog));
        LOG.info((Object)("misReplicatedBatchSize     = " + this.processMisReplicatedBatchSize));
        LOG.info((Object)("misReplicatedNoOfBatchs    = " + this.processMisReplicatedNoOfBatchs));
        LOG.info((Object)("misReplicatedNoOfThreads   = " + this.processMisReplicatedNoThreads));
        LOG.info((Object)("removalBatchSize           = " + this.removalBatchSize));
        LOG.info((Object)("removalNoOfThreads         = " + this.removalNoThreads));
    }

    private NameNodeBlockTokenSecretManager createBlockTokenSecretManager(Configuration conf) throws IOException {
        boolean isEnabled = conf.getBoolean("dfs.block.access.token.enable", false);
        LOG.info((Object)("dfs.block.access.token.enable=" + isEnabled));
        if (!isEnabled) {
            if (UserGroupInformation.isSecurityEnabled()) {
                LOG.error((Object)"Security is enabled but block access tokens (via dfs.block.access.token.enable) aren't enabled. This may cause issues when clients attempt to talk to a DataNode.");
            }
            return null;
        }
        long updateMin = conf.getLong("dfs.block.access.key.update.interval", 600L);
        long lifetimeMin = conf.getLong("dfs.block.access.token.lifetime", 600L);
        String encryptionAlgorithm = conf.get("dfs.encrypt.data.transfer.algorithm");
        LOG.info((Object)("dfs.block.access.key.update.interval=" + updateMin + " min(s), " + "dfs.block.access.token.lifetime" + "=" + lifetimeMin + " min(s), " + "dfs.encrypt.data.transfer.algorithm" + "=" + encryptionAlgorithm));
        return new NameNodeBlockTokenSecretManager(updateMin * 60L * 1000L, lifetimeMin * 60L * 1000L, null, encryptionAlgorithm, this.namesystem);
    }

    public BlockStoragePolicy getDefaultStoragePolicy() {
        return this.storagePolicySuite.getDefaultPolicy();
    }

    public BlockStoragePolicy getStoragePolicy(String policyName) {
        return this.storagePolicySuite.getPolicy(policyName);
    }

    public BlockStoragePolicy getStoragePolicy(byte policyId) {
        return this.storagePolicySuite.getPolicy(policyId);
    }

    public BlockStoragePolicy[] getStoragePolicies() {
        return this.storagePolicySuite.getAllPolicies();
    }

    public void setBlockPoolId(String blockPoolId) {
        if (this.isBlockTokenEnabled()) {
            this.blockTokenSecretManager.setBlockPoolId(blockPoolId);
        }
    }

    @VisibleForTesting
    public BlockTokenSecretManager getBlockTokenSecretManager() {
        return this.blockTokenSecretManager;
    }

    @VisibleForTesting
    void enableRMTerminationForTesting() {
        this.checkNSRunning = false;
    }

    private boolean isBlockTokenEnabled() {
        return this.blockTokenSecretManager != null;
    }

    boolean shouldUpdateBlockKey(long updateTime) throws IOException {
        return this.isBlockTokenEnabled() ? this.blockTokenSecretManager.updateKeys(updateTime) : false;
    }

    public void activate(Configuration conf) throws IOException {
        this.pendingReplications.start();
        this.datanodeManager.activate(conf);
        this.replicationThread.start();
        if (this.isBlockTokenEnabled()) {
            this.blockTokenSecretManager.generateKeysIfNeeded();
        }
    }

    public void close() {
        try {
            if (this.replicationThread != null) {
                this.replicationThread.interrupt();
                this.replicationThread.join(3000L);
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.datanodeManager.close();
        this.pendingReplications.stop();
        this.blocksMap.close();
    }

    public DatanodeManager getDatanodeManager() {
        return this.datanodeManager;
    }

    @VisibleForTesting
    public BlockPlacementPolicy getBlockPlacementPolicy() {
        return this.blockplacement;
    }

    public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
        if (newpolicy == null) {
            throw new HadoopIllegalArgumentException("newpolicy == null");
        }
        this.blockplacement = newpolicy;
    }

    private void dumpBlockMeta(Block block, PrintWriter out) throws IOException {
        ArrayList<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
        ArrayList<DatanodeStorageInfo> containingLiveReplicasNodes = new ArrayList<DatanodeStorageInfo>();
        NumberReplicas numReplicas = new NumberReplicas();
        this.chooseSourceDatanode(block, containingNodes, containingLiveReplicasNodes, numReplicas, 5);
        assert (containingLiveReplicasNodes.size() >= numReplicas.liveReplicas());
        int usableReplicas = numReplicas.liveReplicas() + numReplicas.decommissionedReplicas();
        if (block instanceof BlockInfo) {
            BlockCollection bc = ((BlockInfo)block).getBlockCollection();
            String fileName = bc == null ? "[orphaned]" : bc.getName();
            out.print(fileName + ": ");
        }
        out.print(block + (usableReplicas > 0 ? "" : " MISSING") + " (replicas: l: " + numReplicas.liveReplicas() + " d: " + numReplicas.decommissionedReplicas() + " c: " + numReplicas.corruptReplicas() + " e: " + numReplicas.excessReplicas() + ") ");
        Collection<DatanodeDescriptor> corruptNodes = this.corruptReplicas.getNodes(this.getBlockInfo(block));
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(block)) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            String state = "";
            if (corruptNodes != null && corruptNodes.contains(node)) {
                state = "(corrupt)";
            } else if (node.isDecommissioned() || node.isDecommissionInProgress()) {
                state = "(decommissioned)";
            }
            if (storage.areBlockContentsStale()) {
                state = state + " (block deletions maybe out of date)";
            }
            out.print(" " + node + state + " : ");
        }
        out.println("");
    }

    public int getMaxReplicationStreams() {
        return this.maxReplicationStreams;
    }

    public boolean checkMinReplication(Block block) throws IOException {
        return this.countNodes(block).liveReplicas() >= this.minReplication;
    }

    private static boolean commitBlock(BlockInfoUnderConstruction block, Block commitBlock, DatanodeManager datanodeMgr) throws IOException, StorageException {
        if (block.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED) {
            return false;
        }
        assert (block.getNumBytes() <= commitBlock.getNumBytes()) : "commitBlock length is less than the stored one " + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
        block.commitBlock(commitBlock, datanodeMgr);
        return true;
    }

    public boolean commitOrCompleteLastBlock(BlockCollection bc, Block commitBlock) throws IOException, StorageException {
        if (commitBlock == null) {
            return false;
        }
        BlockInfo lastBlock = bc.getLastBlock();
        if (lastBlock == null) {
            return false;
        }
        if (lastBlock.isComplete()) {
            return false;
        }
        boolean b = BlockManager.commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock, this.getDatanodeManager());
        LOG.debug((Object)("commitOrCompleteLastBlock for block " + lastBlock.getBlockId()));
        int numReplicas = this.countNodes(lastBlock).liveReplicas();
        if (numReplicas >= this.minReplication) {
            this.completeBlock(bc, lastBlock.getBlockIndex(), false);
            LOG.debug((Object)("commitOrCompleteLastBlock. Completed Block " + lastBlock.getBlockId()));
        } else {
            LOG.debug((Object)("commitOrCompleteLastBlock. Completed FAILED. Block " + lastBlock.getBlockId() + ": needed " + this.minReplication + " replicas, but only has " + numReplicas));
        }
        return b;
    }

    private BlockInfo completeBlock(BlockCollection bc, int blkIndex, boolean force) throws IOException, StorageException {
        if (blkIndex < 0) {
            return null;
        }
        BlockInfo curBlock = bc.getBlock(blkIndex);
        if (curBlock.isComplete()) {
            return curBlock;
        }
        BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
        int numNodes = ucBlock.numNodes(this.datanodeManager);
        if (!force && numNodes < this.minReplication) {
            throw new IOException("Cannot complete block: block does not satisfy minimal replication requirement.");
        }
        if (!force && ucBlock.getBlockUCState() != HdfsServerConstants.BlockUCState.COMMITTED) {
            throw new IOException("Cannot complete block: block has not been COMMITTED by the client");
        }
        BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
        bc.setBlock(blkIndex, completeBlock);
        this.namesystem.adjustSafeModeBlockTotals(0, 1);
        this.namesystem.incrementSafeBlockCount(curBlock);
        return completeBlock;
    }

    private BlockInfo completeBlock(BlockCollection bc, BlockInfo block, boolean force) throws IOException, StorageException {
        BlockInfo blk = bc.getBlock(block.getBlockIndex());
        if (blk == block) {
            return this.completeBlock(bc, blk.getBlockIndex(), force);
        }
        return block;
    }

    public BlockInfo forceCompleteBlock(BlockCollection bc, BlockInfoUnderConstruction block) throws IOException, StorageException {
        block.commitBlock(block, this.getDatanodeManager());
        return this.completeBlock(bc, block, true);
    }

    public LocatedBlock convertLastBlockToUnderConstruction(BlockCollection bc) throws IOException {
        BlockInfo oldBlock = bc.getLastBlock();
        if (oldBlock == null || bc.getPreferredBlockSize() == oldBlock.getNumBytes()) {
            return null;
        }
        assert (oldBlock == this.getStoredBlock(oldBlock)) : "last block of the file is not in blocksMap";
        DatanodeStorageInfo[] targets = this.getStorages(oldBlock);
        BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
        NumberReplicas replicas = this.countNodes(ucBlock);
        this.neededReplications.remove(ucBlock, replicas.liveReplicas(), replicas.decommissionedReplicas(), this.getReplication(ucBlock));
        this.pendingReplications.remove(ucBlock);
        for (DatanodeStorageInfo target : targets) {
            this.invalidateBlocks.remove(target, oldBlock);
        }
        this.namesystem.adjustSafeModeBlockTotals(targets.length >= this.minReplication ? -1 : 0, -1);
        long fileLength = bc.computeContentSummary().getLength();
        long pos = fileLength - ucBlock.getNumBytes();
        return this.createLocatedBlock(ucBlock, pos, BlockTokenSecretManager.AccessMode.WRITE);
    }

    private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) throws StorageException, TransactionContextException {
        ArrayList<DatanodeStorageInfo> storageSet = new ArrayList<DatanodeStorageInfo>();
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(block)) {
            if (this.invalidateBlocks.contains(storage, block)) continue;
            storageSet.add(storage);
        }
        return storageSet;
    }

    private List<LocatedBlock> createLocatedBlockList(BlockInfo[] blocks, long offset, long length, int nrBlocksToReturn, BlockTokenSecretManager.AccessMode mode) throws IOException, StorageException {
        int curBlk = 0;
        long curPos = 0L;
        long blkSize = 0L;
        int nrBlocks = blocks[0].getNumBytes() == 0L ? 0 : blocks.length;
        for (curBlk = 0; curBlk < nrBlocks; ++curBlk) {
            blkSize = blocks[curBlk].getNumBytes();
            assert (blkSize > 0L) : "Block of size 0";
            if (curPos + blkSize > offset) break;
            curPos += blkSize;
        }
        if (nrBlocks > 0 && curBlk == nrBlocks) {
            return Collections.emptyList();
        }
        long endOff = offset + length;
        ArrayList<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
        do {
            results.add(this.createLocatedBlock(blocks[curBlk], curPos, mode));
        } while ((curPos += blocks[++curBlk].getNumBytes()) < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn);
        return results;
    }

    private LocatedBlock createLocatedBlock(BlockInfo[] blocks, long endPos, BlockTokenSecretManager.AccessMode mode) throws IOException {
        long blkSize;
        int curBlk = 0;
        long curPos = 0L;
        int nrBlocks = blocks[0].getNumBytes() == 0L ? 0 : blocks.length;
        for (curBlk = 0; curBlk < nrBlocks && curPos + (blkSize = blocks[curBlk].getNumBytes()) < endPos; ++curBlk) {
            curPos += blkSize;
        }
        return this.createLocatedBlock(blocks[curBlk], curPos, mode);
    }

    private List<LocatedBlock> createPhantomLocatedBlockList(INodeFile file, byte[] data, BlockTokenSecretManager.AccessMode mode) throws IOException, StorageException {
        ArrayList<LocatedBlock> results = new ArrayList<LocatedBlock>(1);
        BlockInfo fakeBlk = new BlockInfo();
        fakeBlk.setBlockIdNoPersistance(-file.getId());
        fakeBlk.setINodeIdNoPersistance(-file.getId());
        fakeBlk.setBlockIndexNoPersistance(0);
        fakeBlk.setNumBytesNoPersistance(file.getSize());
        fakeBlk.setTimestampNoPersistance(file.getModificationTime());
        ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), fakeBlk);
        DatanodeDescriptor randomDatanode = this.datanodeManager.getRandomDN();
        DatanodeInfo[] machines = new DatanodeInfo[1];
        if (randomDatanode != null) {
            machines[0] = randomDatanode;
        } else {
            DatanodeInfo phantomDatanode;
            DatanodeID phantomDatanodID = new DatanodeID(this.namesystem.getNameNode().getServiceRpcAddress().getAddress().getHostAddress(), this.namesystem.getNameNode().getServiceRpcAddress().getAddress().getCanonicalHostName(), this.namesystem.getBlockPoolId(), 50010, 50075, 50475, 50020);
            machines[0] = phantomDatanode = new DatanodeInfo(phantomDatanodID);
        }
        LocatedBlock locatedBlock = new LocatedBlock(eb, machines, 0L, false);
        locatedBlock.setData(data);
        results.add(locatedBlock);
        return results;
    }

    private LocatedBlock createLocatedBlock(BlockInfo blk, long pos, BlockTokenSecretManager.AccessMode mode) throws IOException, StorageException {
        LocatedBlock lb = this.createLocatedBlock(blk, pos);
        if (mode != null) {
            this.setBlockToken(lb, mode);
        }
        return lb;
    }

    private LocatedBlock createLocatedBlock(BlockInfo blk, long pos) throws IOException, StorageException {
        int numNodes;
        int numCorruptReplicas;
        if (blk instanceof BlockInfoUnderConstruction) {
            if (blk.isComplete()) {
                throw new IOException("blk instanceof BlockInfoUnderConstruction && blk.isComplete(), blk=" + blk);
            }
            BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
            DatanodeStorageInfo[] locations = uc.getExpectedStorageLocations(this.datanodeManager);
            ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), blk);
            return new LocatedBlock(eb, locations, pos, false);
        }
        int numCorruptNodes = this.countNodes(blk).corruptReplicas();
        if (numCorruptNodes != (numCorruptReplicas = this.corruptReplicas.numCorruptReplicas(blk))) {
            LOG.warn((Object)("Inconsistent number of corrupt replicas for " + blk + " blockMap has " + numCorruptNodes + " but corrupt replicas map has " + numCorruptReplicas));
        }
        boolean isCorrupt = numCorruptNodes == (numNodes = this.blocksMap.numNodes(blk));
        int numMachines = isCorrupt ? numNodes : numNodes - numCorruptNodes;
        DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numMachines];
        int j = 0;
        if (numMachines > 0) {
            for (DatanodeStorageInfo storage : this.blocksMap.storageList(blk)) {
                boolean replicaCorrupt = this.corruptReplicas.isReplicaCorrupt(blk, storage.getDatanodeDescriptor());
                if (!isCorrupt && (isCorrupt || replicaCorrupt)) continue;
                storages[j++] = storage;
            }
        }
        assert (j == storages.length) : "isCorrupt: " + isCorrupt + " numStorages: " + numMachines + " numNodes: " + numNodes + " numCorrupt: " + numCorruptNodes + " numCorruptRepls: " + numCorruptReplicas;
        ExtendedBlock eb = new ExtendedBlock(this.namesystem.getBlockPoolId(), blk);
        return new LocatedBlock(eb, storages, pos, isCorrupt);
    }

    public LocatedBlocks createPhantomLocatedBlocks(INodeFile file, byte[] data, boolean isFileUnderConstruction, boolean needBlockToken) throws IOException, StorageException {
        if (needBlockToken) {
            new IOException("Block Tokens are not currently supported for files stored in the database");
        }
        BlockTokenSecretManager.AccessMode mode = needBlockToken ? BlockTokenSecretManager.AccessMode.READ : null;
        List<LocatedBlock> locatedblocks = this.createPhantomLocatedBlockList(file, data, mode);
        return new LocatedBlocks(file.getSize(), isFileUnderConstruction, locatedblocks, null, false);
    }

    public LocatedBlocks createLocatedBlocks(BlockInfo[] blocks, long fileSizeExcludeBlocksUnderConstruction, boolean isFileUnderConstruction, long offset, long length, boolean needBlockToken) throws IOException, StorageException {
        if (blocks == null) {
            return null;
        }
        if (blocks.length == 0) {
            return new LocatedBlocks(0L, isFileUnderConstruction, Collections.emptyList(), null, false);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("blocks = " + Arrays.asList(blocks)));
        }
        BlockTokenSecretManager.AccessMode mode = needBlockToken ? BlockTokenSecretManager.AccessMode.READ : null;
        List<LocatedBlock> locatedblocks = this.createLocatedBlockList(blocks, offset, length, Integer.MAX_VALUE, mode);
        BlockInfo last = blocks[blocks.length - 1];
        long lastPos = last.isComplete() ? fileSizeExcludeBlocksUnderConstruction - last.getNumBytes() : fileSizeExcludeBlocksUnderConstruction;
        LocatedBlock lastlb = this.createLocatedBlock(last, lastPos, mode);
        boolean isComplete = last.isComplete();
        return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction, locatedblocks, lastlb, isComplete);
    }

    public ExportedBlockKeys getBlockKeys() throws IOException {
        return this.isBlockTokenEnabled() ? this.blockTokenSecretManager.exportKeys() : ExportedBlockKeys.DUMMY_KEYS;
    }

    public void setBlockToken(LocatedBlock b, BlockTokenSecretManager.AccessMode mode) throws IOException {
        if (this.isBlockTokenEnabled()) {
            b.setBlockToken(this.blockTokenSecretManager.generateToken(NameNode.getRemoteUser().getShortUserName(), b.getBlock(), EnumSet.of(mode)));
        }
    }

    void addKeyUpdateCommand(List<DatanodeCommand> cmds, DatanodeDescriptor nodeinfo) throws IOException {
        if (this.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
            cmds.add(new KeyUpdateCommand(this.blockTokenSecretManager.exportKeys()));
            nodeinfo.needKeyUpdate = false;
        }
    }

    public DataEncryptionKey generateDataEncryptionKey() throws IOException {
        if (this.isBlockTokenEnabled() && this.encryptDataTransfer) {
            return this.blockTokenSecretManager.generateDataEncryptionKey();
        }
        return null;
    }

    public short adjustReplication(short replication) {
        return replication < this.minReplication ? this.minReplication : (replication > this.maxReplication ? this.maxReplication : replication);
    }

    public void verifyReplication(String src, short replication, String clientName) throws IOException {
        if (replication >= this.minReplication && replication <= this.maxReplication) {
            return;
        }
        String text = "file " + src + (clientName != null ? " on client " + clientName : "") + ".\nRequested replication " + replication;
        if (replication > this.maxReplication) {
            throw new IOException(text + " exceeds maximum " + this.maxReplication);
        }
        if (replication < this.minReplication) {
            throw new IOException(text + " is less than the required minimum " + this.minReplication);
        }
    }

    public boolean isSufficientlyReplicated(BlockInfo b) throws IOException {
        int replication = Math.min(this.minReplication, this.getDatanodeManager().getNumLiveDataNodes());
        return this.countLiveNodes(b) >= replication;
    }

    public BlocksWithLocations getBlocks(DatanodeID datanode, long size) throws IOException {
        this.namesystem.checkSuperuserPrivilege();
        return this.getBlocksWithLocations(datanode, size);
    }

    private BlocksWithLocations getBlocksWithLocations(DatanodeID datanode, long size) throws UnregisteredNodeException, IOException {
        BlockInfo curBlock;
        DatanodeDescriptor node = this.getDatanodeManager().getDatanode(datanode);
        if (node == null) {
            blockLog.warn((Object)("BLOCK* getBlocks: Asking for blocks from an unrecorded node " + datanode));
            throw new HadoopIllegalArgumentException("Datanode " + datanode + " not found.");
        }
        int numBlocks = node.numBlocks();
        if (numBlocks == 0) {
            return new BlocksWithLocations(new BlocksWithLocations.BlockWithLocations[0]);
        }
        Iterator<BlockInfo> iter = node.getBlockIterator();
        int startBlock = DFSUtil.getRandom().nextInt(numBlocks);
        for (int i = 0; i < startBlock; ++i) {
            iter.next();
        }
        ArrayList<BlocksWithLocations.BlockWithLocations> results = new ArrayList<BlocksWithLocations.BlockWithLocations>();
        long totalSize = 0L;
        while (totalSize < size && iter.hasNext()) {
            curBlock = iter.next();
            if (!curBlock.isComplete()) continue;
            totalSize += this.addBlock(curBlock, results);
        }
        if (totalSize < size) {
            iter = node.getBlockIterator();
            for (int i = 0; i < startBlock && totalSize < size; ++i) {
                curBlock = iter.next();
                if (!curBlock.isComplete()) continue;
                totalSize += this.addBlock(curBlock, results);
            }
        }
        return new BlocksWithLocations(results.toArray(new BlocksWithLocations.BlockWithLocations[results.size()]));
    }

    void datanodeRemoved(DatanodeDescriptor node) throws IOException {
        Iterator<BlockInfo> it = node.getBlockIterator();
        ArrayList<Long> allBlockIds = new ArrayList<Long>();
        while (it.hasNext()) {
            allBlockIds.add(((Block)it.next()).getBlockId());
        }
        this.removeBlocks(allBlockIds, node);
        node.resetBlocks();
        List<Integer> sids = this.datanodeManager.getSidsOnDatanode(node.getDatanodeUuid());
        this.invalidateBlocks.remove(sids);
        boolean stale = false;
        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
            if (!storage.areBlockContentsStale()) continue;
            stale = true;
            break;
        }
        if (stale) {
            this.rescanPostponedMisreplicatedBlocks();
        }
    }

    void removeBlocksAssociatedTo(DatanodeStorageInfo storageInfo) throws IOException {
        Iterator<BlockInfo> it = storageInfo.getBlockIterator();
        DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
        ArrayList<Long> allBlockIds = new ArrayList<Long>();
        while (it.hasNext()) {
            allBlockIds.add(it.next().getBlockId());
        }
        this.removeBlocks(allBlockIds, node);
        this.invalidateBlocks.remove(storageInfo.getSid());
        this.namesystem.checkSafeMode();
    }

    void removeBlocksAssociatedTo(int sid) throws IOException {
        Iterator<BlockInfo> it = this.getAllStorageBlockInfos(sid).iterator();
        ArrayList<Long> allBlockIds = new ArrayList<Long>();
        while (it.hasNext()) {
            allBlockIds.add(it.next().getBlockId());
        }
        this.removeBlocks(allBlockIds, sid);
        this.invalidateBlocks.remove(sid);
        this.namesystem.checkSafeMode();
    }

    private List<BlockInfo> getAllStorageBlockInfos(final int sid) throws IOException {
        LightWeightRequestHandler findBlocksHandler = new LightWeightRequestHandler(HDFSOperationType.GET_ALL_STORAGE_IDS){

            public Object performTask() throws StorageException, IOException {
                BlockInfoDataAccess da = (BlockInfoDataAccess)HdfsStorageFactory.getDataAccess(BlockInfoDataAccess.class);
                HdfsStorageFactory.getConnector().beginTransaction();
                List list = da.findBlockInfosByStorageId(sid);
                HdfsStorageFactory.getConnector().commit();
                return list;
            }
        };
        return (List)findBlocksHandler.handle();
    }

    void addToInvalidates(Block block, DatanodeInfo datanode) throws StorageException, TransactionContextException, UnregisteredNodeException {
        DatanodeDescriptor dn = this.datanodeManager.getDatanode(datanode);
        DatanodeStorageInfo storage = this.getBlockInfo(block).getStorageOnNode(dn);
        if (storage != null) {
            this.addToInvalidates(block, storage);
        }
    }

    void addToInvalidates(Block block, DatanodeStorageInfo storage) throws TransactionContextException, StorageException {
        BlockInfo temp = this.getBlockInfo(block);
        this.invalidateBlocks.add(temp, storage, true);
    }

    private void addToInvalidates(Block b) throws StorageException, TransactionContextException {
        DatanodeStorageInfo[] storages;
        StringBuilder datanodes = new StringBuilder();
        BlockInfo block = this.getBlockInfo(b);
        for (DatanodeStorageInfo storage : storages = this.getBlockInfo(block).getStorages(this.datanodeManager, DatanodeStorage.State.NORMAL)) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            this.invalidateBlocks.add(block, storage, false);
            datanodes.append(node).append(" ");
        }
        if (datanodes.length() != 0) {
            blockLog.info((Object)("BLOCK* addToInvalidates: " + block + " " + datanodes));
        }
    }

    public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, DatanodeInfo dn, String storageID, final String reason) throws IOException {
        final DatanodeDescriptor node = this.getDatanodeManager().getDatanode(dn);
        final DatanodeStorageInfo storage = storageID == null ? null : node.getStorageInfo(storageID);
        new HopsTransactionalRequestHandler(HDFSOperationType.FIND_AND_MARK_BLOCKS_AS_CORRUPT){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                Block b = blk.getLocalBlock();
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(blk.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.UC, LockFactory.BLK.IV));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws StorageException, IOException {
                BlockInfo storedBlock = BlockManager.this.getStoredBlock(blk.getLocalBlock());
                if (storedBlock == null) {
                    blockLog.info((Object)("BLOCK* findAndMarkBlockAsCorrupt: " + blk + " not found"));
                    return null;
                }
                BlockToMarkCorrupt b = new BlockToMarkCorrupt(storedBlock, blk.getGenerationStamp(), reason, CorruptReplicasMap.Reason.CORRUPTION_REPORTED);
                BlockManager.this.markBlockAsCorrupt(b, storage, node);
                return null;
            }
        }.handle(this.namesystem);
    }

    private void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException, StorageException {
        boolean corruptedDuringWrite;
        BlockCollection bc = b.corrupted.getBlockCollection();
        if (bc == null) {
            blockLog.info((Object)("BLOCK markBlockAsCorrupt: " + b + " cannot be marked as corrupt as it does not belong to any file"));
            this.addToInvalidates((Block)b.corrupted, node);
            return;
        }
        if (storageInfo == null) {
            storageInfo = b.corrupted.getStorageOnNode(node);
        }
        if (storageInfo != null) {
            storageInfo.addBlock(b.stored);
        }
        this.corruptReplicas.addToCorruptReplicasMap(b.corrupted, storageInfo, b.reason, b.reasonCode);
        NumberReplicas numberOfReplicas = this.countNodes(b.stored);
        boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc.getBlockReplication();
        boolean minReplicationSatisfied = numberOfReplicas.liveReplicas() >= this.minReplication;
        boolean hasMoreCorruptReplicas = minReplicationSatisfied && numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas() > bc.getBlockReplication();
        boolean bl = corruptedDuringWrite = minReplicationSatisfied && b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp();
        if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) {
            this.invalidateBlock(b, node);
        } else if (this.namesystem.isPopulatingReplQueues()) {
            this.updateNeededReplications(b.stored, -1, 0);
        }
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (!fsNamesystem.isErasureCodingEnabled()) {
            return;
        }
        if (numberOfReplicas.liveReplicas() == 0) {
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{bc.getId()});
            if (status != null) {
                if (!status.isCorrupt()) {
                    status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                    status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                }
                status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                EntityManager.update((Object)status);
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{bc.getId()});
                if (status != null) {
                    if (!status.isParityCorrupt()) {
                        status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                        status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                    EntityManager.update((Object)status);
                    LOG.info((Object)"markBlockAsCorrupt updated parity status to repair requested");
                }
            }
        }
    }

    private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn) throws IOException {
        blockLog.info((Object)("BLOCK* invalidateBlock: " + b + " on " + dn));
        DatanodeDescriptor node = this.getDatanodeManager().getDatanode(dn);
        if (node == null) {
            throw new IOException("Cannot invalidate " + b + " because datanode " + dn + " does not exist.");
        }
        NumberReplicas nr = this.countNodes(b.stored);
        if (nr.replicasOnStaleNodes() > 0) {
            blockLog.info((Object)("BLOCK* invalidateBlocks: postponing invalidation of " + b + " on " + dn + " because " + nr.replicasOnStaleNodes() + " replica(s) are located on nodes with potentially out-of-date block reports"));
            this.postponeBlock(b.corrupted);
            return false;
        }
        if (nr.liveReplicas() >= 1) {
            this.addToInvalidates((Block)b.corrupted, dn);
            this.removeStoredBlock((Block)b.stored, node);
            if (blockLog.isDebugEnabled()) {
                blockLog.debug((Object)("BLOCK* invalidateBlocks: " + b + " on " + dn + " listed for deletion."));
            }
            return true;
        }
        blockLog.info((Object)("BLOCK* invalidateBlocks: " + b + " on " + dn + " is the only copy and was not deleted"));
        return false;
    }

    private void postponeBlock(Block blk) {
        if (this.postponedMisreplicatedBlocks.add(blk)) {
            this.postponedMisreplicatedBlocksCount.incrementAndGet();
        }
    }

    void updateState() throws IOException {
        this.pendingReplicationBlocksCount = this.pendingReplications.size();
        this.underReplicatedBlocksCount = this.neededReplications.size();
        this.corruptReplicaBlocksCount = this.corruptReplicas.size();
    }

    public int getUnderReplicatedNotMissingBlocks() throws IOException {
        return this.neededReplications.getUnderReplicatedBlockCount();
    }

    int computeInvalidateWork(int nodesToProcess) throws IOException {
        List<DatanodeInfo> nodes = this.invalidateBlocks.getDatanodes(this.datanodeManager);
        Collections.shuffle(nodes);
        nodesToProcess = Math.min(nodes.size(), nodesToProcess);
        int blockCnt = 0;
        for (DatanodeInfo dnInfo : nodes) {
            assert (dnInfo != null);
            int blocks = this.invalidateWorkForOneNode(dnInfo);
            if (blocks <= 0) continue;
            blockCnt += blocks;
            if (--nodesToProcess != 0) continue;
            break;
        }
        return blockCnt;
    }

    int computeReplicationWork(int blocksToProcess) throws IOException {
        List<List<Block>> blocksToReplicate = this.neededReplications.chooseUnderReplicatedBlocks(blocksToProcess);
        return this.computeReplicationWorkForBlocks(blocksToReplicate);
    }

    @VisibleForTesting
    int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) throws IOException {
        int scheduledWork = 0;
        for (int priority = 0; priority < blocksToReplicate.size(); ++priority) {
            for (Block block : blocksToReplicate.get(priority)) {
                scheduledWork += this.computeReplicationWorkForBlock(block, priority);
            }
        }
        return scheduledWork;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int computeReplicationWorkForBlockInternal(Block blk, int priority1) throws StorageException, IOException {
        DatanodeStorageInfo[] targets;
        int numEffectiveReplicas;
        short requiredReplication;
        BlockCollection bc = null;
        int scheduledWork = 0;
        LinkedList<ReplicationWork> work = new LinkedList<ReplicationWork>();
        UnderReplicatedBlocks underReplicatedBlocks = this.neededReplications;
        synchronized (underReplicatedBlocks) {
            bc = this.blocksMap.getBlockCollection(blk);
            if (bc == null || bc.isUnderConstruction() && this.getBlockInfo(blk).equals(bc.getLastBlock())) {
                this.neededReplications.remove(this.getBlockInfo(blk), priority1);
                this.neededReplications.decrementReplicationIndex(priority1);
                return scheduledWork;
            }
            requiredReplication = bc.getBlockReplication();
            ArrayList<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>();
            ArrayList<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
            NumberReplicas numReplicas = new NumberReplicas();
            DatanodeDescriptor srcNode = this.chooseSourceDatanode(blk, containingNodes, liveReplicaNodes, numReplicas, priority1);
            if (srcNode == null) {
                LOG.debug((Object)("Block " + blk + " cannot be repl from any storage"));
                return scheduledWork;
            }
            assert (liveReplicaNodes.size() >= numReplicas.liveReplicas());
            numEffectiveReplicas = numReplicas.liveReplicas() + this.pendingReplications.getNumReplicas(this.getBlockInfo(blk));
            if (numEffectiveReplicas >= requiredReplication && (this.pendingReplications.getNumReplicas(this.getBlockInfo(blk)) > 0 || this.blockHasEnoughRacks(blk))) {
                this.neededReplications.remove(this.getBlockInfo(blk), priority1);
                this.neededReplications.decrementReplicationIndex(priority1);
                blockLog.info((Object)("BLOCK* Removing " + blk + " from neededReplications as it has enough replicas"));
                return scheduledWork;
            }
            int additionalReplRequired = numReplicas.liveReplicas() < requiredReplication ? requiredReplication - numEffectiveReplicas : 1;
            work.add(new ReplicationWork(blk, bc, srcNode, containingNodes, liveReplicaNodes, additionalReplRequired, priority1));
        }
        HashSet<DatanodeDescriptor> excludedNodes = new HashSet<DatanodeDescriptor>();
        for (ReplicationWork rw : work) {
            excludedNodes.clear();
            for (Object dn : rw.containingNodes) {
                excludedNodes.add((DatanodeDescriptor)dn);
            }
            rw.chooseTargets(this.blockplacement, this.storagePolicySuite, excludedNodes);
        }
        for (ReplicationWork rw : work) {
            Object dn;
            targets = rw.targets;
            if (targets == null || targets.length == 0) {
                ReplicationWork.access$502(rw, null);
                continue;
            }
            dn = this.neededReplications;
            synchronized (dn) {
                Block block = rw.block;
                int priority = rw.priority;
                bc = this.blocksMap.getBlockCollection(block);
                if (bc == null || bc instanceof MutableBlockCollection && this.getBlockInfo(blk).equals(bc.getLastBlock())) {
                    this.neededReplications.remove(this.getBlockInfo(block), priority);
                    ReplicationWork.access$502(rw, null);
                    this.neededReplications.decrementReplicationIndex(priority);
                    continue;
                }
                requiredReplication = bc.getBlockReplication();
                NumberReplicas numReplicas = this.countNodes(block);
                numEffectiveReplicas = numReplicas.liveReplicas() + this.pendingReplications.getNumReplicas(this.getBlockInfo(block));
                if (numEffectiveReplicas >= requiredReplication && (this.pendingReplications.getNumReplicas(this.getBlockInfo(block)) > 0 || this.blockHasEnoughRacks(block))) {
                    this.neededReplications.remove(this.getBlockInfo(block), priority);
                    this.neededReplications.decrementReplicationIndex(priority);
                    ReplicationWork.access$502(rw, null);
                    blockLog.info((Object)("BLOCK* Removing " + block + " from neededReplications as it has enough replicas"));
                    continue;
                }
                if (numReplicas.liveReplicas() >= requiredReplication && !this.blockHasEnoughRacks(block) && rw.srcNode.getNetworkLocation().equals(targets[0].getDatanodeDescriptor().getNetworkLocation())) {
                    continue;
                }
                rw.srcNode.addBlockToBeReplicated(block, targets);
                ++scheduledWork;
                DatanodeStorageInfo.incrementBlocksScheduled(targets);
                this.pendingReplications.increment(this.getBlockInfo(block), DatanodeStorageInfo.toDatanodeDescriptors(targets));
                if (blockLog.isDebugEnabled()) {
                    blockLog.debug((Object)("BLOCK* block " + block + " is moved from neededReplications to pendingReplications"));
                }
                if (numEffectiveReplicas + targets.length >= requiredReplication) {
                    this.neededReplications.remove(this.getBlockInfo(block), priority);
                    this.neededReplications.decrementReplicationIndex(priority);
                }
            }
        }
        if (blockLog.isInfoEnabled()) {
            for (ReplicationWork rw : work) {
                targets = rw.targets;
                if (targets == null || targets.length == 0) continue;
                StringBuilder targetList = new StringBuilder("datanode(s)");
                for (DatanodeStorageInfo target : targets) {
                    targetList.append(' ');
                    targetList.append(target);
                }
                blockLog.info((Object)("BLOCK* ask " + rw.srcNode + " to replicate " + rw.block + " to " + targetList));
            }
        }
        if (blockLog.isDebugEnabled()) {
            blockLog.debug((Object)("BLOCK* neededReplications = " + this.neededReplications.size() + " pendingReplications = " + this.pendingReplications.size()));
        }
        return scheduledWork;
    }

    public DatanodeStorageInfo[] chooseTarget4NewBlock(String src, int numOfReplicas, Node client, Set<Node> excludedNodes, long blocksize, List<String> favoredNodes, byte storagePolicyID) throws IOException {
        BlockStoragePolicy storagePolicy;
        List<DatanodeDescriptor> favoredDatanodeDescriptors = this.getDatanodeDescriptors(favoredNodes);
        DatanodeStorageInfo[] targets = this.blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, favoredDatanodeDescriptors, storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID));
        if (targets.length < this.minReplication) {
            throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" + this.minReplication + ").  There are " + this.getDatanodeManager().getNetworkTopology().getNumOfLeaves() + " datanode(s) running and " + (excludedNodes == null ? "no" : Integer.valueOf(excludedNodes.size())) + " node(s) are excluded in this operation. " + (excludedNodes != null ? Arrays.toString(excludedNodes.toArray(new Node[excludedNodes.size()])) : "[]"));
        }
        return targets;
    }

    public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
        return this.blockplacement.chooseTarget(src, 1, clientnode, Collections.emptyList(), false, excludes, blocksize, this.storagePolicySuite.getDefaultPolicy());
    }

    public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src, int numAdditionalNodes, Node clientnode, List<DatanodeStorageInfo> chosen, Set<Node> excludes, long blocksize, byte storagePolicyID) {
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID);
        return this.blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, chosen, true, excludes, blocksize, storagePolicy);
    }

    public DatanodeStorageInfo[] chooseTarget4ParityRepair(String src, int numOfReplicas, DatanodeDescriptor clientnode, List<DatanodeStorageInfo> chosen, Set<Node> excludes, long blocksize, byte storagePolicyID) {
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(storagePolicyID);
        return this.blockplacement.chooseTarget(src, numOfReplicas, clientnode, chosen, false, excludes, blocksize, storagePolicy);
    }

    List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
        ArrayList<DatanodeDescriptor> datanodeDescriptors = null;
        if (nodes != null) {
            datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
            for (int i = 0; i < nodes.size(); ++i) {
                DatanodeDescriptor node = this.datanodeManager.getDatanodeDescriptor(nodes.get(i));
                if (node == null) continue;
                datanodeDescriptors.add(node);
            }
        }
        return datanodeDescriptors;
    }

    @VisibleForTesting
    DatanodeDescriptor chooseSourceDatanode(Block b, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> nodesContainingLiveReplicas, NumberReplicas numReplicas, int priority) throws IOException {
        containingNodes.clear();
        nodesContainingLiveReplicas.clear();
        DatanodeDescriptor srcNode = null;
        int live = 0;
        int decommissioned = 0;
        int corrupt = 0;
        int excess = 0;
        BlockInfo block = this.getBlockInfo(b);
        Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(block);
        for (DatanodeStorageInfo storage : block.getStorages(this.datanodeManager)) {
            int countableReplica;
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            int n = countableReplica = storage.getState() == DatanodeStorage.State.NORMAL ? 1 : 0;
            if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
                corrupt += countableReplica;
            } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
                decommissioned += countableReplica;
            } else if (this.excessReplicateMap.contains(storage, block)) {
                excess += countableReplica;
            } else {
                nodesContainingLiveReplicas.add(storage);
                live += countableReplica;
            }
            if (!containingNodes.contains(node)) {
                containingNodes.add(node);
            }
            if (nodesCorrupt != null && nodesCorrupt.contains(node) || priority != 0 && !node.isDecommissionInProgress() && node.getNumberOfBlocksToBeReplicated() >= this.maxReplicationStreams || node.getNumberOfBlocksToBeReplicated() >= this.replicationStreamsHardLimit || this.excessReplicateMap.contains(storage, block) || node.isDecommissioned()) continue;
            if (node.isDecommissionInProgress() || srcNode == null) {
                srcNode = node;
                continue;
            }
            if (srcNode.isDecommissionInProgress()) continue;
            if (srcNode == null) {
                srcNode = node;
                continue;
            }
            if (!DFSUtil.getRandom().nextBoolean()) continue;
            srcNode = node;
        }
        if (numReplicas != null) {
            numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
        }
        return srcNode;
    }

    @VisibleForTesting
    void processPendingReplications() throws IOException {
        long[] timedOutItems = this.pendingReplications.getTimedOutBlocks();
        if (timedOutItems != null) {
            for (long timedOutItem : timedOutItems) {
                this.processTimedOutPendingBlock(timedOutItem);
            }
        }
    }

    public boolean processReport(DatanodeID nodeID, DatanodeStorage storage, BlockReport newReport) throws IOException {
        long startTime = Time.now();
        DatanodeDescriptor node = this.datanodeManager.getDatanode(nodeID);
        if (node == null || !node.isAlive) {
            throw new IOException("ProcessReport from dead or unregistered node: " + nodeID);
        }
        DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
        if (storageInfo == null) {
            storageInfo = node.updateStorage(storage);
        }
        if (this.namesystem.isInStartupSafeMode() && storageInfo.getBlockReportCount() > 0) {
            blockLog.info((Object)("BLOCK* processReport: discarded non-initial block report from " + nodeID + " because namenode still in startup phase"));
            return !node.hasStaleStorages();
        }
        ReportStatistics reportStatistics = null;
        try {
            reportStatistics = this.processReport(storageInfo, newReport);
            boolean staleBefore = storageInfo.areBlockContentsStale();
            storageInfo.receivedBlockReport();
            if (staleBefore && !storageInfo.areBlockContentsStale()) {
                LOG.info((Object)("BLOCK* processReport: Received first block report from " + node + " after becoming active. Its block contents are no longer considered stale"));
                this.rescanPostponedMisreplicatedBlocks();
            }
            long endTime = Time.now();
            NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
            if (metrics != null) {
                metrics.addBlockReport((int)(endTime - startTime));
            }
            blockLog.info((Object)("BLOCK* processReport success: from " + nodeID + " storage: " + storage + ", blocks: " + newReport.getNumberOfBlocks() + ", processing time: " + (endTime - startTime) + " ms. " + reportStatistics));
            return !node.hasStaleStorages();
        }
        catch (Throwable t) {
            long endTime = Time.now();
            blockLog.error((Object)("BLOCK* processReport fail: from " + nodeID + " storage: " + storage + ", blocks: " + newReport.getNumberOfBlocks() + ", processing time: " + (endTime - startTime) + " ms. " + reportStatistics), t);
            throw t;
        }
    }

    private void rescanPostponedMisreplicatedBlocks() throws IOException {
        HopsTransactionalRequestHandler rescanPostponedMisreplicatedBlocksHandler = new HopsTransactionalRequestHandler(HDFSOperationType.RESCAN_MISREPLICATED_BLOCKS){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                Block b = (Block)this.getParams()[0];
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                Block b = (Block)this.getParams()[0];
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(b.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
            }

            public Object performTask() throws IOException {
                Block b = (Block)this.getParams()[0];
                Set toRemoveSet = (Set)this.getParams()[1];
                BlockInfo bi = BlockManager.this.blocksMap.getStoredBlock(b);
                if (bi == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("BLOCK* rescanPostponedMisreplicatedBlocks: Postponed mis-replicated block " + b + " no longer found in block map."));
                    }
                    toRemoveSet.add(b);
                    BlockManager.this.postponedMisreplicatedBlocksCount.decrementAndGet();
                    return null;
                }
                MisReplicationResult res = BlockManager.this.processMisReplicatedBlock(bi);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("BLOCK* rescanPostponedMisreplicatedBlocks: Re-scanned block " + b + ", result is " + (Object)((Object)res)));
                }
                if (res != MisReplicationResult.POSTPONE) {
                    toRemoveSet.add(b);
                    BlockManager.this.postponedMisreplicatedBlocksCount.decrementAndGet();
                }
                return null;
            }
        };
        HashSet toRemove = new HashSet();
        for (Block postponedMisreplicatedBlock : this.postponedMisreplicatedBlocks) {
            rescanPostponedMisreplicatedBlocksHandler.setParams(new Object[]{postponedMisreplicatedBlock, toRemove});
            rescanPostponedMisreplicatedBlocksHandler.handle(this.namesystem);
        }
        this.postponedMisreplicatedBlocks.removeAll(toRemove);
    }

    @VisibleForTesting
    public ReportStatistics processReport(final DatanodeStorageInfo storage, BlockReport report) throws IOException {
        boolean firstBlockReport;
        ConcurrentHashMap mapToAdd = new ConcurrentHashMap();
        ConcurrentHashMap mapToRemove = new ConcurrentHashMap();
        ConcurrentHashMap mapToInvalidate = new ConcurrentHashMap();
        ConcurrentHashMap mapToCorrupt = new ConcurrentHashMap();
        ConcurrentHashMap mapToUC = new ConcurrentHashMap();
        Set<BlockInfo> toAdd = Collections.newSetFromMap(mapToAdd);
        Set<Long> toRemove = Collections.newSetFromMap(mapToRemove);
        Set<Block> toInvalidate = Collections.newSetFromMap(mapToInvalidate);
        Set<BlockToMarkCorrupt> toCorrupt = Collections.newSetFromMap(mapToCorrupt);
        Set<StatefulBlockInfo> toUC = Collections.newSetFromMap(mapToUC);
        boolean bl = firstBlockReport = this.namesystem.isInStartupSafeMode() || storage.getBlockReportCount() == 0;
        if (storage.getBlockReportCount() == 0) {
            HashBuckets.getInstance().createBucketsForStorage(storage);
        }
        ReportStatistics reportStatistics = this.reportDiff(storage, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC, firstBlockReport);
        for (StatefulBlockInfo b : toUC) {
            if (firstBlockReport) {
                this.addStoredBlockUnderConstructionImmediateTx(b.storedBlock, storage, b.reportedState);
                continue;
            }
            this.addStoredBlockUnderConstructionTx(b, storage);
        }
        ArrayList<4> addTasks = new ArrayList<4>();
        int numBlocksLogged = 0;
        for (final BlockInfo blockInfo : toAdd) {
            if (firstBlockReport) {
                this.addStoredBlockImmediateTx(blockInfo, storage, (long)numBlocksLogged < this.maxNumBlocksToLog);
            } else {
                final boolean bl2 = (long)numBlocksLogged < this.maxNumBlocksToLog;
                addTasks.add(new Callable<Object>(){

                    @Override
                    public Object call() throws Exception {
                        BlockManager.this.addStoredBlockTx(blockInfo, storage, null, bl2);
                        return null;
                    }
                });
            }
            ++numBlocksLogged;
        }
        if ((long)numBlocksLogged > this.maxNumBlocksToLog) {
            blockLog.info((Object)("BLOCK* processReport: logged info for " + this.maxNumBlocksToLog + " of " + numBlocksLogged + " reported."));
        }
        try {
            List futures = ((FSNamesystem)this.namesystem).getSubtreeOperationsExecutor().invokeAll(addTasks);
            for (Future future : futures) {
                future.get();
            }
        }
        catch (InterruptedException e) {
            LOG.error((Object)e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            throw (IOException)e.getCause();
        }
        for (BlockToMarkCorrupt blockToMarkCorrupt : toCorrupt) {
            this.markBlockAsCorruptTx(blockToMarkCorrupt, storage);
        }
        for (Block block : toInvalidate) {
            blockLog.info((Object)("BLOCK* processReport: " + block + " on " + storage + " " + storage + " size " + block.getNumBytes() + " does not belong to any file"));
        }
        this.addToInvalidates(toInvalidate, storage);
        this.removeBlocks(toRemove, storage.getDatanodeDescriptor());
        return reportStatistics;
    }

    @VisibleForTesting
    public void removeBlocks(Collection<Long> allBlockIds, final DatanodeDescriptor node) throws IOException {
        long[] array = new long[allBlockIds.size()];
        int i = 0;
        Iterator<Long> iterator = allBlockIds.iterator();
        while (iterator.hasNext()) {
            long blockId;
            array[i] = blockId = iterator.next().longValue();
            ++i;
        }
        final Map<Integer, List<Long>> inodeIdsToBlockMap = INodeUtil.getINodeIdsForBlockIds(array);
        final ArrayList<Integer> inodeIds = new ArrayList<Integer>(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.removalBatchSize, (int)this.removalNoThreads, (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List ids = inodeIds.subList(startIndex, endIndex);
                    BlockManager.this.removeStoredBlocksTx((List<Integer>)ids, (Map<Integer, List<Long>>)inodeIdsToBlockMap, node);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    private void removeBlocks(List<Long> allBlockIds, final int sid) throws IOException {
        long[] array = new long[allBlockIds.size()];
        int i = 0;
        Iterator<Long> iterator = allBlockIds.iterator();
        while (iterator.hasNext()) {
            long blockId;
            array[i] = blockId = iterator.next().longValue();
            ++i;
        }
        final Map<Integer, List<Long>> inodeIdsToBlockMap = INodeUtil.getINodeIdsForBlockIds(array);
        final ArrayList<Integer> inodeIds = new ArrayList<Integer>(inodeIdsToBlockMap.keySet());
        try {
            Slicer.slice((int)inodeIds.size(), (int)this.removalBatchSize, (int)this.removalNoThreads, (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List ids = inodeIds.subList(startIndex, endIndex);
                    BlockManager.this.removeStoredBlocksTx((List<Integer>)ids, (Map<Integer, List<Long>>)inodeIdsToBlockMap, sid);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    private ReportStatistics reportDiff(DatanodeStorageInfo storage, BlockReport newReport, Collection<BlockInfo> toAdd, Collection<Long> toRemove, Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC, boolean firstBlockReport) throws IOException {
        if (newReport == null) {
            return null;
        }
        Map<Long, Long> invalidatedReplicas = storage.getAllStorageInvalidatedReplicasWithGenStamp();
        ReportStatistics stats = new ReportStatistics();
        stats.numBuckets = newReport.getBuckets().length;
        stats.numBlocks = newReport.getNumberOfBlocks();
        HashMatchingResult matchingResult = this.calculateMismatchedHashes(storage, newReport, firstBlockReport);
        stats.numBucketsMatching = matchingResult.matchingBuckets.size();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)String.format("%d/%d reported hashes matched", newReport.getHashes().length - matchingResult.mismatchedBuckets.size(), newReport.getHashes().length));
        }
        HashSet<Long> aggregatedSafeBlocks = new HashSet<Long>();
        Map<Long, Integer> mismatchedBlocksAndInodes = storage.getAllStorageReplicasInBuckets(matchingResult.mismatchedBuckets);
        Set<Long> allMismatchedBlocksOnServer = mismatchedBlocksAndInodes.keySet();
        aggregatedSafeBlocks.addAll(allMismatchedBlocksOnServer);
        this.processMisMatchingBuckets(storage, newReport, matchingResult, toAdd, toInvalidate, toCorrupt, toUC, firstBlockReport, mismatchedBlocksAndInodes, aggregatedSafeBlocks, allMismatchedBlocksOnServer, invalidatedReplicas);
        stats.numToAdd = toAdd.size();
        stats.numToInvalidate = toInvalidate.size();
        stats.numToCorrupt = toCorrupt.size();
        stats.numToUC = toUC.size();
        toRemove.addAll(allMismatchedBlocksOnServer);
        stats.numToRemove = toRemove.size();
        if (this.namesystem.isInStartupSafeMode()) {
            aggregatedSafeBlocks.removeAll(toRemove);
            LOG.debug((Object)("AGGREGATED SAFE BLOCK #: " + aggregatedSafeBlocks.size() + " REPORTED BLOCK #: " + newReport.getNumberOfBlocks()));
            this.namesystem.adjustSafeModeBlocks(aggregatedSafeBlocks);
            stats.numConsideredSafeIfInSafemode = aggregatedSafeBlocks.size();
        }
        return stats;
    }

    private void processMisMatchingBuckets(final DatanodeStorageInfo storage, BlockReport newReport, HashMatchingResult matchingResult, final Collection<BlockInfo> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC, final boolean firstBlockReport, final Map<Long, Integer> mismatchedBlocksAndInodes, final Set<Long> aggregatedSafeBlocks, final Set<Long> allMismatchedBlocksOnServer, final Map<Long, Long> invalidatedReplicas) throws IOException {
        ArrayList<7> subTasks = new ArrayList<7>();
        Iterator iterator = matchingResult.mismatchedBuckets.iterator();
        while (iterator.hasNext()) {
            final int bucketId = (Integer)iterator.next();
            Bucket bucket = newReport.getBuckets()[bucketId];
            final List<ReportedBlock> bucketBlocks = Arrays.asList(bucket.getBlocks());
            Callable<Void> subTask = new Callable<Void>(){

                @Override
                public Void call() throws IOException {
                    HopsTransactionalRequestHandler processReportHandler = BlockManager.this.processBucketInternal(storage, bucketId, toAdd, toInvalidate, toCorrupt, toUC, firstBlockReport, mismatchedBlocksAndInodes, aggregatedSafeBlocks, allMismatchedBlocksOnServer, invalidatedReplicas, bucketBlocks);
                    processReportHandler.handle();
                    return null;
                }
            };
            subTasks.add(subTask);
        }
        try {
            List futures = ((FSNamesystem)this.namesystem).getSubtreeOperationsExecutor().invokeAll(subTasks);
            for (Future maybeException : futures) {
                maybeException.get();
            }
        }
        catch (InterruptedException e) {
            LOG.error((Object)"Exception was thrown during block report processing", (Throwable)e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            throw (IOException)e.getCause();
        }
    }

    private HopsTransactionalRequestHandler processBucketInternal(final DatanodeStorageInfo storage, final int bucketId, final Collection<BlockInfo> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC, final boolean firstBlockReport, final Map<Long, Integer> mismatchedBlocksAndInodes, final Set<Long> aggregatedSafeBlocks, final Set<Long> allMismatchedBlocksOnServer, final Map<Long, Long> invalidatedReplicas, final List<ReportedBlock> reportedBlocks) {
        return new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_REPORT){

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                if (!reportedBlocks.isEmpty()) {
                    ArrayList<Long> resolvedBlockIds = new ArrayList<Long>();
                    ArrayList<Integer> inodeIds = new ArrayList<Integer>();
                    ArrayList<Long> unResolvedBlockIds = new ArrayList<Long>();
                    for (ReportedBlock reportedBlock : reportedBlocks) {
                        Integer inodeId = (Integer)mismatchedBlocksAndInodes.get(reportedBlock.getBlockId());
                        if (inodeId != null) {
                            resolvedBlockIds.add(reportedBlock.getBlockId());
                            inodeIds.add(inodeId);
                            continue;
                        }
                        unResolvedBlockIds.add(reportedBlock.getBlockId());
                    }
                    locks.add(lf.getBlockReportingLocks(Longs.toArray(resolvedBlockIds), Ints.toArray(inodeIds), Longs.toArray(unResolvedBlockIds), storage.getSid()));
                }
                locks.add(lf.getIndividualHashBucketLock(storage.getSid(), bucketId));
            }

            public Object performTask() throws IOException {
                long hash = 0L;
                for (ReportedBlock brb : reportedBlocks) {
                    Block block = new Block();
                    block.setNoPersistance(brb.getBlockId(), brb.getLength(), brb.getGenerationStamp());
                    BlockInfo storedBlock = BlockManager.this.processReportedBlock(storage, block, BlockManager.this.fromBlockReportBlockState(brb.getState()), toAdd, toInvalidate, toCorrupt, toUC, aggregatedSafeBlocks, firstBlockReport, allMismatchedBlocksOnServer.contains(brb.getBlockId()), invalidatedReplicas);
                    if (storedBlock == null) continue;
                    mismatchedBlocksAndInodes.remove(storedBlock.getBlockId());
                    if (brb.getState() != BlockReportBlockState.FINALIZED) continue;
                    hash += BlockReport.hashAsFinalized(brb);
                }
                HashBucket bucket = HashBuckets.getInstance().getBucket(storage.getSid(), bucketId);
                bucket.setHash(hash);
                return null;
            }
        };
    }

    private HdfsServerConstants.ReplicaState fromBlockReportBlockState(BlockReportBlockState state) {
        switch (state) {
            case FINALIZED: {
                return HdfsServerConstants.ReplicaState.FINALIZED;
            }
            case RBW: {
                return HdfsServerConstants.ReplicaState.RBW;
            }
            case RWR: {
                return HdfsServerConstants.ReplicaState.RWR;
            }
        }
        throw new RuntimeException("Block Report should only contain FINALIZED, RBW and RWR replicas. Got: " + (Object)((Object)state));
    }

    private HashMatchingResult calculateMismatchedHashes(DatanodeStorageInfo storage, BlockReport report, Boolean firstBlockReport) throws IOException {
        List<HashBucket> storedHashes = HashBuckets.getInstance().getBucketsForStorage(storage);
        HashMap<Integer, HashBucket> storedHashesMap = new HashMap<Integer, HashBucket>();
        for (HashBucket allStorageHash : storedHashes) {
            storedHashesMap.put(allStorageHash.getBucketId(), allStorageHash);
        }
        ArrayList<Integer> matchedBuckets = new ArrayList<Integer>();
        ArrayList<Integer> mismatchedBuckets = new ArrayList<Integer>();
        for (int i = 0; i < report.getBuckets().length; ++i) {
            if (!storedHashesMap.containsKey(i)) {
                mismatchedBuckets.add(i);
                continue;
            }
            long storedHash = ((HashBucket)storedHashesMap.get(i)).getHash();
            if (firstBlockReport.booleanValue()) {
                if (report.getBuckets()[i].getBlocks().length == 0 && storedHash == 0L) {
                    matchedBuckets.add(i);
                    continue;
                }
                mismatchedBuckets.add(i);
                continue;
            }
            long reportedHash = report.getHashes()[i];
            if (storedHash == reportedHash) {
                matchedBuckets.add(i);
                continue;
            }
            mismatchedBuckets.add(i);
        }
        assert (matchedBuckets.size() + mismatchedBuckets.size() == report.getHashes().length);
        return new HashMatchingResult(matchedBuckets, mismatchedBuckets);
    }

    private BlockInfo processIncrementallyReportedBlock(DatanodeStorageInfo storage, Block block, HdfsServerConstants.ReplicaState reportedState, Collection<BlockInfo> toAdd, Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC) throws IOException {
        BlockInfo storedBlock;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Reported block " + block + " on " + storage.getStorageID() + " size " + block.getNumBytes() + " replicaState = " + (Object)((Object)reportedState)));
        }
        if ((storedBlock = this.blocksMap.getStoredBlock(block)) == null) {
            blockLog.info((Object)("BLOCK* processReport: " + block + " on " + storage + " size " + block.getNumBytes() + " does not belong to any file"));
            toInvalidate.add(new Block(block));
            return null;
        }
        HdfsServerConstants.BlockUCState ucState = storedBlock.getBlockUCState();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("In memory blockUCState = " + (Object)((Object)ucState) + " bid=" + storedBlock.getBlockIndex()));
        }
        if (this.invalidateBlocks.contains(storage, this.getBlockInfo(block))) {
            return storedBlock;
        }
        BlockToMarkCorrupt c = this.checkReplicaCorrupt(block, reportedState, storedBlock, ucState, storage);
        if (c != null) {
            toCorrupt.add(c);
            return storedBlock;
        }
        if (this.isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
            toUC.add(new StatefulBlockInfo((BlockInfoUnderConstruction)storedBlock, block, reportedState));
            return storedBlock;
        }
        if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED && (!storedBlock.isReplicatedOnStorage(storage) || this.corruptReplicas.isReplicaCorrupt(storedBlock, storage.getDatanodeDescriptor()))) {
            toAdd.add(storedBlock);
        }
        return storedBlock;
    }

    private BlockInfo processReportedBlock(DatanodeStorageInfo storage, Block block, HdfsServerConstants.ReplicaState reportedState, Collection<BlockInfo> toAdd, Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt, Collection<StatefulBlockInfo> toUC, Set<Long> safeBlocks, boolean firstBlockReport, boolean replicaAlreadyExists, Map<Long, Long> allMachineInvalidatedBlocks) throws IOException {
        BlockInfo storedBlock;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Reported block " + block + " on " + storage.getStorageID() + " size " + block.getNumBytes() + " replicaState = " + (Object)((Object)reportedState)));
        }
        if ((storedBlock = this.blocksMap.getStoredBlock(block)) == null) {
            blockLog.info((Object)("BLOCK* processReport: " + block + " on " + storage.getStorageID() + " size " + block.getNumBytes() + " does not belong to any file"));
            toInvalidate.add(new Block(block));
            safeBlocks.remove(block.getBlockId());
            return null;
        }
        HdfsServerConstants.BlockUCState ucState = storedBlock.getBlockUCState();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("In memory blockUCState = " + (Object)((Object)ucState) + " bid=" + storedBlock.getBlockIndex()));
        }
        if (!firstBlockReport && allMachineInvalidatedBlocks.containsKey(block.getBlockId()) && allMachineInvalidatedBlocks.get(block.getBlockId()).longValue() == block.getGenerationStamp()) {
            return storedBlock;
        }
        BlockToMarkCorrupt c = this.checkReplicaCorrupt(block, reportedState, storedBlock, ucState, storage);
        if (c != null) {
            toCorrupt.add(c);
            safeBlocks.remove(block.getBlockId());
            return storedBlock;
        }
        if (this.isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
            toUC.add(new StatefulBlockInfo((BlockInfoUnderConstruction)storedBlock, block, reportedState));
            safeBlocks.remove(block.getBlockId());
            return storedBlock;
        }
        if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED) {
            if (replicaAlreadyExists || storedBlock.isReplicatedOnStorage(storage)) {
                return storedBlock;
            }
            toAdd.add(storedBlock);
            safeBlocks.remove(block.getBlockId());
        }
        return storedBlock;
    }

    private BlockToMarkCorrupt checkReplicaCorrupt(Block reported, HdfsServerConstants.ReplicaState reportedState, BlockInfo storedBlock, HdfsServerConstants.BlockUCState ucState, DatanodeStorageInfo storage) {
        switch (reportedState) {
            case FINALIZED: {
                switch (ucState) {
                    case COMPLETE: 
                    case COMMITTED: {
                        if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
                            long reportedGS = reported.getGenerationStamp();
                            return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + (Object)((Object)ucState) + " and reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
                        }
                        if (storedBlock.getNumBytes() != reported.getNumBytes()) {
                            return new BlockToMarkCorrupt(storedBlock, "block is " + (Object)((Object)ucState) + " and reported length " + reported.getNumBytes() + " does not match length in block map " + storedBlock.getNumBytes(), CorruptReplicasMap.Reason.SIZE_MISMATCH);
                        }
                        return null;
                    }
                }
                return null;
            }
            case RBW: 
            case RWR: {
                if (!storedBlock.isComplete()) {
                    return null;
                }
                if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
                    long reportedGS = reported.getGenerationStamp();
                    return new BlockToMarkCorrupt(storedBlock, reportedGS, "reported " + (Object)((Object)reportedState) + " replica with genstamp " + reportedGS + " does not match COMPLETE block's genstamp in block map " + storedBlock.getGenerationStamp(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
                }
                if (reportedState == HdfsServerConstants.ReplicaState.RBW) {
                    LOG.info((Object)("Received an RBW replica for " + storedBlock + " on " + storage + ": ignoring it, since it is complete with the same genstamp"));
                    return null;
                }
                return new BlockToMarkCorrupt(storedBlock, "reported replica has invalid state " + (Object)((Object)reportedState), CorruptReplicasMap.Reason.INVALID_STATE);
            }
        }
        String msg = "Unexpected replica state " + (Object)((Object)reportedState) + " for block: " + storedBlock + " on " + storage + " size " + storedBlock.getNumBytes();
        LOG.warn((Object)msg);
        return new BlockToMarkCorrupt(storedBlock, msg, CorruptReplicasMap.Reason.INVALID_STATE);
    }

    private boolean isBlockUnderConstruction(BlockInfo storedBlock, HdfsServerConstants.BlockUCState ucState, HdfsServerConstants.ReplicaState reportedState) {
        switch (reportedState) {
            case FINALIZED: {
                switch (ucState) {
                    case UNDER_CONSTRUCTION: 
                    case UNDER_RECOVERY: {
                        return true;
                    }
                }
                return false;
            }
            case RBW: 
            case RWR: {
                return !storedBlock.isComplete();
            }
        }
        return false;
    }

    private void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, DatanodeStorageInfo storage) throws IOException {
        BlockInfoUnderConstruction block = ucBlock.storedBlock;
        block.addReplicaIfNotPresent(storage, ucBlock.reportedState, ucBlock.reportedBlock.getGenerationStamp());
        if (ucBlock.reportedState == HdfsServerConstants.ReplicaState.FINALIZED && !block.isReplicatedOnStorage(storage)) {
            this.addStoredBlock(block, storage, null, true);
        }
    }

    private void addStoredBlockImmediate(BlockInfo storedBlock, DatanodeStorageInfo storage, boolean logEveryBlock) throws IOException {
        assert (storedBlock != null);
        if (!this.namesystem.isInStartupSafeMode() || this.namesystem.isPopulatingReplQueues()) {
            this.addStoredBlock(storedBlock, storage, null, logEveryBlock);
            return;
        }
        storage.addBlock(storedBlock);
        int numCurrentReplica = this.countLiveNodes(storedBlock);
        if (storedBlock.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED && numCurrentReplica >= this.minReplication) {
            this.completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
        } else if (storedBlock.isComplete()) {
            this.namesystem.incrementSafeBlockCount(storedBlock);
        }
    }

    private Block addStoredBlock(BlockInfo block, DatanodeStorageInfo storage, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException {
        INode iNode;
        int curReplicaDelta;
        boolean added;
        assert (block != null);
        BlockInfo storedBlock = block instanceof BlockInfoUnderConstruction ? this.blocksMap.getStoredBlock(block) : block;
        if (storedBlock == null || storedBlock.getBlockCollection() == null) {
            blockLog.info((Object)("BLOCK* addStoredBlock: " + block + " on " + storage.getStorageID() + " size " + block.getNumBytes() + " but it does not belong to any file"));
            return block;
        }
        assert (storedBlock != null) : "Block must be stored by now";
        BlockCollection bc = storedBlock.getBlockCollection();
        assert (bc != null) : "Block must belong to a file";
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        NumberReplicas numBeforeAdding = null;
        if (fsNamesystem.isErasureCodingEnabled()) {
            numBeforeAdding = this.countNodes(block);
        }
        if (added = storage.addBlock(storedBlock)) {
            curReplicaDelta = 1;
            if (logEveryBlock) {
                this.logAddStoredBlock(storedBlock, storage);
            }
        } else {
            this.corruptReplicas.removeFromCorruptReplicasMap(block, storage.getDatanodeDescriptor(), CorruptReplicasMap.Reason.GENSTAMP_MISMATCH);
            curReplicaDelta = 0;
            blockLog.warn((Object)("BLOCK* addStoredBlock: Redundant addStoredBlock request received for " + storedBlock + " on " + storage.getStorageID() + " size " + storedBlock.getNumBytes()));
        }
        NumberReplicas num = this.countNodes(storedBlock);
        int numLiveReplicas = num.liveReplicas();
        int numCurrentReplica = numLiveReplicas + this.pendingReplications.getNumReplicas(storedBlock);
        if (storedBlock.getBlockUCState() == HdfsServerConstants.BlockUCState.COMMITTED && numLiveReplicas >= this.minReplication) {
            storedBlock = this.completeBlock(bc, storedBlock, false);
        } else if (storedBlock.isComplete() && added) {
            this.namesystem.incrementSafeBlockCount(storedBlock);
        }
        if (bc.isUnderConstruction()) {
            return storedBlock;
        }
        if (!this.namesystem.isPopulatingReplQueues()) {
            return storedBlock;
        }
        short fileReplication = bc.getBlockReplication();
        if (!this.isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
            this.neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedReplicas(), fileReplication);
        } else {
            this.updateNeededReplications(storedBlock, curReplicaDelta, 0);
        }
        if (numCurrentReplica > fileReplication) {
            this.processOverReplicatedBlock(storedBlock, fileReplication, storage.getDatanodeDescriptor(), delNodeHint);
        }
        int corruptReplicasCount = this.corruptReplicas.numCorruptReplicas(storedBlock);
        int numCorruptNodes = num.corruptReplicas();
        if (numCorruptNodes != corruptReplicasCount) {
            LOG.warn((Object)("Inconsistent number of corrupt replicas for " + storedBlock + "blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount));
        }
        if (corruptReplicasCount > 0 && numLiveReplicas >= fileReplication) {
            this.invalidateCorruptReplicas(storedBlock);
        }
        if (fsNamesystem.isErasureCodingEnabled() && !(iNode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{bc.getId()})).isUnderConstruction() && numBeforeAdding.liveReplicas() == 0 && numLiveReplicas > 0) {
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{bc.getId()});
            if (status != null && status.isCorrupt()) {
                int lostBlockCount = status.getLostBlocks() - 1;
                status.setLostBlocks(Integer.valueOf(lostBlockCount));
                if (lostBlockCount == 0) {
                    status.setStatus(EncodingStatus.Status.ENCODED);
                    status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                }
                EntityManager.update((Object)status);
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{bc.getId()});
                if (status == null) {
                    LOG.info((Object)("addStoredBlock returned null for " + bc.getId()));
                } else {
                    LOG.info((Object)("addStoredBlock found " + bc.getId() + " with status " + status));
                }
                if (status != null && status.isParityCorrupt()) {
                    int lostParityBlockCount = status.getLostParityBlocks() - 1;
                    status.setLostParityBlocks(Integer.valueOf(lostParityBlockCount));
                    if (lostParityBlockCount == 0) {
                        status.setParityStatus(EncodingStatus.ParityStatus.HEALTHY);
                        status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    EntityManager.update((Object)status);
                    LOG.info((Object)"addStoredBlock found set status to potentially fixed");
                }
            }
        }
        return storedBlock;
    }

    private void logAddStoredBlock(BlockInfo storedBlock, DatanodeStorageInfo storage) {
        if (!blockLog.isInfoEnabled()) {
            return;
        }
        StringBuilder sb = new StringBuilder(500);
        sb.append("BLOCK* addStoredBlock: blockMap updated: ").append(storage).append(" is added to ").append(storedBlock).append(" size ").append(storedBlock.getNumBytes()).append(" byte");
        blockLog.info((Object)sb);
    }

    private void invalidateCorruptReplicas(BlockInfo blk) throws StorageException, TransactionContextException {
        DatanodeDescriptor[] nodesCopy;
        Collection<DatanodeDescriptor> nodes = this.corruptReplicas.getNodes(blk);
        boolean removedFromBlocksMap = true;
        if (nodes == null) {
            return;
        }
        for (DatanodeDescriptor node : nodesCopy = nodes.toArray(new DatanodeDescriptor[0])) {
            try {
                removedFromBlocksMap = this.invalidateBlock(new BlockToMarkCorrupt(blk, null, CorruptReplicasMap.Reason.ANY), node);
            }
            catch (IOException e) {
                blockLog.info((Object)("invalidateCorruptReplicas error in deleting bad block " + blk + " on " + node), (Throwable)e);
                removedFromBlocksMap = false;
            }
        }
        if (removedFromBlocksMap) {
            this.corruptReplicas.removeFromCorruptReplicasMap(blk);
        }
    }

    public void processMisReplicatedBlocks() throws IOException {
        this.stopReplicationInitializer();
        if (this.namesystem.isLeader()) {
            HdfsVariables.resetMisReplicatedIndex();
            this.neededReplications.clear();
        }
        this.replicationQueuesInitializer = new Daemon(){

            public void run() {
                try {
                    BlockManager.this.processMisReplicatesAsync();
                }
                catch (InterruptedException ie) {
                    LOG.info((Object)"Interrupted while processing replication queues.");
                }
                catch (Exception e) {
                    LOG.error((Object)"Error while processing replication queues async", (Throwable)e);
                }
            }
        };
        this.replicationQueuesInitializer.setName("Replication Queue Initializer");
        this.replicationQueuesInitializer.start();
    }

    private void stopReplicationInitializer() {
        if (this.replicationQueuesInitializer != null) {
            this.replicationQueuesInitializer.interrupt();
            try {
                this.replicationQueuesInitializer.join();
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting for replicationQueueInitializer. Returning..");
                return;
            }
            finally {
                this.replicationQueuesInitializer = null;
            }
        }
    }

    private List<MisReplicatedRange> checkMisReplicatedRangeQueue() throws IOException {
        LightWeightRequestHandler cleanMisReplicatedRangeQueueHandler = new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                List ranges = da.getAll();
                HashSet<Long> activeNodes = new HashSet<Long>();
                for (ActiveNode nn : BlockManager.this.namesystem.getNameNode().getLeaderElectionInstance().getActiveNamenodes().getActiveNodes()) {
                    activeNodes.add(nn.getId());
                }
                ArrayList<MisReplicatedRange> toRemove = new ArrayList<MisReplicatedRange>();
                for (MisReplicatedRange range : ranges) {
                    if (activeNodes.contains(range.getNnId())) continue;
                    toRemove.add(range);
                }
                da.remove(toRemove);
                return toRemove;
            }
        };
        ArrayList<MisReplicatedRange> toProcess = new ArrayList<MisReplicatedRange>();
        while (this.namesystem.isLeader() && this.sizeOfMisReplicatedRangeQueue() > 0) {
            toProcess.addAll((List)cleanMisReplicatedRangeQueueHandler.handle());
        }
        return toProcess;
    }

    private void processMisReplicatesAsync() throws InterruptedException, IOException {
        final AtomicLong nrInvalid = new AtomicLong(0L);
        final AtomicLong nrOverReplicated = new AtomicLong(0L);
        final AtomicLong nrUnderReplicated = new AtomicLong(0L);
        final AtomicLong nrPostponed = new AtomicLong(0L);
        final AtomicLong nrUnderConstruction = new AtomicLong(0L);
        long startTimeMisReplicatedScan = Time.now();
        long totalBlocks = this.blocksMap.size();
        this.replicationQueuesInitProgress = 0.0;
        final AtomicLong totalProcessed = new AtomicLong(0L);
        int filesToProcess = this.processMisReplicatedBatchSize * this.processMisReplicatedNoOfBatchs;
        HopsTransactionalRequestHandler processMisReplicatedBlocksHandler = new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_MIS_REPLICATED_BLOCKS_PER_INODE_BATCH){

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                List inodeIdentifiers = (List)this.getParams()[0];
                locks.add(lf.getBatchedINodesLock(inodeIdentifiers)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
            }

            public Object performTask() throws IOException {
                List inodeIdentifiers = (List)this.getParams()[0];
                for (INodeIdentifier inodeIdentifier : inodeIdentifiers) {
                    INode inode = (INode)EntityManager.find((FinderType)INode.Finder.ByINodeIdFTIS, (Object[])new Object[]{inodeIdentifier.getInodeId()});
                    for (BlockInfo block : ((INodeFile)inode).getBlocks()) {
                        MisReplicationResult res = BlockManager.this.processMisReplicatedBlock(block);
                        if (LOG.isTraceEnabled()) {
                            LOG.trace((Object)("block " + block + ": " + (Object)((Object)res)));
                        }
                        switch (res) {
                            case UNDER_REPLICATED: {
                                nrUnderReplicated.incrementAndGet();
                                break;
                            }
                            case OVER_REPLICATED: {
                                nrOverReplicated.incrementAndGet();
                                break;
                            }
                            case INVALID: {
                                nrInvalid.incrementAndGet();
                                break;
                            }
                            case POSTPONE: {
                                nrPostponed.incrementAndGet();
                                BlockManager.this.postponeBlock(block);
                                break;
                            }
                            case UNDER_CONSTRUCTION: {
                                nrUnderConstruction.incrementAndGet();
                                break;
                            }
                            case OK: {
                                break;
                            }
                            default: {
                                throw new AssertionError((Object)("Invalid enum value: " + (Object)((Object)res)));
                            }
                        }
                        totalProcessed.incrementAndGet();
                    }
                }
                return null;
            }
        };
        this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
        while (this.namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
            boolean haveMore;
            long filesToProcessEndIndex;
            long filesToProcessStartIndex;
            do {
                filesToProcessEndIndex = HdfsVariables.incrementMisReplicatedIndex(filesToProcess);
                filesToProcessStartIndex = filesToProcessEndIndex - (long)filesToProcess;
                haveMore = this.blocksMap.haveFilesWithIdGreaterThan(filesToProcessEndIndex);
            } while (!this.blocksMap.haveFilesWithIdBetween(filesToProcessStartIndex, filesToProcessEndIndex) && haveMore);
            this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), filesToProcessStartIndex));
            this.processMissreplicatedInt(filesToProcessStartIndex, filesToProcessEndIndex, filesToProcess, processMisReplicatedBlocksHandler);
            this.addToMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
            this.replicationQueuesInitProgress = Math.min((double)totalProcessed.get() / (double)totalBlocks, 1.0);
            if (haveMore) continue;
            this.removeFromMisReplicatedRangeQueue(new MisReplicatedRange(this.namesystem.getNamenodeId(), -1L));
            if (this.namesystem.isLeader()) {
                List<MisReplicatedRange> toProcess = this.checkMisReplicatedRangeQueue();
                for (MisReplicatedRange range : toProcess) {
                    long startIndex = range.getStartIndex();
                    if (startIndex <= 0L) continue;
                    this.processMissreplicatedInt(startIndex, startIndex + (long)filesToProcess, filesToProcess, processMisReplicatedBlocksHandler);
                }
            }
            LOG.info((Object)("Total number of blocks            = " + this.blocksMap.size()));
            LOG.info((Object)("Number of invalid blocks          = " + nrInvalid.get()));
            LOG.info((Object)("Number of under-replicated blocks = " + nrUnderReplicated.get()));
            LOG.info((Object)("Number of  over-replicated blocks = " + nrOverReplicated.get() + (nrPostponed.get() > 0L ? " (" + nrPostponed.get() + " postponed)" : "")));
            LOG.info((Object)("Number of blocks being written    = " + nrUnderConstruction.get()));
            NameNode.stateChangeLog.info((Object)("STATE* Replication Queue initialization scan for invalid, over- and under-replicated blocks completed in " + (Time.now() - startTimeMisReplicatedScan) + " msec"));
            break;
        }
        if (Thread.currentThread().isInterrupted()) {
            LOG.info((Object)"Interrupted while processing replication queues.");
        }
    }

    private void processMissreplicatedInt(long filesToProcessStartIndex, long filesToProcessEndIndex, int filesToProcess, final HopsTransactionalRequestHandler processMisReplicatedBlocksHandler) throws IOException {
        final List<INodeIdentifier> allINodes = this.blocksMap.getAllINodeFiles(filesToProcessStartIndex, filesToProcessEndIndex);
        LOG.info((Object)("processMisReplicated read  " + allINodes.size() + "/" + filesToProcess + " in the Ids range [" + filesToProcessStartIndex + " - " + filesToProcessEndIndex + "]"));
        try {
            Slicer.slice((int)allINodes.size(), (int)this.processMisReplicatedBatchSize, (int)this.processMisReplicatedNoThreads, (Slicer.OperationHandler)new Slicer.OperationHandler(){

                public void handle(int startIndex, int endIndex) throws Exception {
                    List inodes = allINodes.subList(startIndex, endIndex);
                    processMisReplicatedBlocksHandler.setParams(new Object[]{inodes});
                    processMisReplicatedBlocksHandler.handle(BlockManager.this.namesystem);
                }
            });
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    public double getReplicationQueuesInitProgress() {
        return this.replicationQueuesInitProgress;
    }

    private void addToMisReplicatedRangeQueue(final MisReplicatedRange range) throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                da.insert(range);
                return null;
            }
        }.handle();
    }

    private void removeFromMisReplicatedRangeQueue(final MisReplicatedRange range) throws IOException {
        new LightWeightRequestHandler(HDFSOperationType.UPDATE_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                HdfsStorageFactory.getConnector().writeLock();
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                da.remove(range);
                return null;
            }
        }.handle();
    }

    private int sizeOfMisReplicatedRangeQueue() throws IOException {
        return (Integer)new LightWeightRequestHandler(HDFSOperationType.COUNT_ALL_MIS_REPLICATED_RANGE_QUEUE){

            public Object performTask() throws IOException {
                MisReplicatedRangeQueueDataAccess da = (MisReplicatedRangeQueueDataAccess)HdfsStorageFactory.getDataAccess(MisReplicatedRangeQueueDataAccess.class);
                return da.countAll();
            }
        }.handle();
    }

    private MisReplicationResult processMisReplicatedBlock(BlockInfo block) throws IOException {
        NumberReplicas num;
        int numCurrentReplica;
        BlockCollection bc = block.getBlockCollection();
        if (bc == null) {
            this.addToInvalidates(block);
            return MisReplicationResult.INVALID;
        }
        if (!block.isComplete()) {
            return MisReplicationResult.UNDER_CONSTRUCTION;
        }
        short expectedReplication = bc.getBlockReplication();
        if (this.isNeededReplication(block, expectedReplication, numCurrentReplica = (num = this.countNodes(block)).liveReplicas()) && this.neededReplications.add(block, numCurrentReplica, num.decommissionedReplicas(), expectedReplication)) {
            return MisReplicationResult.UNDER_REPLICATED;
        }
        if (numCurrentReplica > expectedReplication) {
            if (num.replicasOnStaleNodes() > 0) {
                return MisReplicationResult.POSTPONE;
            }
            this.processOverReplicatedBlock(block, expectedReplication, null, null);
            return MisReplicationResult.OVER_REPLICATED;
        }
        return MisReplicationResult.OK;
    }

    public void setReplication(short oldRepl, short newRepl, String src, Block ... blocks) throws IOException {
        if (newRepl == oldRepl) {
            return;
        }
        for (Block b : blocks) {
            this.updateNeededReplications(b, 0, newRepl - oldRepl);
        }
        if (oldRepl > newRepl) {
            LOG.info((Object)("Decreasing replication from " + oldRepl + " to " + newRepl + " for " + src));
            for (Block b : blocks) {
                this.processOverReplicatedBlock(b, newRepl, null, null);
            }
        } else {
            LOG.info((Object)("Increasing replication from " + oldRepl + " to " + newRepl + " for " + src));
        }
    }

    private void processOverReplicatedBlock(Block block, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) throws IOException {
        if (addedNode == delNodeHint) {
            delNodeHint = null;
        }
        ArrayList<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
        Collection<DatanodeDescriptor> corruptNodes = this.corruptReplicas.getNodes(this.getBlockInfo(block));
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(block, DatanodeStorage.State.NORMAL)) {
            DatanodeDescriptor cur = storage.getDatanodeDescriptor();
            if (storage.areBlockContentsStale()) {
                LOG.info((Object)("BLOCK* processOverReplicatedBlock: Postponing processing of over-replicated " + block + " since storage " + storage + " does not yet have up-to-date block information."));
                this.postponeBlock(block);
                return;
            }
            if (this.excessReplicateMap.contains(storage, this.getBlockInfo(block)) || cur.isDecommissionInProgress() || cur.isDecommissioned() || corruptNodes != null && corruptNodes.contains(cur)) continue;
            nonExcess.add(storage);
        }
        this.chooseExcessReplicates(nonExcess, block, replication, addedNode, delNodeHint, this.blockplacement);
    }

    private void chooseExcessReplicates(Collection<DatanodeStorageInfo> nonExcess, Block b, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint, BlockPlacementPolicy replicator) throws StorageException, TransactionContextException {
        BlockCollection bc = this.getBlockCollection(b);
        BlockStoragePolicy storagePolicy = this.storagePolicySuite.getPolicy(bc.getStoragePolicyID());
        List<StorageType> excessTypes = storagePolicy.chooseExcess(replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
        HashMap<String, List<DatanodeStorageInfo>> rackMap = new HashMap<String, List<DatanodeStorageInfo>>();
        ArrayList<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
        ArrayList<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
        replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
        boolean firstOne = true;
        DatanodeStorageInfo delNodeHintStorage = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
        DatanodeStorageInfo addedNodeStorage = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
        while (nonExcess.size() - replication > 0) {
            DatanodeStorageInfo cur = BlockManager.useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, moreThanOne, excessTypes) ? delNodeHintStorage : replicator.chooseReplicaToDelete(bc, b, replication, moreThanOne, exactlyOne, excessTypes);
            firstOne = false;
            replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
            nonExcess.remove(cur);
            this.addToExcessReplicate(cur, b);
            this.addToInvalidates(b, cur);
            blockLog.info((Object)("BLOCK* chooseExcessReplicates: (" + cur + ", " + b + ") is added to invalidated blocks set"));
        }
    }

    static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, List<StorageType> excessTypes) {
        if (!isFirst) {
            return false;
        }
        if (delHint == null) {
            return false;
        }
        if (!excessTypes.contains((Object)delHint.getStorageType())) {
            return false;
        }
        if (moreThan1Racks.contains(delHint)) {
            return true;
        }
        return added != null && !moreThan1Racks.contains(added);
    }

    private void addToExcessReplicate(DatanodeStorageInfo storage, Block block) throws StorageException, TransactionContextException {
        BlockInfo blockInfo = this.getBlockInfo(block);
        if (this.excessReplicateMap.put(storage.getSid(), blockInfo)) {
            this.excessBlocksCount.incrementAndGet();
            if (blockLog.isDebugEnabled()) {
                blockLog.debug((Object)("BLOCK* addToExcessReplicate: (" + storage + ", " + block + ") is added to excessReplicateMap"));
            }
        }
    }

    public void removeStoredBlock(Block block, DatanodeDescriptor node) throws IOException {
        if (blockLog.isDebugEnabled()) {
            blockLog.debug((Object)("BLOCK* removeStoredBlock: " + block + " from " + node));
        }
        if (!this.blocksMap.removeNode(block, node)) {
            if (blockLog.isDebugEnabled()) {
                blockLog.debug((Object)("BLOCK* removeStoredBlock: " + block + " has already been removed from node " + node));
            }
            return;
        }
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        if (bc != null) {
            this.namesystem.decrementSafeBlockCount(this.getBlockInfo(block));
            this.updateNeededReplications(block, -1, 0);
        }
        if (this.excessReplicateMap.remove(node, this.getBlockInfo(block))) {
            this.excessBlocksCount.decrementAndGet();
            if (blockLog.isDebugEnabled()) {
                blockLog.debug((Object)("BLOCK* removeStoredBlock: " + block + " is removed from excessBlocks"));
            }
        }
        this.corruptReplicas.removeFromCorruptReplicasMap(this.getBlockInfo(block), node);
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (fsNamesystem.isErasureCodingEnabled()) {
            BlockInfo blockInfo = this.getStoredBlock(block);
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
            if (status != null) {
                NumberReplicas numberReplicas = this.countNodes(block);
                if (numberReplicas.liveReplicas() == 0) {
                    if (!status.isCorrupt()) {
                        status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                        status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                    EntityManager.update((Object)status);
                }
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
                if (status == null) {
                    LOG.info((Object)("removeStoredBlock returned null for " + blockInfo.getInodeId()));
                } else {
                    LOG.info((Object)("removeStoredBlock found " + blockInfo.getInodeId() + " with status " + status));
                }
                if (status != null) {
                    NumberReplicas numberReplicas = this.countNodes(block);
                    if (numberReplicas.liveReplicas() == 0) {
                        if (!status.isParityCorrupt()) {
                            status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                            status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                        }
                        status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                        EntityManager.update((Object)status);
                        LOG.info((Object)"removeStoredBlock updated parity status to repair requested");
                    } else {
                        LOG.info((Object)("removeStoredBlock found replicas: " + numberReplicas.liveReplicas()));
                    }
                }
            }
        }
    }

    public void removeStoredBlock(Block block, int sid) throws IOException {
        if (blockLog.isDebugEnabled()) {
            blockLog.debug((Object)("BLOCK* removeStoredBlock: " + block + " from " + sid));
        }
        if (!this.blocksMap.removeNode(block, sid)) {
            if (blockLog.isDebugEnabled()) {
                blockLog.debug((Object)("BLOCK* removeStoredBlock: " + block + " has already been removed from node " + sid));
            }
            return;
        }
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        if (bc != null) {
            this.namesystem.decrementSafeBlockCount(this.getBlockInfo(block));
            this.updateNeededReplications(block, -1, 0);
        }
        this.corruptReplicas.forceRemoveFromCorruptReplicasMap(this.getBlockInfo(block), sid);
        FSNamesystem fsNamesystem = (FSNamesystem)this.namesystem;
        if (fsNamesystem.isErasureCodingEnabled()) {
            BlockInfo blockInfo = this.getStoredBlock(block);
            EncodingStatus status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
            if (status != null) {
                NumberReplicas numberReplicas = this.countNodes(block);
                if (numberReplicas.liveReplicas() == 0) {
                    if (!status.isCorrupt()) {
                        status.setStatus(EncodingStatus.Status.REPAIR_REQUESTED);
                        status.setStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                    }
                    status.setLostBlocks(Integer.valueOf(status.getLostBlocks() + 1));
                    EntityManager.update((Object)status);
                }
            } else {
                status = (EncodingStatus)EntityManager.find((FinderType)EncodingStatus.Finder.ByParityInodeId, (Object[])new Object[]{blockInfo.getInodeId()});
                if (status == null) {
                    LOG.info((Object)("removeStoredBlock returned null for " + blockInfo.getInodeId()));
                } else {
                    LOG.info((Object)("removeStoredBlock found " + blockInfo.getInodeId() + " with status " + status));
                }
                if (status != null) {
                    NumberReplicas numberReplicas = this.countNodes(block);
                    if (numberReplicas.liveReplicas() == 0) {
                        if (!status.isParityCorrupt()) {
                            status.setParityStatus(EncodingStatus.ParityStatus.REPAIR_REQUESTED);
                            status.setParityStatusModificationTime(Long.valueOf(System.currentTimeMillis()));
                        }
                        status.setLostParityBlocks(Integer.valueOf(status.getLostParityBlocks() + 1));
                        EntityManager.update((Object)status);
                        LOG.info((Object)"removeStoredBlock updated parity status to repair requested");
                    } else {
                        LOG.info((Object)("removeStoredBlock found replicas: " + numberReplicas.liveReplicas()));
                    }
                }
            }
        }
    }

    private long addBlock(final Block block, List<BlocksWithLocations.BlockWithLocations> results) throws IOException {
        final ArrayList locations = new ArrayList();
        new HopsTransactionalRequestHandler(HDFSOperationType.GET_VALID_BLK_LOCS){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.IV));
            }

            public Object performTask() throws IOException {
                BlockInfo temp = BlockManager.this.getBlockInfo(block);
                List ms = BlockManager.this.getValidLocations(temp);
                locations.addAll(ms);
                return null;
            }
        }.handle(this.namesystem);
        if (locations.isEmpty()) {
            return 0L;
        }
        String[] datanodeUuids = new String[locations.size()];
        String[] storageIDs = new String[datanodeUuids.length];
        StorageType[] storageTypes = new StorageType[datanodeUuids.length];
        for (int i = 0; i < locations.size(); ++i) {
            DatanodeStorageInfo s = (DatanodeStorageInfo)locations.get(i);
            datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
            storageIDs[i] = s.getStorageID();
            storageTypes[i] = s.getStorageType();
        }
        results.add(new BlocksWithLocations.BlockWithLocations(block, datanodeUuids, storageIDs, storageTypes));
        return block.getNumBytes();
    }

    @VisibleForTesting
    void addBlock(DatanodeStorageInfo storage, Block block, String delHint) throws IOException {
        DatanodeDescriptor node = storage.getDatanodeDescriptor();
        node.decrementBlocksScheduled(storage.getStorageType());
        DatanodeDescriptor delHintNode = null;
        if (delHint != null && delHint.length() != 0 && (delHintNode = this.datanodeManager.getDatanodeByUuid(delHint)) == null) {
            blockLog.warn((Object)("BLOCK* blockReceived: " + block + " is expected to be removed from an unrecorded node " + delHint));
        }
        this.pendingReplications.decrement(this.getBlockInfo(block), node);
        this.processAndHandleReportedBlock(storage, block, HdfsServerConstants.ReplicaState.FINALIZED, delHintNode);
    }

    private void processAndHandleReportedBlock(DatanodeStorageInfo storage, Block block, HdfsServerConstants.ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException {
        LinkedList<BlockInfo> toAdd = new LinkedList<BlockInfo>();
        LinkedList<Block> toInvalidate = new LinkedList<Block>();
        LinkedList<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
        LinkedList<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
        DatanodeDescriptor node = storage.getDatanodeDescriptor();
        this.processIncrementallyReportedBlock(storage, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC);
        assert (toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1) : "The block should be only in one of the lists.";
        for (StatefulBlockInfo b : toUC) {
            this.addStoredBlockUnderConstruction(b, storage);
        }
        long numBlocksLogged = 0L;
        for (BlockInfo blockInfo : toAdd) {
            this.addStoredBlock(blockInfo, storage, delHintNode, numBlocksLogged < this.maxNumBlocksToLog);
            ++numBlocksLogged;
        }
        if (numBlocksLogged > this.maxNumBlocksToLog) {
            blockLog.info((Object)("BLOCK* addBlock: logged info for " + this.maxNumBlocksToLog + " of " + numBlocksLogged + " reported."));
        }
        for (Block block2 : toInvalidate) {
            blockLog.info((Object)("BLOCK* addBlock: block " + block2 + " on " + storage + " size " + block2.getNumBytes() + " does not belong to any file"));
            this.addToInvalidates(block2, storage.getDatanodeDescriptor());
        }
        for (BlockToMarkCorrupt blockToMarkCorrupt : toCorrupt) {
            this.markBlockAsCorrupt(blockToMarkCorrupt, storage, storage.getDatanodeDescriptor());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processIncrementalBlockReport(DatanodeRegistration nodeID, StorageReceivedDeletedBlocks blockInfos) throws IOException {
        final int[] received = new int[]{0};
        final int[] deleted = new int[]{0};
        int[] receiving = new int[]{0};
        final DatanodeDescriptor node = this.datanodeManager.getDatanode(nodeID);
        if (node == null || !node.isAlive) {
            blockLog.warn((Object)("BLOCK* processIncrementalBlockReport is received from dead or unregistered node " + nodeID));
            throw new IOException("Got incremental block report from unregistered or dead node");
        }
        DatanodeStorageInfo s = node.getStorageInfo(blockInfos.getStorage().getStorageID());
        if (s == null) {
            s = node.updateStorage(blockInfos.getStorage());
        }
        final DatanodeStorageInfo storage = s;
        HopsTransactionalRequestHandler processIncrementalBlockReportHandler = new HopsTransactionalRequestHandler(HDFSOperationType.BLOCK_RECEIVED_AND_DELETED_INC_BLK_REPORT){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                LOG.debug((Object)("reported block id=" + rdbi.getBlock().getBlockId()));
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(rdbi.getBlock());
                if (this.inodeIdentifier == null) {
                    LOG.error((Object)("Invalid State. deleted blk is not recognized. bid=" + rdbi.getBlock().getBlockId()));
                }
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(rdbi.getBlock().getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR));
                if (!rdbi.isDeletedBlock()) {
                    locks.add(lf.getBlockRelated(LockFactory.BLK.PE, LockFactory.BLK.UC, LockFactory.BLK.IV));
                }
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
                locks.add(lf.getIndividualHashBucketLock(storage.getSid(), HashBuckets.getInstance().getBucketForBlock(rdbi.getBlock())));
            }

            public Object performTask() throws IOException {
                ReceivedDeletedBlockInfo rdbi = (ReceivedDeletedBlockInfo)this.getParams()[0];
                LOG.debug((Object)("BLOCK_RECEIVED_AND_DELETED_INC_BLK_REPORT " + (Object)((Object)rdbi.getStatus()) + " bid=" + rdbi.getBlock().getBlockId() + " dataNode=" + node.getXferAddr() + " storage=" + storage.getStorageID() + " sid: " + storage.getSid() + " status=" + (Object)((Object)rdbi.getStatus())));
                HashBuckets hashBuckets = HashBuckets.getInstance();
                switch (rdbi.getStatus()) {
                    case CREATING: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case APPENDING: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case RECOVERING_APPEND: {
                        BlockManager.this.processAndHandleReportedBlock(storage, rdbi.getBlock(), HdfsServerConstants.ReplicaState.RBW, null);
                        received[0] = received[0] + 1;
                        break;
                    }
                    case RECEIVED: {
                        BlockManager.this.addBlock(storage, rdbi.getBlock(), rdbi.getDelHints());
                        hashBuckets.applyHash(storage.getSid(), HdfsServerConstants.ReplicaState.FINALIZED, rdbi.getBlock());
                        received[0] = received[0] + 1;
                        break;
                    }
                    case UPDATE_RECOVERED: {
                        BlockManager.this.addBlock(storage, rdbi.getBlock(), rdbi.getDelHints());
                        received[0] = received[0] + 1;
                        break;
                    }
                    case DELETED: {
                        BlockManager.this.removeStoredBlock(rdbi.getBlock(), storage.getDatanodeDescriptor());
                        deleted[0] = deleted[0] + 1;
                        break;
                    }
                    default: {
                        String msg = "Unknown block status code reported by " + storage.getStorageID() + ": " + rdbi;
                        blockLog.warn((Object)msg);
                        assert (false) : msg;
                        break;
                    }
                }
                if (blockLog.isDebugEnabled()) {
                    blockLog.debug((Object)("BLOCK* block " + (Object)((Object)rdbi.getStatus()) + ": " + rdbi.getBlock() + " is received from " + storage.getStorageID()));
                }
                return null;
            }
        };
        try {
            if (node == null || !node.isAlive) {
                blockLog.warn((Object)("BLOCK* processIncrementalBlockReport is received from dead or unregistered node " + nodeID));
                throw new IOException("Got incremental block report from unregistered or dead node");
            }
            for (ReceivedDeletedBlockInfo rdbi : blockInfos.getBlocks()) {
                processIncrementalBlockReportHandler.setParams(new Object[]{rdbi});
                processIncrementalBlockReportHandler.handle(this.namesystem);
            }
        }
        finally {
            blockLog.debug((Object)("*BLOCK* NameNode.processIncrementalBlockReport: from " + nodeID + " receiving: " + receiving[0] + ",  received: " + received[0] + ",  deleted: " + deleted[0]));
        }
    }

    public NumberReplicas countNodes(Block b) throws IOException {
        int decommissioned = 0;
        int live = 0;
        int corrupt = 0;
        int excess = 0;
        int stale = 0;
        Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(this.getBlockInfo(b));
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(b, DatanodeStorage.State.NORMAL)) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
                ++corrupt;
            } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
                ++decommissioned;
            } else {
                LightWeightLinkedSet<Block> blocksExcess = this.excessReplicateMap.get(storage.getSid());
                if (blocksExcess != null && blocksExcess.contains(b)) {
                    ++excess;
                } else {
                    ++live;
                }
            }
            if (!storage.areBlockContentsStale()) continue;
            ++stale;
        }
        return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
    }

    int countLiveNodes(BlockInfo b) throws IOException {
        if (!this.namesystem.isInStartupSafeMode()) {
            return this.countNodes(b).liveReplicas();
        }
        int live = 0;
        List<DatanodeStorageInfo> storages = this.blocksMap.storageList(b, DatanodeStorage.State.NORMAL);
        Collection<DatanodeDescriptor> nodesCorrupt = this.corruptReplicas.getNodes(b);
        for (DatanodeStorageInfo storage : storages) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            if (nodesCorrupt != null && nodesCorrupt.contains(node)) continue;
            ++live;
        }
        return live;
    }

    private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode, NumberReplicas num) throws StorageException, TransactionContextException {
        int curReplicas = num.liveReplicas();
        int curExpectedReplicas = this.getReplication(block);
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        List<DatanodeStorageInfo> storages = this.blocksMap.storageList(block);
        StringBuilder nodeList = new StringBuilder();
        for (DatanodeStorageInfo storage : storages) {
            DatanodeDescriptor node = storage.getDatanodeDescriptor();
            nodeList.append(node);
            nodeList.append(" ");
        }
        LOG.info((Object)("Block: " + block + ", Expected Replicas: " + curExpectedReplicas + ", live replicas: " + curReplicas + ", corrupt replicas: " + num.corruptReplicas() + ", decommissioned replicas: " + num.decommissionedReplicas() + ", excess replicas: " + num.excessReplicas() + ", Is Open File: " + bc.isUnderConstruction() + ", Datanodes having this block: " + nodeList + ", Current Datanode: " + srcNode + ", Is current datanode decommissioning: " + srcNode.isDecommissionInProgress()));
    }

    void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) throws IOException {
        final int[] numOverReplicated = new int[]{0};
        Iterator<BlockInfo> it = srcNode.getBlockIterator();
        HopsTransactionalRequestHandler processBlockHandler = new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_OVER_REPLICATED_BLOCKS_ON_RECOMMISSION){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                Block b = (Block)this.getParams()[0];
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                Block block = (Block)this.getParams()[0];
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR, LockFactory.BLK.IV));
            }

            public Object performTask() throws IOException {
                Block block = (Block)this.getParams()[0];
                BlockCollection bc = BlockManager.this.blocksMap.getBlockCollection(block);
                short expectedReplication = bc.getBlockReplication();
                NumberReplicas num = BlockManager.this.countNodes(block);
                int numCurrentReplica = num.liveReplicas();
                if (numCurrentReplica > expectedReplication) {
                    BlockManager.this.processOverReplicatedBlock(block, expectedReplication, null, null);
                    numOverReplicated[0] = numOverReplicated[0] + 1;
                }
                return null;
            }
        };
        while (it.hasNext()) {
            processBlockHandler.setParams(new Object[]{it.next()}).handle((Object)this.namesystem);
        }
        LOG.info((Object)("Invalidated " + numOverReplicated[0] + " over-replicated blocks on " + srcNode + " during recommissioning"));
    }

    boolean isReplicationInProgress(final DatanodeDescriptor srcNode) throws IOException {
        final boolean[] status = new boolean[]{false};
        final boolean[] firstReplicationLog = new boolean[]{true};
        final int[] underReplicatedBlocks = new int[]{0};
        final int[] decommissionOnlyReplicas = new int[]{0};
        final int[] underReplicatedInOpenFiles = new int[]{0};
        Iterator<BlockInfo> it = srcNode.getBlockIterator();
        HopsTransactionalRequestHandler checkReplicationHandler = new HopsTransactionalRequestHandler(HDFSOperationType.CHECK_REPLICATION_IN_PROGRESS){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                Block b = (Block)this.getParams()[0];
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                Block block = (Block)this.getParams()[0];
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getBlockLock()).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.PE));
            }

            public Object performTask() throws IOException {
                Block block = (Block)this.getParams()[0];
                BlockCollection bc = BlockManager.this.blocksMap.getBlockCollection(block);
                if (bc != null) {
                    NumberReplicas num = BlockManager.this.countNodes(block);
                    int curReplicas = num.liveReplicas();
                    int curExpectedReplicas = BlockManager.this.getReplication(block);
                    if (BlockManager.this.isNeededReplication(block, curExpectedReplicas, curReplicas)) {
                        if (curExpectedReplicas > curReplicas) {
                            if (bc.isUnderConstruction()) {
                                if (block.equals(bc.getLastBlock()) && curReplicas > BlockManager.this.minReplication) {
                                    return null;
                                }
                                underReplicatedInOpenFiles[0] = underReplicatedInOpenFiles[0] + 1;
                            }
                            if (!status[0]) {
                                status[0] = true;
                                if (firstReplicationLog[0]) {
                                    BlockManager.this.logBlockReplicationInfo(block, srcNode, num);
                                }
                                if (curReplicas >= BlockManager.this.defaultReplication) {
                                    status[0] = false;
                                    firstReplicationLog[0] = false;
                                }
                            }
                            underReplicatedBlocks[0] = underReplicatedBlocks[0] + 1;
                            if (curReplicas == 0 && num.decommissionedReplicas() > 0) {
                                decommissionOnlyReplicas[0] = decommissionOnlyReplicas[0] + 1;
                            }
                        }
                        if (!BlockManager.this.neededReplications.contains(BlockManager.this.getBlockInfo(block)) && BlockManager.this.pendingReplications.getNumReplicas(BlockManager.this.getBlockInfo(block)) == 0) {
                            BlockManager.this.neededReplications.add(BlockManager.this.getBlockInfo(block), curReplicas, num.decommissionedReplicas(), curExpectedReplicas);
                        }
                    }
                }
                return null;
            }
        };
        while (it.hasNext()) {
            checkReplicationHandler.setParams(new Object[]{it.next()});
            checkReplicationHandler.handle(this.namesystem);
        }
        srcNode.decommissioningStatus.set(underReplicatedBlocks[0], decommissionOnlyReplicas[0], underReplicatedInOpenFiles[0]);
        return status[0];
    }

    public int getActiveBlockCount() throws IOException {
        return this.blocksMap.size();
    }

    public DatanodeStorageInfo[] getStorages(BlockInfo block) throws TransactionContextException, StorageException {
        return block.getStorages(this.datanodeManager);
    }

    public int getTotalBlocks() throws IOException {
        return this.blocksMap.size();
    }

    public void removeBlock(Block block) throws StorageException, TransactionContextException {
        block.setNumBytesNoPersistance(Long.MAX_VALUE);
        this.addToInvalidates(block);
        this.corruptReplicas.removeFromCorruptReplicasMap(this.getBlockInfo(block));
        BlockInfo storedBlock = this.getBlockInfo(block);
        this.blocksMap.removeBlock(block);
        this.pendingReplications.remove(storedBlock);
        this.neededReplications.remove(storedBlock, 5);
        if (this.postponedMisreplicatedBlocks.remove(block)) {
            this.postponedMisreplicatedBlocksCount.decrementAndGet();
        }
    }

    public BlockInfo getStoredBlock(Block block) throws StorageException, TransactionContextException {
        return this.blocksMap.getStoredBlock(block);
    }

    private void updateNeededReplications(Block block, int curReplicasDelta, int expectedReplicasDelta) throws IOException {
        if (!this.namesystem.isPopulatingReplQueues()) {
            return;
        }
        NumberReplicas repl = this.countNodes(block);
        int curExpectedReplicas = this.getReplication(block);
        if (this.isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
            this.neededReplications.update(this.getBlockInfo(block), repl.liveReplicas(), repl.decommissionedReplicas(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
        } else {
            int oldReplicas = repl.liveReplicas() - curReplicasDelta;
            int oldExpectedReplicas = curExpectedReplicas - expectedReplicasDelta;
            this.neededReplications.remove(this.getBlockInfo(block), oldReplicas, repl.decommissionedReplicas(), oldExpectedReplicas);
        }
    }

    public void checkReplication(BlockCollection bc) throws IOException {
        short expected = bc.getBlockReplication();
        for (BlockInfo block : bc.getBlocks()) {
            NumberReplicas n = this.countNodes(block);
            if (this.isNeededReplication(block, expected, n.liveReplicas())) {
                this.neededReplications.add(this.getBlockInfo(block), n.liveReplicas(), n.decommissionedReplicas(), expected);
                continue;
            }
            if (n.liveReplicas() <= expected) continue;
            this.processOverReplicatedBlock(block, expected, null, null);
        }
    }

    private int getReplication(Block block) throws StorageException, TransactionContextException {
        BlockCollection bc = this.blocksMap.getBlockCollection(block);
        return bc == null ? (short)0 : bc.getBlockReplication();
    }

    private int invalidateWorkForOneNode(DatanodeInfo dn) throws IOException {
        if (this.namesystem.isInSafeMode()) {
            LOG.debug((Object)"In safemode, not computing replication work");
            return 0;
        }
        assert (dn != null);
        DatanodeDescriptor dnDescriptor = this.datanodeManager.getDatanode(dn);
        if (dnDescriptor == null) {
            LOG.warn((Object)("DataNode " + dn + " cannot be found with UUID " + dn.getDatanodeUuid() + ", removing block invalidation work."));
            List<Integer> sids = this.datanodeManager.getSidsOnDatanode(dn.getDatanodeUuid());
            this.invalidateBlocks.remove(sids);
            return 0;
        }
        List<Block> toInvalidate = this.invalidateBlocks.invalidateWork(dnDescriptor);
        if (toInvalidate == null) {
            return 0;
        }
        if (blockLog.isInfoEnabled()) {
            blockLog.info((Object)("BLOCK* " + this.getClass().getSimpleName() + ": ask " + dn + " to delete " + toInvalidate));
        }
        return toInvalidate.size();
    }

    boolean blockHasEnoughRacks(Block b) throws StorageException, TransactionContextException {
        if (!this.shouldCheckForEnoughRacks) {
            return true;
        }
        boolean enoughRacks = false;
        Collection<DatanodeDescriptor> corruptNodes = this.corruptReplicas.getNodes(this.getBlockInfo(b));
        int numExpectedReplicas = this.getReplication(b);
        String rackName = null;
        for (DatanodeStorageInfo storage : this.blocksMap.storageList(b)) {
            DatanodeDescriptor cur = storage.getDatanodeDescriptor();
            if (cur.isDecommissionInProgress() || cur.isDecommissioned() || corruptNodes != null && corruptNodes.contains(cur)) continue;
            if (numExpectedReplicas == 1 || numExpectedReplicas > 1 && !this.datanodeManager.hasClusterEverBeenMultiRack()) {
                enoughRacks = true;
                break;
            }
            String rackNameNew = cur.getNetworkLocation();
            if (rackName == null) {
                rackName = rackNameNew;
                continue;
            }
            if (rackName.equals(rackNameNew)) continue;
            enoughRacks = true;
            break;
        }
        return enoughRacks;
    }

    private boolean isNeededReplication(Block b, int expected, int current) throws StorageException, TransactionContextException {
        return current < expected || !this.blockHasEnoughRacks(b);
    }

    public long getMissingBlocksCount() throws IOException {
        return this.neededReplications.getCorruptBlockSize();
    }

    public BlockInfo addBlockCollection(BlockInfo block, BlockCollection bc) throws StorageException, TransactionContextException {
        return this.blocksMap.addBlockCollection(block, bc);
    }

    public BlockCollection getBlockCollection(Block b) throws StorageException, TransactionContextException {
        return this.blocksMap.getBlockCollection(b);
    }

    public List<DatanodeStorageInfo> storageList(Block block) throws StorageException, TransactionContextException {
        return this.blocksMap.storageList(block);
    }

    public int numCorruptReplicas(Block block) throws StorageException, TransactionContextException {
        return this.corruptReplicas.numCorruptReplicas(this.getBlockInfo(block));
    }

    public void removeBlockFromMap(Block block) throws StorageException, TransactionContextException {
        this.corruptReplicas.removeFromCorruptReplicasMap(this.getBlockInfo(block));
        this.blocksMap.removeBlock(block);
    }

    public int getCapacity() {
        return this.blocksMap.getCapacity();
    }

    public long[] getCorruptReplicaBlockIds(int numExpectedBlocks, Long startingBlockId) throws IOException {
        return this.corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks, startingBlockId);
    }

    public Iterator<Block> getCorruptReplicaBlockIterator() {
        return this.neededReplications.iterator(4);
    }

    public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) throws StorageException, TransactionContextException {
        return this.corruptReplicas.getNodes(this.getBlockInfo(block));
    }

    public int numOfUnderReplicatedBlocks() throws IOException {
        return this.neededReplications.size();
    }

    int computeDatanodeWork() throws IOException {
        if (this.namesystem.isInSafeMode()) {
            return 0;
        }
        int numlive = this.heartbeatManager.getLiveDatanodeCount();
        int blocksToProcess = numlive * this.blocksReplWorkMultiplier;
        int nodesToProcess = (int)Math.ceil((float)numlive * this.blocksInvalidateWorkPct);
        int workFound = this.computeReplicationWork(blocksToProcess);
        this.updateState();
        this.scheduledReplicationBlocksCount = workFound;
        return workFound += this.computeInvalidateWork(nodesToProcess);
    }

    public void clearQueues() throws IOException {
        this.neededReplications.clear();
        this.pendingReplications.clear();
        this.excessReplicateMap.clear();
        this.invalidateBlocks.clear();
        this.datanodeManager.clearPendingQueues();
    }

    private void removeStoredBlocksTx(final List<Integer> inodeIds, final Map<Integer, List<Long>> inodeIdsToBlockMap, final DatanodeDescriptor node) throws IOException {
        final AtomicInteger removedBlocks = new AtomicInteger(0);
        new HopsTransactionalRequestHandler(HDFSOperationType.REMOVE_STORED_BLOCKS){
            List<INodeIdentifier> inodeIdentifiers;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getBatchedINodesLock(this.inodeIdentifiers)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifiers != null) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (INodeIdentifier identifier : this.inodeIdentifiers) {
                    Iterator iterator = ((List)inodeIdsToBlockMap.get(identifier.getInodeId())).iterator();
                    while (iterator.hasNext()) {
                        long blockId = (Long)iterator.next();
                        BlockInfo block = (BlockInfo)EntityManager.find((FinderType)BlockInfo.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                        BlockManager.this.removeStoredBlock((Block)block, node);
                        removedBlocks.incrementAndGet();
                    }
                }
                return null;
            }
        }.handle(this.namesystem);
        LOG.info((Object)("removed " + removedBlocks.get() + " replicas from " + node.getName()));
    }

    private void removeStoredBlocksTx(final List<Integer> inodeIds, final Map<Integer, List<Long>> inodeIdsToBlockMap, final int sid) throws IOException {
        final AtomicInteger removedBlocks = new AtomicInteger(0);
        new HopsTransactionalRequestHandler(HDFSOperationType.REMOVE_STORED_BLOCKS){
            List<INodeIdentifier> inodeIdentifiers;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifiers = INodeUtil.resolveINodesFromIds(inodeIds);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getBatchedINodesLock(this.inodeIdentifiers)).add(lf.getSqlBatchedBlocksLock()).add(lf.getSqlBatchedBlocksRelated(LockFactory.BLK.RE, LockFactory.BLK.IV, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.ER));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifiers != null) {
                    locks.add(lf.getBatchedEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifiers));
                }
            }

            public Object performTask() throws IOException {
                for (INodeIdentifier identifier : this.inodeIdentifiers) {
                    Iterator iterator = ((List)inodeIdsToBlockMap.get(identifier.getInodeId())).iterator();
                    while (iterator.hasNext()) {
                        long blockId = (Long)iterator.next();
                        BlockInfo block = (BlockInfo)EntityManager.find((FinderType)BlockInfo.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{blockId});
                        BlockManager.this.removeStoredBlock((Block)block, sid);
                        removedBlocks.incrementAndGet();
                    }
                }
                return null;
            }
        }.handle(this.namesystem);
        LOG.info((Object)("removed " + removedBlocks.get() + " replicas from " + sid));
    }

    @VisibleForTesting
    int computeReplicationWorkForBlock(final Block b, final int priority) throws IOException {
        return (Integer)new HopsTransactionalRequestHandler(HDFSOperationType.COMPUTE_REPLICATION_WORK_FOR_BLOCK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getBlockLock(b.getBlockId(), this.inodeIdentifier)).add(lf.getVariableLock(Variable.Finder.ReplicationIndex, TransactionLockTypes.LockType.WRITE)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR, LockFactory.BLK.UC));
            }

            public Object performTask() throws IOException {
                return BlockManager.this.computeReplicationWorkForBlockInternal(b, priority);
            }
        }.handle(this.namesystem);
    }

    public BlockInfo tryToCompleteBlock(BlockCollection bc, int blkIndex) throws IOException {
        if (blkIndex < 0) {
            return null;
        }
        BlockInfo curBlock = bc.getBlock(blkIndex);
        LOG.debug((Object)("tryToCompleteBlock. blkId = " + curBlock.getBlockId()));
        if (curBlock.isComplete()) {
            return curBlock;
        }
        BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
        int numNodes = ucBlock.numNodes(this.datanodeManager);
        if (numNodes < this.minReplication) {
            return null;
        }
        if (ucBlock.getBlockUCState() != HdfsServerConstants.BlockUCState.COMMITTED) {
            return null;
        }
        BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
        bc.setBlock(blkIndex, completeBlock);
        this.namesystem.adjustSafeModeBlockTotals(0, 1);
        this.namesystem.incrementSafeBlockCount(curBlock);
        return completeBlock;
    }

    private void processTimedOutPendingBlock(final long timedOutItemId) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.PROCESS_TIMEDOUT_PENDING_BLOCK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlockID(timedOutItemId);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(timedOutItemId, this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockInfo timedOutItem = (BlockInfo)EntityManager.find((FinderType)BlockInfo.Finder.ByBlockIdAndINodeId, (Object[])new Object[]{timedOutItemId});
                NumberReplicas num = BlockManager.this.countNodes(timedOutItem);
                if (BlockManager.this.isNeededReplication(timedOutItem, BlockManager.this.getReplication(timedOutItem), num.liveReplicas())) {
                    BlockManager.this.neededReplications.add(BlockManager.this.getBlockInfo(timedOutItem), num.liveReplicas(), num.decommissionedReplicas(), BlockManager.this.getReplication(timedOutItem));
                }
                BlockManager.this.pendingReplications.remove(timedOutItem);
                return null;
            }
        }.handle(this.namesystem);
    }

    private BlockInfo getBlockInfo(Block b) throws StorageException, TransactionContextException {
        BlockInfo binfo = this.blocksMap.getStoredBlock(b);
        if (binfo == null) {
            LOG.error((Object)("ERROR: Dangling Block. bid=" + b.getBlockId() + " setting inodeId to be " + BlockInfo.NON_EXISTING_ID));
            binfo = new BlockInfo(b, BlockInfo.NON_EXISTING_ID);
        }
        return binfo;
    }

    private Block addStoredBlockTx(final BlockInfo block, final DatanodeStorageInfo storage, final DatanodeDescriptor delNodeHint, final boolean logEveryBlock) throws IOException {
        return (Block)new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_BLK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.IV, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                return BlockManager.this.addStoredBlock(block, storage, delNodeHint, logEveryBlock);
            }
        }.handle();
    }

    private void addStoredBlockUnderConstructionTx(final StatefulBlockInfo ucBlock, final DatanodeStorageInfo storage) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_UC_BLK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(ucBlock.reportedBlock);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(ucBlock.reportedBlock.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.UC, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockManager.this.addStoredBlockUnderConstruction(ucBlock, storage);
                return null;
            }
        }.handle();
    }

    private void addToInvalidates(Collection<Block> blocks, DatanodeStorageInfo storage) throws IOException {
        this.invalidateBlocks.add(blocks, storage);
    }

    private void markBlockAsCorruptTx(final BlockToMarkCorrupt b, final DatanodeStorageInfo storage) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_CORRUPT_BLK){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(b.corrupted);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier)).add(lf.getIndividualBlockLock(b.corrupted.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.ER, LockFactory.BLK.CR, LockFactory.BLK.UR, LockFactory.BLK.UC, LockFactory.BLK.IV));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockManager.this.markBlockAsCorrupt(b, storage, storage.getDatanodeDescriptor());
                return null;
            }
        }.handle();
    }

    public int getTotalCompleteBlocks() throws IOException {
        return this.blocksMap.sizeCompleteOnly();
    }

    private void addStoredBlockUnderConstructionImmediateTx(final BlockInfoUnderConstruction block, final DatanodeStorageInfo storage, final HdfsServerConstants.ReplicaState reportedState) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_UC_BLK_IMMEDIATE){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.UC, LockFactory.BLK.CR, LockFactory.BLK.ER, LockFactory.BLK.PE, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                block.addReplicaIfNotPresent(storage, reportedState, block.getGenerationStamp());
                if (reportedState == HdfsServerConstants.ReplicaState.FINALIZED) {
                    BlockManager.this.addStoredBlockImmediate(block, storage, false);
                }
                return null;
            }
        }.handle();
    }

    private void addStoredBlockImmediateTx(final BlockInfo block, final DatanodeStorageInfo storage, final boolean logEveryBlock) throws IOException {
        new HopsTransactionalRequestHandler(HDFSOperationType.AFTER_PROCESS_REPORT_ADD_BLK_IMMEDIATE){
            INodeIdentifier inodeIdentifier;

            @Override
            public void setUp() throws StorageException {
                this.inodeIdentifier = INodeUtil.resolveINodeFromBlock(block);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.inodeIdentifier, true)).add(lf.getIndividualBlockLock(block.getBlockId(), this.inodeIdentifier)).add(lf.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.CR, LockFactory.BLK.ER, LockFactory.BLK.PE, LockFactory.BLK.IV, LockFactory.BLK.UR));
                if (((FSNamesystem)BlockManager.this.namesystem).isErasureCodingEnabled() && this.inodeIdentifier != null) {
                    locks.add(lf.getIndivdualEncodingStatusLock(TransactionLockTypes.LockType.WRITE, this.inodeIdentifier.getInodeId()));
                }
            }

            public Object performTask() throws IOException {
                BlockManager.this.addStoredBlockImmediate(block, storage, logEveryBlock);
                return null;
            }
        }.handle();
    }

    public void shutdown() {
        this.stopReplicationInitializer();
    }

    static enum MisReplicationResult {
        INVALID,
        UNDER_REPLICATED,
        OVER_REPLICATED,
        POSTPONE,
        UNDER_CONSTRUCTION,
        OK;

    }

    private static class ReplicationWork {
        private final Block block;
        private final BlockCollection bc;
        private final DatanodeDescriptor srcNode;
        private final List<DatanodeDescriptor> containingNodes;
        private final List<DatanodeStorageInfo> liveReplicaStorages;
        private final int additionalReplRequired;
        private DatanodeStorageInfo[] targets;
        private final int priority;

        public ReplicationWork(Block block, BlockCollection bc, DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, int priority) {
            this.block = block;
            this.bc = bc;
            this.srcNode = srcNode;
            this.srcNode.incrementPendingReplicationWithoutTargets();
            this.containingNodes = containingNodes;
            this.liveReplicaStorages = liveReplicaStorages;
            this.additionalReplRequired = additionalReplRequired;
            this.priority = priority;
            this.targets = null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set<Node> excludedNodes) throws TransactionContextException, StorageException {
            try {
                this.targets = blockplacement.chooseTarget(null, this.additionalReplRequired, this.srcNode, this.liveReplicaStorages, false, excludedNodes, this.block.getNumBytes(), storagePolicySuite.getPolicy(this.bc.getStoragePolicyID()));
            }
            finally {
                this.srcNode.decrementPendingReplicationWithoutTargets();
            }
        }

        static /* synthetic */ DatanodeStorageInfo[] access$502(ReplicationWork x0, DatanodeStorageInfo[] x1) {
            x0.targets = x1;
            return x1;
        }
    }

    private class ReplicationMonitor
    implements Runnable {
        private ReplicationMonitor() {
        }

        @Override
        public void run() {
            while (BlockManager.this.namesystem.isRunning()) {
                try {
                    if (BlockManager.this.namesystem.isLeader()) {
                        LOG.debug((Object)"Running replication monitor");
                        BlockManager.this.computeDatanodeWork();
                        BlockManager.this.processPendingReplications();
                    } else {
                        LOG.debug((Object)"Namesystem is not leader: will not run replication monitor");
                    }
                    Thread.sleep(BlockManager.this.replicationRecheckInterval);
                }
                catch (Throwable t) {
                    if (t instanceof TransientStorageException) continue;
                    if (!BlockManager.this.namesystem.isRunning()) {
                        LOG.info((Object)"Stopping ReplicationMonitor.");
                        if (t instanceof InterruptedException) break;
                        LOG.info((Object)"ReplicationMonitor received an exception while shutting down.", t);
                        break;
                    }
                    if (!BlockManager.this.checkNSRunning && t instanceof InterruptedException) {
                        LOG.info((Object)"Stopping ReplicationMonitor for testing.");
                        break;
                    }
                    LOG.fatal((Object)"ReplicationMonitor thread received Runtime exception. ", t);
                    ExitUtil.terminate((int)1, (Throwable)t);
                }
            }
        }
    }

    public class ReportStatistics {
        int numBuckets;
        public int numBucketsMatching;
        int numBlocks;
        int numToRemove;
        int numToInvalidate;
        int numToCorrupt;
        int numToUC;
        int numToAdd;
        int numConsideredSafeIfInSafemode;

        public String toString() {
            return String.format("(buckets,bucketsMatching,blocks,toRemove,toInvalidate,toCorrupt,toUC,toAdd,safeBlocksIfSafeMode)=(%d,%d,%d,%d,%d,%d,%d,%d,%d)", this.numBuckets, this.numBucketsMatching, this.numBlocks, this.numToRemove, this.numToInvalidate, this.numToCorrupt, this.numToUC, this.numToAdd, this.numConsideredSafeIfInSafemode);
        }
    }

    private static class HashMatchingResult {
        private final List<Integer> matchingBuckets;
        private final List<Integer> mismatchedBuckets;

        HashMatchingResult(List<Integer> matchingBuckets, List<Integer> mismatchedBuckets) {
            this.matchingBuckets = matchingBuckets;
            this.mismatchedBuckets = mismatchedBuckets;
        }
    }

    private static class BlockToMarkCorrupt {
        final BlockInfo corrupted;
        final BlockInfo stored;
        final String reason;
        final CorruptReplicasMap.Reason reasonCode;

        BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason, CorruptReplicasMap.Reason reasonCode) {
            Preconditions.checkNotNull((Object)corrupted, (Object)"corrupted is null");
            Preconditions.checkNotNull((Object)stored, (Object)"stored is null");
            this.corrupted = corrupted;
            this.stored = stored;
            this.reason = reason;
            this.reasonCode = reasonCode;
        }

        BlockToMarkCorrupt(BlockInfo stored, String reason, CorruptReplicasMap.Reason reasonCode) {
            this(stored, stored, reason, reasonCode);
        }

        BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, CorruptReplicasMap.Reason reasonCode) {
            this(new BlockInfo(stored), stored, reason, reasonCode);
            this.corrupted.setGenerationStampNoPersistance(gs);
        }

        public String toString() {
            return this.corrupted + "(" + (this.corrupted == this.stored ? "same as stored" : "stored=" + this.stored) + ")";
        }
    }

    static class StatefulBlockInfo {
        final BlockInfoUnderConstruction storedBlock;
        final Block reportedBlock;
        final HdfsServerConstants.ReplicaState reportedState;

        StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, Block reportedBlock, HdfsServerConstants.ReplicaState reportedState) {
            this.storedBlock = storedBlock;
            this.reportedBlock = reportedBlock;
            this.reportedState = reportedState;
        }
    }
}

