/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import io.hops.leader_election.node.ActiveNode;
import io.hops.leader_election.node.ActiveNodePBImpl;
import io.hops.leader_election.node.SortedActiveNodeList;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExceptionCheck;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BRLoadBalancingNonLeaderException;
import org.apache.hadoop.hdfs.server.blockmanagement.BRLoadBalancingOverloadException;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.Bucket;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HashesMismatchCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Joiner;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.collect.Lists;
import org.sparkproject.guava.collect.Maps;
import org.sparkproject.guava.collect.Sets;

@InterfaceAudience.Private
class BPOfferService
implements Runnable {
    // Shares the DataNode's logger rather than declaring its own.
    static final Log LOG = DataNode.LOG;
    // Namespace info for this block pool; null until verifyAndSetNamespaceInfo() stores it.
    NamespaceInfo bpNSInfo;
    // Registration for this block pool; null until registrationSucceeded() stores the first one.
    volatile DatanodeRegistration bpRegistration;
    // Owning DataNode.
    private final DataNode dn;
    // Cleared in shutdownActor() when it refers to the actor being shut down;
    // getActiveNN() returns its bpNamenode proxy when non-null.
    private BPServiceActor bpServiceToActive = null;
    // One actor per NameNode address; copy-on-write so iteration does not need a lock.
    private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();
    // NOTE(review): not read or written in the visible part of this file.
    private long lastActiveClaimTxId = -1L;
    // Read/write lock behind readLock()/readUnlock()/writeLock()/writeUnlock().
    private final ReentrantReadWriteLock mReadWriteLock = new ReentrantReadWriteLock();
    private final Lock mReadLock = this.mReadWriteLock.readLock();
    private final Lock mWriteLock = this.mReadWriteLock.writeLock();
    // DataNode configuration, taken from dn.getDnConf() in the constructor.
    private final DNConf dnConf;
    // Set when an incremental block report should be sent without waiting for the
    // next heartbeat; reset in reportReceivedDeletedBlocks().
    private volatile boolean sendImmediateIBR = false;
    // NOTE(review): presumably the time of the last cache report; confirm against cacheReport().
    volatile long lastCacheReport = 0L;
    // Decides when heartbeats and block reports are due.
    private final Scheduler scheduler;
    // Actor chosen by blockReport() to carry the current report ("Hander" is a typo for "Handler").
    private BPServiceActor blkReportHander = null;
    // Known NameNodes; seeded in the constructor with one entry per configured address.
    private List<ActiveNode> nnList = Collections.synchronizedList(new ArrayList());
    // NOTE(review): presumably NameNodes to avoid; not used in the visible part of this file.
    private List<InetSocketAddress> blackListNN = Collections.synchronizedList(new ArrayList());
    // NOTE(review): round-robin cursors; their consumers are outside the visible part of this file.
    private AtomicInteger rpcRoundRobinIndex = new AtomicInteger(0);
    private AtomicInteger refreshNNRoundRobinIndex = new AtomicInteger(0);
    // Pending incremental block report entries keyed by storage. The map object is
    // also the monitor used for synchronized/wait/notifyAll around IBR state.
    private final Map<DatanodeStorage, PerStoragePendingIncrementalBR> pendingIncrementalBRperStorage = Maps.newHashMap();
    // NOTE(review): not assigned in the visible part of this file.
    private Thread blockReportThread = null;
    private Random rand = new Random(System.currentTimeMillis());
    // Initialized to a random long in the constructor.
    private long prevBlockReportId;
    // NOTE(review): presumably the time nnList was last refreshed; confirm against its writer.
    long nnListLastUpdate = 0L;

    /** Acquires the shared (read) side of this service's read/write lock. */
    void readLock() {
        mReadLock.lock();
    }

    /** Releases the shared (read) side of this service's read/write lock. */
    void readUnlock() {
        mReadLock.unlock();
    }

    /** Acquires the exclusive (write) side of this service's read/write lock. */
    void writeLock() {
        mWriteLock.lock();
    }

    /** Releases the exclusive (write) side of this service's read/write lock. */
    void writeUnlock() {
        mWriteLock.unlock();
    }

    BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
        Preconditions.checkArgument((!nnAddrs.isEmpty() ? 1 : 0) != 0, (Object)"Must pass at least one NN.");
        this.dn = dn;
        for (InetSocketAddress addr : nnAddrs) {
            this.bpServices.add(new BPServiceActor(addr, this));
            this.nnList.add((ActiveNode)new ActiveNodePBImpl(0L, "", addr.getAddress().getHostAddress(), addr.getPort(), "", addr.getAddress().getHostAddress(), addr.getPort()));
        }
        this.dnConf = dn.getDnConf();
        this.prevBlockReportId = DFSUtil.getRandom().nextLong();
        this.scheduler = new Scheduler(this.dnConf.heartBeatInterval, this.dnConf.blockReportInterval);
    }

    /**
     * Reconciles the running actors with a new list of NameNode addresses.
     *
     * <p>Actors whose address is no longer listed are stopped and removed; an
     * actor is created and started for every address that had none.
     *
     * @param addrs the complete, current set of NameNode addresses
     * @throws IOException declared for callers; nothing in this body throws it directly
     */
    void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
        Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
        for (BPServiceActor actor : bpServices) {
            oldAddrs.add(actor.getNNSocketAddress());
        }
        Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
        // Both differences are views over oldAddrs/newAddrs, which are not modified below.
        Set<InetSocketAddress> deadNNs = Sets.difference(oldAddrs, newAddrs);
        Set<InetSocketAddress> newNNs = Sets.difference(newAddrs, oldAddrs);

        for (InetSocketAddress deadNN : deadNNs) {
            BPServiceActor deadActor = stopAnActor(deadNN);
            if (deadActor == null) {
                // The actor was removed (e.g. by shutdownActor) after the snapshot
                // above was taken; there is nothing left to stop or log.
                continue;
            }
            bpServices.remove(deadActor);
            LOG.debug("Stopped actor for " + deadActor.getNNSocketAddress());
        }

        for (InetSocketAddress newNN : newNNs) {
            BPServiceActor newActor = startAnActor(newNN);
            bpServices.add(newActor);
            LOG.debug("Started actor for " + newActor.getNNSocketAddress());
        }
    }

    /** @return true once a registration has been stored for this block pool */
    boolean isInitialized() {
        return bpRegistration != null;
    }

    /** @return true if at least one actor reports itself alive */
    boolean isAlive() {
        for (BPServiceActor candidate : bpServices) {
            if (candidate.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the block pool ID from the stored namespace info.
     *
     * @return the block pool ID, or {@code null} if no namespace info has been
     *         stored yet; in that case a warning with a stack trace is logged
     */
    String getBlockPoolId() {
        readLock();
        try {
            if (bpNSInfo == null) {
                // The synthetic exception only carries the caller's stack trace into the log.
                LOG.warn("Block pool ID needed, but service not yet registered with NN", new Exception("trace"));
                return null;
            }
            return bpNSInfo.getBlockPoolID();
        } finally {
            readUnlock();
        }
    }

    /** @return true once namespace info (and thus a block pool ID) is available */
    boolean hasBlockPoolId() {
        NamespaceInfo info = getNamespaceInfo();
        return info != null;
    }

    /** @return the stored namespace info, read under the read lock; may be {@code null} */
    NamespaceInfo getNamespaceInfo() {
        readLock();
        try {
            return bpNSInfo;
        } finally {
            readUnlock();
        }
    }

    /**
     * Describes this block pool for log messages: the block pool ID once it is
     * known, otherwise a "registering" placeholder, plus the DataNode UUID.
     */
    @Override
    public String toString() {
        readLock();
        try {
            if (bpNSInfo != null) {
                return "Block pool " + getBlockPoolId() + " (Datanode Uuid " + dn.getDatanodeUuid() + ")";
            }
            String uuid = dn.getDatanodeUuid();
            if (uuid == null || uuid.isEmpty()) {
                uuid = "unassigned";
            }
            return "Block pool <registering> (Datanode Uuid " + uuid + ")";
        } finally {
            readUnlock();
        }
    }

    /**
     * Reports a bad block to the NameNodes. Any failure is logged and not
     * propagated to the caller.
     *
     * @param block the bad block; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     * @param storageType type of that storage
     */
    void reportBadBlocks(ExtendedBlock block, String storageUuid, StorageType storageType) {
        checkBlock(block);
        try {
            reportBadBlocksWithRetry(block, storageUuid, storageType);
        } catch (Exception failure) {
            LOG.error("Failed to send bad block report to any namenode ", failure);
        }
    }

    /**
     * Queues a RECEIVED_BLOCK notification for the next incremental block report
     * and wakes the reporting loop.
     *
     * @param block the received block; must belong to this block pool
     * @param delHint deletion hint passed through to the notification
     * @param storageUuid UUID of the storage that received the block
     */
    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, delHint);
        notifyNamenodeBlockInt(received, storageUuid, true);
    }

    /**
     * Validates that a block is non-null and belongs to this block pool.
     *
     * @throws IllegalArgumentException if either check fails
     */
    private void checkBlock(ExtendedBlock block) {
        Preconditions.checkArgument(block != null, "block is null");
        // getBlockPoolId() is deliberately re-evaluated for the message arguments,
        // exactly as the comparison evaluates it.
        Preconditions.checkArgument(
                block.getBlockPoolId().equals(getBlockPoolId()),
                "block belongs to BP %s instead of BP %s",
                new Object[]{block.getBlockPoolId(), getBlockPoolId()});
    }

    /**
     * Queues a DELETED_BLOCK notification for the given storage. Unlike the
     * received/receiving variants, this does not request an immediate report.
     *
     * @param block the deleted block; must belong to this block pool
     * @param storageUuid UUID of the storage the block was deleted from
     */
    void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo deleted = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null);
        DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
        notifyNamenodeDeletedBlockInt(deleted, storage);
    }

    /**
     * Queues a RECEIVING_BLOCK notification; the reporting loop is not woken.
     *
     * @param block the block being received; must belong to this block pool
     * @param storageUuid UUID of the storage receiving the block
     */
    public void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo receiving = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, null);
        notifyNamenodeBlockInt(receiving, storageUuid, false);
    }

    /**
     * Queues an APPENDING notification; the reporting loop is not woken.
     *
     * @param block the block being appended to; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeAppendingBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo appending = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.APPENDING, null);
        notifyNamenodeBlockInt(appending, storageUuid, false);
    }

    /**
     * Queues a RECOVERING_APPEND notification and wakes the reporting loop.
     *
     * @param block the block under append recovery; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeAppendingRecoveredAppend(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo recovering = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECOVERING_APPEND, null);
        notifyNamenodeBlockInt(recovering, storageUuid, true);
    }

    /**
     * Queues an UPDATE_RECOVERED notification and wakes the reporting loop.
     *
     * @param block the recovered block; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeUpdateRecoveredBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        ReceivedDeletedBlockInfo recovered = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.UPDATE_RECOVERED, null);
        notifyNamenodeBlockInt(recovered, storageUuid, true);
    }

    /** Starts every actor currently registered with this service. */
    void start() {
        for (BPServiceActor each : bpServices) {
            each.start();
        }
    }

    /** Asks every actor currently registered with this service to stop. */
    void stop() {
        for (BPServiceActor each : bpServices) {
            each.stop();
        }
    }

    /** Joins every actor currently registered with this service, one after another. */
    void join() {
        for (BPServiceActor each : bpServices) {
            each.join();
        }
    }

    /** @return the DataNode that owns this block pool service */
    DataNode getDataNode() {
        return dn;
    }

    /**
     * Stores the namespace info on first contact, or verifies that a later
     * NameNode reports the same block pool, namespace and cluster IDs.
     *
     * <p>On first contact the block pool is initialized on the DataNode; if that
     * fails, the stored namespace info is rolled back to {@code null} so a later
     * call starts over.
     *
     * @param nsInfo namespace info received from a NameNode
     * @throws IOException if block pool initialization fails or any ID differs
     *         from the one stored earlier
     */
    void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
        writeLock();
        try {
            if (bpNSInfo == null) {
                bpNSInfo = nsInfo;
                boolean initialized = false;
                try {
                    dn.initBlockPool(this);
                    initialized = true;
                } finally {
                    if (!initialized) {
                        // Roll back so the next handshake attempt is treated as the first.
                        bpNSInfo = null;
                    }
                }
            } else {
                checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), "Blockpool ID");
                checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), "Namespace ID");
                checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), "Cluster ID");
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Records a successful registration with a NameNode.
     *
     * <p>The first registration is stored; later ones are only checked against
     * the stored namespace and cluster IDs. The DataNode is then notified with
     * the stored registration, and the exported keys of {@code reg} are added to
     * the block pool token manager when block tokens are enabled.
     *
     * @param bpServiceActor the actor that registered (not used in this body)
     * @param reg the registration returned by the NameNode
     * @throws IOException if the namespace or cluster ID differs from the stored one
     */
    void registrationSucceeded(BPServiceActor bpServiceActor, DatanodeRegistration reg) throws IOException {
        writeLock();
        try {
            if (bpRegistration == null) {
                bpRegistration = reg;
            } else {
                checkNSEquality(
                        bpRegistration.getStorageInfo().getNamespaceID(),
                        reg.getStorageInfo().getNamespaceID(),
                        "namespace ID");
                checkNSEquality(
                        bpRegistration.getStorageInfo().getClusterID(),
                        reg.getStorageInfo().getClusterID(),
                        "cluster ID");
            }
            dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
            if (dn.isBlockTokenEnabled) {
                dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), reg.getExportedKeys());
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Fails if two identifiers differ.
     *
     * @param ourID the identifier stored locally; must not be null
     * @param theirID the identifier reported by the NameNode
     * @param idHelpText human-readable name of the identifier, used in the message
     * @throws IOException if {@code ourID} does not equal {@code theirID}
     */
    private static void checkNSEquality(Object ourID, Object theirID, String idHelpText) throws IOException {
        if (ourID.equals(theirID)) {
            return;
        }
        String message = idHelpText + " mismatch: previously connected to " + idHelpText + " " + ourID
                + " but now connected to " + idHelpText + " " + theirID;
        throw new IOException(message);
    }

    /**
     * Builds a registration for this block pool from the stored namespace info.
     *
     * @return a new registration created by the DataNode
     * @throws IllegalStateException if no namespace info has been stored yet
     */
    DatanodeRegistration createRegistration() {
        writeLock();
        try {
            Preconditions.checkState(bpNSInfo != null, "getRegistration() can only be called after initial handshake");
            return dn.createBPRegistration(bpNSInfo);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes an actor from this service and, if it was the last one, shuts the
     * block pool down on the DataNode.
     *
     * <p>The first NameNode list entry whose DataNode RPC address matches the
     * actor's address is removed as well.
     *
     * @param actor the actor that is shutting down
     */
    void shutdownActor(BPServiceActor actor) {
        writeLock();
        try {
            if (bpServiceToActive == actor) {
                bpServiceToActive = null;
            }
            bpServices.remove(actor);
            // nnList is a Collections.synchronizedList: traversal must hold the
            // list's own monitor, otherwise a concurrent add/remove can corrupt
            // the iteration.
            synchronized (nnList) {
                for (ActiveNode ann : nnList) {
                    if (ann.getRpcServerAddressForDatanodes().equals(actor.getNNSocketAddress())) {
                        // Safe during for-each only because we break immediately.
                        nnList.remove(ann);
                        break;
                    }
                }
            }
            if (bpServices.isEmpty()) {
                dn.shutdownBlockPool(this);
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Asks every actor to try sending an error report to its NameNode.
     *
     * @param errCode error code forwarded to each actor
     * @param errMsg error message forwarded to each actor
     */
    void trySendErrorReport(int errCode, String errMsg) {
        for (BPServiceActor each : bpServices) {
            each.trySendErrorReport(errCode, errMsg);
        }
    }

    /**
     * Delegates to the scheduler to schedule a block report.
     *
     * @param delay delay value passed through to the scheduler unchanged
     */
    void scheduleBlockReport(long delay) {
        scheduler.scheduleBlockReport(delay);
    }

    /**
     * Tells whether any actor other than {@code skip} is connected to its NameNode.
     *
     * @param skip the actor to leave out of the check
     * @return true if at least one other actor reports a live connection
     */
    public boolean otherActorsConnectedToNNs(BPServiceActor skip) {
        // Acquire before entering try: if acquisition itself failed inside the
        // try block, the finally clause would unlock a lock this thread does not hold.
        readLock();
        try {
            for (BPServiceActor actor : bpServices) {
                if (actor != skip && actor.connectedToNN()) {
                    return true;
                }
            }
            return false;
        } finally {
            readUnlock();
        }
    }

    /**
     * Reports a bad block that lives on another DataNode. Failures are logged
     * and not propagated to the caller.
     *
     * @param dnInfo the DataNode holding the bad replica
     * @param block the bad block
     */
    void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
        try {
            reportRemoteBadBlockWithRetry(dnInfo, block);
        } catch (IOException e) {
            // Pass the exception as the throwable so the stack trace is kept and
            // the message is no longer fused onto the block description.
            LOG.warn("Couldn't report bad block " + block, e);
        }
    }

    /**
     * @return the NameNode proxy of the actor stored in {@code bpServiceToActive},
     *         or {@code null} if that field is unset
     */
    DatanodeProtocolClientSideTranslatorPB getActiveNN() {
        readLock();
        try {
            return bpServiceToActive == null ? null : bpServiceToActive.bpNamenode;
        } finally {
            readUnlock();
        }
    }

    /** @return a mutable copy of the current actor list; changes to it do not affect this service */
    @VisibleForTesting
    List<BPServiceActor> getBPServiceActors() {
        List<BPServiceActor> snapshot = Lists.newArrayList(bpServices);
        return snapshot;
    }

    /**
     * Applies the rolling-upgrade state to this block pool's dataset.
     *
     * <p>While an upgrade is in progress, trash is enabled and the upgrade marker
     * is set; otherwise trash is restored and the marker is cleared.
     *
     * @param inProgress whether a rolling upgrade is in progress
     * @throws IOException if the dataset operation fails
     */
    void signalRollingUpgrade(boolean inProgress) throws IOException {
        final String poolId = getBlockPoolId();
        if (!inProgress) {
            dn.getFSDataset().restoreTrash(poolId);
            dn.getFSDataset().clearRollingUpgradeMarker(poolId);
            return;
        }
        dn.getFSDataset().enableTrash(poolId);
        dn.getFSDataset().setRollingUpgradeMarker(poolId);
    }

    /**
     * @param addr NameNode address to look for
     * @return true if some actor is bound to exactly that address
     */
    boolean containsNN(InetSocketAddress addr) {
        for (BPServiceActor candidate : bpServices) {
            if (candidate.getNNSocketAddress().equals(addr)) {
                return true;
            }
        }
        return false;
    }

    /** @return the number of actors, i.e. NameNodes this service currently talks to */
    @VisibleForTesting
    int countNameNodes() {
        return bpServices.size();
    }

    /**
     * Test hook that forwards to {@code triggerDeletionReportForTestsInt()}.
     *
     * @throws IOException if the delegate fails
     */
    @VisibleForTesting
    void triggerDeletionReportForTests() throws IOException {
        triggerDeletionReportForTestsInt();
    }

    /**
     * Test hook that triggers a heartbeat on every actor, in list order.
     *
     * @throws IOException if an actor fails; later actors are then not triggered
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        for (BPServiceActor each : bpServices) {
            each.triggerHeartbeatForTests();
        }
    }

    /**
     * Entry point for a command received by one of this service's actors.
     *
     * <p>A {@code null} command is a no-op. Action code 4 (logged as DNA_REGISTER)
     * is handled by re-registering the actor, without taking the write lock. Every
     * other command is processed under the write lock.
     *
     * @param cmd the command, may be {@code null}
     * @param actor the actor that received the command
     * @return true when the command was handled
     * @throws IOException if handling the command fails
     */
    boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        assert bpServices.contains(actor);
        if (cmd == null) {
            return true;
        }
        if (cmd.getAction() == 4) {
            LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr);
            actor.reRegister();
            return true;
        }
        writeLock();
        try {
            return processCommandFromActive(cmd, actor);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Formats block IDs as a comma-separated list for logging.
     *
     * <p>At most {@code dn.getMaxNumberOfBlocksToLog()} IDs are printed; if more
     * remain, {@code "..."} is appended directly after the last printed ID.
     *
     * @param ids the block IDs to format
     * @return the formatted, possibly truncated, list
     */
    private String blockIdArrayToString(long[] ids) {
        final long limit = dn.getMaxNumberOfBlocksToLog();
        StringBuilder out = new StringBuilder();
        int idx = 0;
        while (idx < ids.length) {
            if (idx >= limit) {
                out.append("...");
                break;
            }
            if (idx > 0) {
                out.append(", ");
            }
            out.append(ids[idx]);
            idx++;
        }
        return out.toString();
    }

    /**
     * Dispatches a NameNode command on its numeric action code and applies it to
     * the DataNode. processCommandFromActor() calls this under the write lock.
     *
     * The action codes are decompiled integer constants; the names in the case
     * comments are taken from this method's own log and exception messages where
     * available, and are otherwise marked as assumptions.
     *
     * @param cmd the command to execute; must not be null
     * @param actor the actor that received the command
     * @return always true
     * @throws IOException if the underlying DataNode operation fails
     */
    private boolean processCommandFromActive(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        // NOTE(review): both views are null when cmd is of a different type; the
        // cases below dereference them without a check, relying on the action
        // code matching the command class.
        BlockCommand bcmd = cmd instanceof BlockCommand ? (BlockCommand)cmd : null;
        BlockIdCommand blockIdCmd = cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd : null;
        switch (cmd.getAction()) {
            // Transfer the listed blocks to the given targets (presumably DNA_TRANSFER; confirm).
            case 1: {
                this.dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets(), bcmd.getTargetStorageTypes());
                break;
            }
            // Invalidate the listed blocks and count them as removed (presumably DNA_INVALIDATE; confirm).
            case 2: {
                Block[] toDelete = bcmd.getBlocks();
                this.dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
                this.dn.metrics.incrBlocksRemoved(toDelete.length);
                break;
            }
            // DNA_CACHE: cache the listed block IDs.
            case 9: {
                LOG.info((Object)("DatanodeCommand action: DNA_CACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_UNCACHE: uncache the listed block IDs.
            case 10: {
                LOG.info((Object)("DatanodeCommand action: DNA_UNCACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_SHUTDOWN: not implemented; rejected with an unchecked exception.
            case 3: {
                throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
            }
            // DNA_FINALIZE: finalize the upgrade for the block pool named in the command.
            case 5: {
                String bp = ((FinalizeCommand)cmd).getBlockPoolId();
                // Only enforced when assertions are enabled.
                assert (this.getBlockPoolId().equals(bp)) : "BP " + this.getBlockPoolId() + " received DNA_FINALIZE for other block pool " + bp;
                this.dn.finalizeUpgradeForPool(bp);
                break;
            }
            // Start recovery of the listed blocks (presumably DNA_RECOVERBLOCK; confirm).
            case 6: {
                String who = "NameNode at " + actor.getNNSocketAddress();
                this.dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
                break;
            }
            // DNA_ACCESSKEYUPDATE: install new block token keys; ignored when tokens are disabled.
            case 7: {
                LOG.info((Object)"DatanodeCommand action: DNA_ACCESSKEYUPDATE");
                if (!this.dn.isBlockTokenEnabled) break;
                this.dn.blockPoolTokenSecretManager.addKeys(this.getBlockPoolId(), ((KeyUpdateCommand)cmd).getExportedKeys());
                break;
            }
            // DNA_BALANCERBANDWIDTHUPDATE: apply a new balancer bandwidth; non-positive values are ignored.
            case 8: {
                LOG.info((Object)"DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
                long bandwidth = ((BalancerBandwidthCommand)cmd).getBalancerBandwidthValue();
                if (bandwidth <= 0L) break;
                DataXceiverServer dxcs = (DataXceiverServer)this.dn.dataXceiverServer.getRunnable();
                LOG.info((Object)("Updating balance throttler bandwidth from " + dxcs.balanceThrottler.getBandwidth() + " bytes/s to: " + bandwidth + " bytes/s."));
                dxcs.balanceThrottler.setBandwidth(bandwidth);
                break;
            }
            // Unknown action codes are logged and otherwise ignored.
            default: {
                LOG.warn((Object)("Unknown DatanodeCommand action: " + cmd.getAction()));
            }
        }
        return true;
    }

    /**
     * Stops the actor bound to the given NameNode address, if there is one.
     *
     * @param address NameNode address to look up
     * @return the stopped actor, or {@code null} if no actor matches
     */
    private BPServiceActor stopAnActor(InetSocketAddress address) {
        BPServiceActor found = getAnActor(address);
        if (found == null) {
            return null;
        }
        found.stop();
        return found;
    }

    /**
     * Creates and starts a new actor for the given NameNode address. The caller
     * is responsible for adding it to the actor list.
     *
     * @param address NameNode address the actor should talk to
     * @return the started actor
     */
    private BPServiceActor startAnActor(InetSocketAddress address) {
        BPServiceActor created = new BPServiceActor(address, this);
        created.start();
        return created;
    }

    /**
     * Finds the actor bound to the given NameNode address.
     *
     * @param address NameNode address, may be {@code null}
     * @return the first matching actor, or {@code null} if the address is null or
     *         no actor matches
     */
    private BPServiceActor getAnActor(InetSocketAddress address) {
        if (address != null) {
            for (BPServiceActor candidate : bpServices) {
                if (candidate.getNNSocketAddress().equals(address)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Main reporting loop; runs for as long as dn.shouldRun is true.
     *
     * Each iteration:
     *   1. schedules the next heartbeat if one is due;
     *   2. sends pending incremental block reports if one was requested or a
     *      heartbeat is due;
     *   3. runs blockReport() and hands any returned commands to the actor that
     *      carried the report;
     *   4. runs cacheReport() and hands its command to the same actor;
     *   5. waits on the pending-IBR monitor until the next heartbeat is due or a
     *      notifier wakes it;
     *   6. calls forwardRRIndex() (presumably advances the round-robin NameNode
     *      index; confirm against its definition).
     *
     * Any exception thrown inside an iteration is logged and followed by a
     * one-second sleep, so the loop itself never terminates on error.
     */
    private void whirlingLikeASufi() throws Exception {
        while (this.dn.shouldRun) {
            try {
                DatanodeCommand cmd;
                List<DatanodeCommand> cmds;
                long startTime = this.scheduler.monotonicNow();
                boolean sendHeartbeat = this.scheduler.isHeartbeatDue(startTime);
                if (sendHeartbeat) {
                    this.scheduler.scheduleNextHeartbeat();
                }
                if (this.sendImmediateIBR || sendHeartbeat) {
                    this.reportReceivedDeletedBlocks();
                }
                // blockReport() returns null when no report was sent.
                if ((cmds = this.blockReport()) != null && this.blkReportHander != null) {
                    // The inner null test is redundant: cmds is known non-null here.
                    this.blkReportHander.processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
                }
                // The argument tells cacheReport() whether a block report was just sent.
                if ((cmd = this.cacheReport(cmds != null)) != null && this.blkReportHander != null) {
                    this.blkReportHander.processCommand(new DatanodeCommand[]{cmd});
                }
                long waitTime = this.scheduler.getHeartbeatWaitTime();
                Map<DatanodeStorage, PerStoragePendingIncrementalBR> map = this.pendingIncrementalBRperStorage;
                synchronized (map) {
                    // Skip the wait when an immediate IBR was requested in the meantime.
                    if (waitTime > 0L && !this.sendImmediateIBR) {
                        try {
                            this.pendingIncrementalBRperStorage.wait(waitTime);
                        }
                        catch (InterruptedException ie) {
                            // The interrupt is only logged; the interrupt flag is not restored.
                            LOG.warn((Object)("BPOfferService for " + this + " interrupted"));
                        }
                    }
                }
                this.forwardRRIndex();
            }
            catch (Exception re) {
                LOG.warn((Object)"Exception in whirlingLikeASufi", (Throwable)re);
                try {
                    // Back off briefly so a persistent failure does not spin the loop.
                    long sleepTime = 1000L;
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Sends all pending incremental block report entries to the NameNodes.
     *
     * Pending entries are dequeued per storage while holding the pending-IBR
     * monitor, then sent outside the monitor. If sending fails, the dequeued
     * entries are put back and an immediate retry is requested.
     *
     * @throws IOException if the report cannot be delivered
     */
    public void reportReceivedDeletedBlocks() throws IOException {
        // NOTE(review): size() is read before the monitor is taken; it is only a capacity hint.
        ArrayList<StorageReceivedDeletedBlocks> reports = new ArrayList<StorageReceivedDeletedBlocks>(this.pendingIncrementalBRperStorage.size());
        Map<DatanodeStorage, PerStoragePendingIncrementalBR> map = this.pendingIncrementalBRperStorage;
        synchronized (map) {
            for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry : this.pendingIncrementalBRperStorage.entrySet()) {
                DatanodeStorage storage = entry.getKey();
                PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
                // Storages with nothing pending contribute no report.
                if (perStorageMap.getBlockInfoCount() <= 0) continue;
                ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
                reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
            }
            // Everything pending has been collected, so the request flag is cleared.
            this.sendImmediateIBR = false;
        }
        if (reports.size() == 0) {
            return;
        }
        boolean success = false;
        long startTime = Time.monotonicNow();
        try {
            this.blockReceivedAndDeletedWithRetry(reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
            success = true;
        }
        finally {
            // The elapsed time is recorded whether or not the send succeeded.
            this.dn.getMetrics().addIncrementalBlockReport(Time.monotonicNow() - startTime);
            if (!success) {
                Map<DatanodeStorage, PerStoragePendingIncrementalBR> map2 = this.pendingIncrementalBRperStorage;
                synchronized (map2) {
                    for (StorageReceivedDeletedBlocks report : reports) {
                        // NOTE(review): assumes the storage is still present in the map; a
                        // removed storage would make this lookup return null.
                        PerStoragePendingIncrementalBR perStorageMap = this.pendingIncrementalBRperStorage.get(report.getStorage());
                        // Presumably re-adds only entries not superseded since the dequeue; confirm in putMissingBlockInfos.
                        perStorageMap.putMissingBlockInfos(report.getBlocks());
                        this.sendImmediateIBR = true;
                    }
                }
            }
        }
    }

    /**
     * Returns the pending-IBR holder for a storage, creating and registering an
     * empty one on first use.
     *
     * @param storage the storage to look up
     * @return the existing or newly created holder, never {@code null}
     */
    private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(DatanodeStorage storage) {
        PerStoragePendingIncrementalBR existing = pendingIncrementalBRperStorage.get(storage);
        if (existing != null) {
            return existing;
        }
        PerStoragePendingIncrementalBR created = new PerStoragePendingIncrementalBR();
        pendingIncrementalBRperStorage.put(storage, created);
        return created;
    }

    /**
     * Queues a block notification under the given storage, first dropping any
     * entry for the same block queued under any storage.
     *
     * <p>The callers in this file hold the pending-IBR monitor when invoking this.
     *
     * @param bInfo the notification to queue
     * @param storage the storage the notification belongs to
     * @return true if no earlier entry for the block had to be removed
     */
    boolean addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo, DatanodeStorage storage) {
        boolean replacedExisting = false;
        for (PerStoragePendingIncrementalBR pending : pendingIncrementalBRperStorage.values()) {
            if (pending.removeBlockInfo(bInfo)) {
                // Stop at the first storage that held an entry for this block.
                replacedExisting = true;
                break;
            }
        }
        getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
        return !replacedExisting;
    }

    /**
     * Queues a block notification and flags that an incremental block report
     * should be sent.
     *
     * @param bInfo the notification to queue
     * @param storageUuid UUID of the storage the block lives on
     * @param now if true, threads waiting on the pending-IBR monitor are woken
     *            so the report goes out without waiting for the next heartbeat
     */
    void notifyNamenodeBlockInt(ReceivedDeletedBlockInfo bInfo, String storageUuid, boolean now) {
        synchronized (pendingIncrementalBRperStorage) {
            DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
            addPendingReplicationBlockInfo(bInfo, storage);
            sendImmediateIBR = true;
            if (now) {
                pendingIncrementalBRperStorage.notifyAll();
            }
        }
    }

    /**
     * Queues a deletion notification under the pending-IBR monitor. It neither
     * requests an immediate report nor wakes the reporting loop.
     *
     * @param bInfo the notification to queue
     * @param storage the storage the block was deleted from
     */
    void notifyNamenodeDeletedBlockInt(ReceivedDeletedBlockInfo bInfo, DatanodeStorage storage) {
        synchronized (pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(bInfo, storage);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<DatanodeCommand> blockReport() throws IOException {
        if (!this.scheduler.isBlockReportDue()) {
            return null;
        }
        this.scheduler.setNextBlockReportOverwritten(false);
        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
        this.reportReceivedDeletedBlocks();
        long brCreateStartTime = Time.monotonicNow();
        Map<DatanodeStorage, BlockReport> perVolumeBlockLists = this.dn.getFSDataset().getBlockReports(this.getBlockPoolId());
        int i = 0;
        int totalBlockCount = 0;
        StorageBlockReport[] reports = new StorageBlockReport[perVolumeBlockLists.size()];
        DatanodeStorage[] storages = new DatanodeStorage[reports.length];
        for (Map.Entry<DatanodeStorage, BlockReport> kvPair : perVolumeBlockLists.entrySet()) {
            BlockReport blockList = kvPair.getValue();
            reports[i] = new StorageBlockReport(kvPair.getKey(), blockList);
            totalBlockCount += blockList.getNumberOfBlocks();
            storages[i] = kvPair.getKey();
            ++i;
        }
        ActiveNode an = this.nextNNForBlkReport(totalBlockCount);
        int numReportsSent = 0;
        int numRPCs = 0;
        boolean success = false;
        try {
            if (an != null) {
                this.blkReportHander = this.getAnActor(an.getRpcServerAddressForDatanodes());
                if (this.blkReportHander == null || !this.blkReportHander.isRunning()) {
                    List<DatanodeCommand> list = null;
                    return list;
                }
            } else {
                LOG.warn((Object)"Unable to send block report");
                List<DatanodeCommand> list = null;
                return list;
            }
            long brSendStartTime = Time.monotonicNow();
            long reportId = this.generateUniqueBlockReportId();
            try {
                if ((long)totalBlockCount < this.dnConf.blockReportSplitThreshold) {
                    DatanodeCommand buckets = this.blkReportHander.reportHashes(this.bpRegistration, this.getBlockPoolId(), this.slimBlockReports(reports));
                    this.removeMatchingBuckets(buckets, reports);
                    DatanodeCommand cmd = this.blkReportHander.blockReport(this.bpRegistration, this.getBlockPoolId(), reports, new BlockReportContext(1, 0, reportId));
                    numRPCs = 1;
                    numReportsSent = reports.length;
                    if (cmd != null) {
                        cmds.add(cmd);
                    }
                } else {
                    for (int r = 0; r < reports.length; ++r) {
                        StorageBlockReport[] singleReport = new StorageBlockReport[]{reports[r]};
                        DatanodeCommand buckets = this.blkReportHander.reportHashes(this.bpRegistration, this.getBlockPoolId(), this.slimBlockReports(singleReport));
                        this.removeMatchingBuckets(buckets, singleReport);
                        DatanodeCommand cmd = this.blkReportHander.blockReport(this.bpRegistration, this.getBlockPoolId(), singleReport, new BlockReportContext(reports.length, r, reportId));
                        ++numReportsSent;
                        ++numRPCs;
                        if (cmd == null) continue;
                        cmds.add(cmd);
                    }
                }
                success = true;
            }
            catch (Throwable throwable) {
                long brSendCost = Time.monotonicNow() - brSendStartTime;
                long brCreateCost = brSendStartTime - brCreateStartTime;
                this.dn.getMetrics().addBlockReport(brSendCost);
                this.dn.getMetrics().incrBlocReportCounter(numReportsSent);
                int nCmds = cmds.size();
                LOG.info((Object)((success ? "S" : "Uns") + "uccessfully sent block report 0x" + Long.toHexString(reportId) + ",  containing " + reports.length + " storage report(s), of which we sent " + numReportsSent + ". The reports had " + totalBlockCount + " total blocks and used " + numRPCs + " RPC(s). This took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing. Got back " + (nCmds == 0 ? "no commands" : (nCmds == 1 ? "one command: " + cmds.get(0) : nCmds + " commands: " + Joiner.on((String)"; ").join(cmds))) + "."));
                throw throwable;
            }
            long brSendCost = Time.monotonicNow() - brSendStartTime;
            long brCreateCost = brSendStartTime - brCreateStartTime;
            this.dn.getMetrics().addBlockReport(brSendCost);
            this.dn.getMetrics().incrBlocReportCounter(numReportsSent);
            int nCmds = cmds.size();
            LOG.info((Object)((success ? "S" : "Uns") + "uccessfully sent block report 0x" + Long.toHexString(reportId) + ",  containing " + reports.length + " storage report(s), of which we sent " + numReportsSent + ". The reports had " + totalBlockCount + " total blocks and used " + numRPCs + " RPC(s). This took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing. Got back " + (nCmds == 0 ? "no commands" : (nCmds == 1 ? "one command: " + cmds.get(0) : nCmds + " commands: " + Joiner.on((String)"; ").join(cmds))) + "."));
        }
        finally {
            if (this.blkReportHander != null) {
                for (BPServiceActor actor : this.bpServices) {
                    actor.blockReportCompleted(this.bpRegistration, storages, success);
                }
            }
        }
        this.scheduler.scheduleNextBlockReport();
        return cmds.size() == 0 ? null : cmds;
    }

    /**
     * Builds hash-only copies of the given storage block reports.
     *
     * <p>Every returned report keeps the storage, the number of buckets, each
     * bucket's hash and the total block count of its source report, but each
     * bucket's block list is set to {@code BlockListAsLongs.EMPTY}.
     *
     * @param reports the full per-storage block reports
     * @return a new array, parallel to {@code reports}, of slimmed reports
     */
    StorageBlockReport[] slimBlockReports(StorageBlockReport[] reports) {
        StorageBlockReport[] slimmed = new StorageBlockReport[reports.length];
        int slot = 0;
        for (StorageBlockReport full : reports) {
            BlockReport fullReport = full.getReport();
            Bucket[] fullBuckets = fullReport.getBuckets();
            Bucket[] hashOnly = new Bucket[fullBuckets.length];
            for (int b = 0; b < hashOnly.length; ++b) {
                // Copy only the hash; the block list is deliberately left empty.
                Bucket stripped = new Bucket();
                stripped.setHash(fullBuckets[b].getHash());
                stripped.setBlocks(BlockListAsLongs.EMPTY);
                hashOnly[b] = stripped;
            }
            BlockReport slimReport = new BlockReport(hashOnly, fullReport.getNumberOfBlocks());
            slimmed[slot++] = new StorageBlockReport(full.getStorage(), slimReport);
        }
        return slimmed;
    }

    /**
     * Applies the mismatching-bucket lists carried by a command to the given
     * storage reports.
     *
     * <p>For each report, the list stored under the report's storage ID is
     * looked up and handed to
     * {@link #removeMatchingBuckets(List, BlockReport)}.
     *
     * @param cmd     the command returned by the hash report; it is cast to
     *                {@code HashesMismatchCommand}
     * @param reports the storage reports to prune in place
     */
    public void removeMatchingBuckets(DatanodeCommand cmd, StorageBlockReport[] reports) {
        HashesMismatchCommand mismatchCmd = (HashesMismatchCommand)cmd;
        Map<String, List<Integer>> mismatchesByStorage = mismatchCmd.getMissMatchingBuckets();
        for (int s = 0; s < reports.length; ++s) {
            StorageBlockReport storageReport = reports[s];
            List<Integer> mismatches = mismatchesByStorage.get(storageReport.getStorage().getStorageID());
            BPOfferService.removeMatchingBuckets(mismatches, storageReport.getReport());
        }
    }

    /**
     * Empties every bucket of {@code report} whose index is not listed in
     * {@code mismatchingBuckets} and marks it as skipped.
     *
     * <p>Buckets whose index is listed are left untouched.
     *
     * @param mismatchingBuckets indexes of the buckets to keep; must not be
     *                           {@code null} (it is copied into a hash set)
     * @param report             the block report to prune in place
     */
    public static void removeMatchingBuckets(List<Integer> mismatchingBuckets, BlockReport report) {
        Set<Integer> keep = new HashSet<Integer>(mismatchingBuckets);
        Bucket[] buckets = report.getBuckets();
        for (int idx = 0; idx < buckets.length; ++idx) {
            if (!keep.contains(idx)) {
                buckets[idx].setBlocks(BlockListAsLongs.EMPTY);
                buckets[idx].setSkip(true);
            }
        }
    }

    /**
     * Sends a cache report if the dataset has cache capacity and more than
     * {@code dnConf.cacheReportInterval} has passed since the last one.
     *
     * @param hasHandler when {@code false}, a namenode is picked at random from
     *                   {@code nnList} and its actor is stored in
     *                   {@code blkReportHander} before sending; when
     *                   {@code true}, the current {@code blkReportHander} is
     *                   used as is
     * @return the command returned by the namenode, or {@code null} when no
     *         report was sent (no cache capacity, report not yet due, no
     *         namenode known, or the chosen actor is missing or not running)
     * @throws IOException if the dataset or the RPC fails
     */
    DatanodeCommand cacheReport(boolean hasHandler) throws IOException {
        if (this.dn.getFSDataset().getCacheCapacity() == 0L) {
            return null;
        }
        long startTime = Time.monotonicNow();
        boolean reportDue = startTime - this.lastCacheReport > this.dnConf.cacheReportInterval;
        if (!reportDue) {
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Sending cacheReport from service actor: " + this));
        }
        // The timestamp is advanced before the report is actually sent.
        this.lastCacheReport = startTime;
        String bpid = this.getBlockPoolId();
        List<Long> blockIds = this.dn.getFSDataset().getCacheReport(bpid);
        long createTime = Time.monotonicNow();
        if (!hasHandler) {
            if (this.nnList.isEmpty()) {
                LOG.warn((Object)"Unable to send cache report");
                return null;
            }
            ActiveNode target = this.nnList.get(this.rand.nextInt(this.nnList.size()));
            this.blkReportHander = this.getAnActor(target.getRpcServerAddressForDatanodes());
            if (this.blkReportHander == null || !this.blkReportHander.isRunning()) {
                return null;
            }
        }
        DatanodeCommand cmd = this.blkReportHander.cacheReport(this.bpRegistration, bpid, blockIds);
        long sendTime = Time.monotonicNow();
        long createCost = createTime - startTime;
        long sendCost = sendTime - createTime;
        this.dn.getMetrics().addCacheReport(sendCost);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("CacheReport of " + blockIds.size() + " block(s) took " + createCost + " msec to generate and " + sendCost + " msecs for RPC and NN processing"));
        }
        return cmd;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Test hook: schedules an immediate block report and blocks until the
     * scheduler's next block report time has moved past the value that was
     * just scheduled.
     *
     * <p>The method wakes up waiters on {@code pendingIncrementalBRperStorage}
     * and then polls every 100 ms on the same monitor. If the calling thread
     * is interrupted while waiting, its interrupt status is restored and the
     * method returns early.
     */
    @VisibleForTesting
    void triggerBlockReportForTests() {
        Map<DatanodeStorage, PerStoragePendingIncrementalBR> map = this.pendingIncrementalBRperStorage;
        synchronized (map) {
            long nextBlockReportTime = this.scheduler.scheduleBlockReport(0L);
            this.pendingIncrementalBRperStorage.notifyAll();
            while (nextBlockReportTime - this.scheduler.nextBlockReportTime >= 0L) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                }
                catch (InterruptedException e) {
                    // Do not swallow the interrupt: callers further up the
                    // stack must still be able to observe it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Test hook: requests an immediate incremental block report and blocks
     * until {@code sendImmediateIBR} has been reset to {@code false}.
     *
     * <p>The method wakes up waiters on {@code pendingIncrementalBRperStorage}
     * and then polls every 100 ms on the same monitor. If the calling thread
     * is interrupted while waiting, its interrupt status is restored and the
     * method returns early.
     */
    void triggerDeletionReportForTestsInt() {
        Map<DatanodeStorage, PerStoragePendingIncrementalBR> map = this.pendingIncrementalBRperStorage;
        synchronized (map) {
            this.sendImmediateIBR = true;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.sendImmediateIBR) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                }
                catch (InterruptedException e) {
                    // Do not swallow the interrupt: callers further up the
                    // stack must still be able to observe it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Reports whether an immediate incremental block report is still pending.
     *
     * @return the current value of {@code sendImmediateIBR}
     */
    @VisibleForTesting
    boolean hasPendingIBR() {
        boolean pending = this.sendImmediateIBR;
        return pending;
    }

    /**
     * Produces the next block report ID.
     *
     * <p>The ID is the previous ID plus one. Zero is never returned: if the
     * increment yields zero, random values are drawn until a non-zero one is
     * found. The result is remembered in {@code prevBlockReportId}.
     *
     * @return the new, non-zero block report ID
     */
    private long generateUniqueBlockReportId() {
        long candidate = this.prevBlockReportId + 1L;
        while (candidate == 0L) {
            candidate = DFSUtil.getRandom().nextLong();
        }
        this.prevBlockReportId = candidate;
        return candidate;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the known namenode list with the given one.
     *
     * <p>Under the write lock this refreshes the actors for the datanode RPC
     * addresses of all active nodes, re-points {@code bpServiceToActive} at
     * the leader's actor when a leader is present, replaces the contents of
     * {@code nnList}, and clears the namenode blacklist.
     *
     * @param list the current set of active namenodes
     * @throws IOException declared by this method; thrown by the calls it makes
     */
    void updateNNList(SortedActiveNodeList list) throws IOException {
        this.writeLock();
        try {
            ArrayList<InetSocketAddress> datanodeRpcAddresses = new ArrayList<InetSocketAddress>();
            for (ActiveNode node : list.getActiveNodes()) {
                datanodeRpcAddresses.add(node.getRpcServerAddressForDatanodes());
            }
            this.refreshNNList(datanodeRpcAddresses);

            ActiveNode leader = list.getLeader();
            if (leader != null) {
                this.bpServiceToActive = this.getAnActor(leader.getRpcServerAddressForDatanodes());
            }

            this.nnList.clear();
            this.nnList.addAll(list.getActiveNodes());
            this.blackListNN.clear();
        }
        finally {
            this.writeUnlock();
        }
    }

    /**
     * Decides whether the namenode list may be refreshed now.
     *
     * <p>Evaluated while holding the write lock.
     *
     * @return {@code true} if {@code nnList} is {@code null} or empty, or if
     *         more than 5000 ms of wall-clock time have passed since
     *         {@code nnListLastUpdate}; {@code false} otherwise
     */
    boolean canUpdateNNList() {
        this.writeLock();
        try {
            boolean nothingKnown = this.nnList == null || this.nnList.size() == 0;
            return nothingKnown || System.currentTimeMillis() - this.nnListLastUpdate > 5000L;
        }
        finally {
            this.writeUnlock();
        }
    }

    /**
     * Records the current wall-clock time as the moment of the last namenode
     * list update.
     */
    void setLastNNListUpdateTime() {
        long now = System.currentTimeMillis();
        this.nnListLastUpdate = now;
    }

    /**
     * Starts the daemon thread named {@code "BlkReportThread"} that runs this
     * object, unless such a thread already exists and is alive.
     */
    public synchronized void startWhirlingSufiThread() {
        Thread existing = this.blockReportThread;
        if (existing != null && existing.isAlive()) {
            return;
        }
        Thread worker = new Thread((Runnable)this, "BlkReportThread");
        this.blockReportThread = worker;
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Thread entry point: runs {@code whirlingLikeASufi()} and logs, at
     * warning level, any {@link Exception} that escapes from it.
     */
    @Override
    public void run() {
        try {
            this.whirlingLikeASufi();
        }
        catch (Exception failure) {
            String message = "Unexpected exception in BPOfferService " + this;
            LOG.warn((Object)message, (Throwable)failure);
        }
    }

    /**
     * Reports a bad block through {@code doActorActionWithRetry}.
     *
     * @param block       the bad block
     * @param storageUuid the storage UUID passed on to the actor
     * @param storageType the storage type passed on to the actor
     * @throws IOException if the retried action ultimately fails
     */
    private void reportBadBlocksWithRetry(final ExtendedBlock block, final String storageUuid, final StorageType storageType) throws IOException {
        ActorActionHandler reportAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor actor) throws IOException {
                actor.reportBadBlocks(block, storageUuid, storageType);
                return null;
            }
        };
        this.doActorActionWithRetry(reportAction);
    }

    /**
     * Sends received/deleted block notifications through
     * {@code doActorActionWithRetry}, logging the affected blocks first.
     *
     * <p>The log line lists, per storage, each block's ID immediately followed
     * by the block info's {@code toString()}, wrapped in square brackets.
     *
     * @param receivedAndDeletedBlocks the per-storage received/deleted blocks
     * @throws IOException if the retried action ultimately fails
     */
    private void blockReceivedAndDeletedWithRetry(final StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
        // A StringBuilder keeps the summary linear in the number of blocks;
        // repeated String concatenation recopies the whole text on every append.
        StringBuilder blocks = new StringBuilder();
        for (StorageReceivedDeletedBlocks srdb : receivedAndDeletedBlocks) {
            blocks.append('[');
            for (ReceivedDeletedBlockInfo b : srdb.getBlocks()) {
                blocks.append(' ').append(b.getBlock().getBlockId()).append(b.toString());
            }
            blocks.append(']');
        }
        LOG.info((Object)("sending blockReceivedAndDeletedWithRetry for blocks " + blocks));
        this.doActorActionWithRetry(new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor actor) throws IOException {
                actor.blockReceivedAndDeleted(BPOfferService.this.bpRegistration, BPOfferService.this.getBlockPoolId(), receivedAndDeletedBlocks);
                return null;
            }
        });
    }

    /**
     * Reports a bad block located on another datanode through
     * {@code doActorActionWithRetry}.
     *
     * @param dnInfo the datanode passed on to the actor
     * @param block  the bad block
     * @throws IOException if the retried action ultimately fails
     */
    private void reportRemoteBadBlockWithRetry(final DatanodeInfo dnInfo, final ExtendedBlock block) throws IOException {
        ActorActionHandler reportAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor actor) throws IOException {
                actor.reportRemoteBadBlock(dnInfo, block);
                return null;
            }
        };
        this.doActorActionWithRetry(reportAction);
    }

    /**
     * Fetches small-file data for the given ID from a namenode through
     * {@code doActorActionWithRetry}.
     *
     * @param id the identifier passed on to the actor
     * @return the bytes returned by the actor; {@code null} if the retry
     *         helper returned {@code null}
     * @throws IOException if the retried action ultimately fails
     */
    public byte[] getSmallFileDataFromNN(final int id) throws IOException {
        ActorActionHandler fetchAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor actor) throws IOException {
                return actor.getSmallFileDataFromNN(id);
            }
        };
        return (byte[])this.doActorActionWithRetry(fetchAction);
    }

    /**
     * Tells whether the given actor is the first entry of {@code bpServices}.
     *
     * @param actor the actor to compare against the first registered actor
     * @return {@code true} if {@code bpServices} is non-empty and its first
     *         element equals {@code actor}
     */
    public boolean firstActor(BPServiceActor actor) {
        if (this.bpServices.size() <= 0) {
            return false;
        }
        return this.bpServices.get(0).equals(actor);
    }

    /**
     * Runs an action against a namenode actor, retrying on other namenodes
     * when the failure is a local connect failure or a retriable exception.
     *
     * <p>Up to {@code nnList.size() + 1} attempts are made. Each attempt asks
     * {@code nextNNForNonBlkReportRPC()} for an actor; an attempt with no
     * available actor is skipped. A namenode whose call failed with a local
     * connect exception is added to the blacklist. Any other non-retriable
     * failure stops the loop immediately.
     *
     * @param handler the action to execute
     * @return the action's result, or {@code null} if no actor was ever
     *         available and no failure was recorded
     * @throws IOException the last recorded failure if no attempt succeeded;
     *         unchecked exceptions are rethrown unchanged
     */
    private Object doActorActionWithRetry(ActorActionHandler handler) throws IOException {
        Exception lastFailure = null;
        int maxRpcRetries = this.nnList.size();
        for (int i = 0; i <= maxRpcRetries; ++i) {
            // Scoped per attempt so a failure is never blamed on the actor of
            // an earlier iteration.
            BPServiceActor actor = null;
            try {
                actor = this.nextNNForNonBlkReportRPC();
                if (actor == null) continue;
                return handler.doAction(actor);
            }
            catch (Exception e) {
                lastFailure = e;
                boolean localConnectFailure = ExceptionCheck.isLocalConnectException(e);
                if (!localConnectFailure && !ExceptionCheck.isRetriableException(e)) break;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("RPC faild. NN used was " + (actor == null ? null : actor.getNNSocketAddress()) + ", retries left (" + (maxRpcRetries - i) + ")  Exception " + e));
                }
                if (localConnectFailure && actor != null) {
                    this.blackListNN.add(actor.getNNSocketAddress());
                }
            }
        }
        if (lastFailure == null) {
            return null;
        }
        if (lastFailure instanceof IOException) {
            // Covers RemoteException as well; the original type is preserved.
            throw (IOException)lastFailure;
        }
        if (lastFailure instanceof RuntimeException) {
            // Rethrow as is; casting it to IOException would raise a
            // ClassCastException and hide the real cause.
            throw (RuntimeException)lastFailure;
        }
        throw new IOException(lastFailure);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Picks the next namenode actor for a non-block-report RPC in round-robin
     * order, skipping blacklisted namenodes.
     *
     * <p>Runs under the read lock and makes at most 10 attempts. Each attempt
     * advances {@code rpcRoundRobinIndex} and maps it onto {@code nnList}.
     *
     * @return an actor for a non-blacklisted namenode, or {@code null} if the
     *         list is {@code null} or empty or no usable actor was found
     */
    private BPServiceActor nextNNForNonBlkReportRPC() {
        this.readLock();
        try {
            if (this.nnList == null || this.nnList.isEmpty()) {
                return null;
            }
            for (int attempt = 0; attempt < 10; ++attempt) {
                try {
                    // Use the value returned by the increment itself, and mask
                    // off the sign bit: Math.abs(Integer.MIN_VALUE) is still
                    // negative and would produce a negative list index.
                    int ticket = this.rpcRoundRobinIndex.incrementAndGet();
                    ActiveNode ann = this.nnList.get((ticket & Integer.MAX_VALUE) % this.nnList.size());
                    InetSocketAddress address = ann.getRpcServerAddressForDatanodes();
                    if (this.blackListNN.contains(address)) continue;
                    BPServiceActor actor = this.getAnActor(address);
                    if (actor != null) {
                        return actor;
                    }
                }
                catch (Exception e) {
                    // Best effort: move on to the next candidate, but leave a
                    // trace instead of discarding the failure silently.
                    LOG.debug((Object)"Failed to select a namenode for a non block report RPC", (Throwable)e);
                }
            }
            return null;
        }
        finally {
            this.readUnlock();
        }
    }

    /**
     * Asks the leader namenode which namenode should receive a block report
     * of the given size.
     *
     * <p>If the leader answers with a {@code RemoteException} whose class name
     * is {@code BRLoadBalancingNonLeaderException} or
     * {@code BRLoadBalancingOverloadException}, the exception is logged at
     * warning level; for the non-leader case {@code nnListLastUpdate} is also
     * reset to 0. The exception is rethrown in every case.
     *
     * @param noOfBlks number of blocks in the pending block report
     * @return the namenode chosen by the leader, or {@code null} if
     *         {@code nnList} is {@code null} or empty or no leader actor
     *         exists
     * @throws IOException if the request to the leader fails
     */
    private ActiveNode nextNNForBlkReport(long noOfBlks) throws IOException {
        if (this.nnList == null || this.nnList.isEmpty()) {
            return null;
        }
        BPServiceActor leaderActor = this.getLeaderActor();
        if (leaderActor == null) {
            return null;
        }
        try {
            return leaderActor.nextNNForBlkReport(noOfBlks, this.bpRegistration);
        }
        catch (RemoteException e) {
            String remoteClass = e.getClassName();
            boolean notLeader = remoteClass.equals(BRLoadBalancingNonLeaderException.class.getName());
            if (notLeader || remoteClass.equals(BRLoadBalancingOverloadException.class.getName())) {
                LOG.warn((Object)e);
                if (notLeader) {
                    this.nnListLastUpdate = 0L;
                }
            }
            throw e;
        }
    }

    /**
     * Returns the actor of the namenode with the smallest ID in
     * {@code nnList}, which this class treats as the leader.
     *
     * <p>No lock is taken here, so the list may be emptied between a size
     * check and the iteration. The result of the scan is therefore checked
     * for {@code null} instead of relying on an earlier size check.
     *
     * @return the leader's actor as returned by {@code getAnActor}, or
     *         {@code null} if the list holds no namenode
     */
    private BPServiceActor getLeaderActor() {
        ActiveNode leaderNode = null;
        for (ActiveNode an : this.nnList) {
            if (leaderNode == null || an.getId() < leaderNode.getId()) {
                leaderNode = an;
            }
        }
        if (leaderNode == null) {
            return null;
        }
        return this.getAnActor(leaderNode.getRpcServerAddressForDatanodes());
    }

    /**
     * Advances {@code refreshNNRoundRobinIndex} until it points at a namenode
     * that is not blacklisted, giving up after 10 steps.
     *
     * <p>Runs under the read lock. When {@code nnList} is {@code null} or
     * empty, the index is reset to -1.
     */
    private synchronized void forwardRRIndex() {
        this.readLock();
        try {
            if (this.nnList == null || this.nnList.isEmpty()) {
                this.refreshNNRoundRobinIndex.set(-1);
                return;
            }
            for (int attempt = 0; attempt < 10; ++attempt) {
                int ticket = this.refreshNNRoundRobinIndex.incrementAndGet();
                if (ticket < 0) {
                    // The counter wrapped around. Math.abs(Integer.MIN_VALUE)
                    // is negative and would give a negative list index, so
                    // restart from 0 and keep the stored index non-negative.
                    ticket = 0;
                    this.refreshNNRoundRobinIndex.set(0);
                }
                ActiveNode ann = this.nnList.get(ticket % this.nnList.size());
                if (!this.blackListNN.contains(ann.getRpcServerAddressForDatanodes())) {
                    return;
                }
            }
        }
        finally {
            this.readUnlock();
        }
    }

    /**
     * Tells whether initialization should be retried.
     *
     * @return {@code true} if a block pool ID is already known; otherwise the
     *         result of {@code isAlive()}
     */
    boolean shouldRetryInit() {
        return this.hasBlockPoolId() || this.isAlive();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules a block report on request.
     *
     * <p>An incremental request only flags an immediate incremental block
     * report; any other request schedules a full block report with no delay.
     * In both cases waiters on {@code pendingIncrementalBRperStorage} are
     * woken up.
     *
     * @param options selects between an incremental and a full block report
     * @throws IOException declared for callers; not thrown by the code here
     */
    void triggerBlockReport(BlockReportOptions options) throws IOException {
        Map<DatanodeStorage, PerStoragePendingIncrementalBR> map = this.pendingIncrementalBRperStorage;
        if (options.isIncremental()) {
            LOG.info((Object)(this.toString() + ": scheduling an incremental block report."));
            synchronized (map) {
                this.sendImmediateIBR = true;
                this.pendingIncrementalBRperStorage.notifyAll();
            }
            // An incremental request must not fall through and also schedule
            // a full block report.
            return;
        }
        LOG.info((Object)(this.toString() + ": scheduling a full block report."));
        synchronized (map) {
            this.scheduler.scheduleBlockReport(0L);
            this.pendingIncrementalBRperStorage.notifyAll();
        }
    }

    /**
     * Keeps track of when the next heartbeat and the next block report are
     * due. All times are values of {@link #monotonicNow()}, in milliseconds.
     */
    static class Scheduler {
        @VisibleForTesting
        volatile long nextBlockReportTime = this.monotonicNow();
        @VisibleForTesting
        volatile long nextHeartbeatTime = this.monotonicNow();
        @VisibleForTesting
        boolean resetBlockReportTime = true;
        private final long heartbeatIntervalMs;
        private final long blockReportIntervalMs;
        private boolean nextBlockReportOverwritten = false;

        /**
         * @param heartbeatIntervalMs   interval between heartbeats
         * @param blockReportIntervalMs interval between block reports
         */
        Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.blockReportIntervalMs = blockReportIntervalMs;
        }

        /** Makes the next heartbeat due now and returns its time. */
        long scheduleHeartbeat() {
            this.nextHeartbeatTime = this.monotonicNow();
            return this.nextHeartbeatTime;
        }

        /** Pushes the next heartbeat back by one interval and returns its time. */
        long scheduleNextHeartbeat() {
            this.nextHeartbeatTime += this.heartbeatIntervalMs;
            return this.nextHeartbeatTime;
        }

        /** Returns whether the next heartbeat time is at or before {@code startTime}. */
        boolean isHeartbeatDue(long startTime) {
            long remaining = this.nextHeartbeatTime - startTime;
            return remaining <= 0L;
        }

        /** Returns whether the next block report time is at or before now. */
        boolean isBlockReportDue() {
            long remaining = this.nextBlockReportTime - this.monotonicNow();
            return remaining <= 0L;
        }

        /**
         * Schedules the next block report, overriding the regular schedule.
         *
         * <p>NOTE(review): {@code delay} is narrowed to {@code int} before it
         * is used as the random bound; values above {@code Integer.MAX_VALUE}
         * would not survive the cast. Confirm callers stay within range.
         *
         * @param delay if positive, the report is scheduled at now plus a
         *              random offset below {@code delay}; otherwise at now
         * @return the newly scheduled block report time
         */
        long scheduleBlockReport(long delay) {
            if (delay > 0L) {
                this.nextBlockReportTime = this.monotonicNow() + (long)DFSUtil.getRandom().nextInt((int)delay);
            } else {
                this.nextBlockReportTime = this.monotonicNow();
            }
            this.resetBlockReportTime = true;
            this.nextBlockReportOverwritten = true;
            return this.nextBlockReportTime;
        }

        /**
         * Schedules the block report that follows the one just sent.
         *
         * <p>If the schedule was explicitly overwritten, that flag is cleared
         * and nothing else happens. Otherwise the first call after a reset
         * picks now plus a random offset below the block report interval, and
         * later calls advance the time by a whole number of intervals.
         */
        void scheduleNextBlockReport() {
            if (this.nextBlockReportOverwritten) {
                this.nextBlockReportOverwritten = false;
                return;
            }
            if (!this.resetBlockReportTime) {
                // Jump forward in whole intervals so the schedule keeps its phase.
                this.nextBlockReportTime += (this.monotonicNow() - this.nextBlockReportTime + this.blockReportIntervalMs) / this.blockReportIntervalMs * this.blockReportIntervalMs;
                return;
            }
            this.nextBlockReportTime = this.monotonicNow() + (long)DFSUtil.getRandom().nextInt((int)this.blockReportIntervalMs);
            this.resetBlockReportTime = false;
        }

        /** Sets the flag that makes the next {@link #scheduleNextBlockReport()} a no-op. */
        void setNextBlockReportOverwritten(boolean value) {
            this.nextBlockReportOverwritten = value;
        }

        /** Returns the time left until the next heartbeat; not positive once it is due. */
        long getHeartbeatWaitTime() {
            long wait = this.nextHeartbeatTime - this.monotonicNow();
            return wait;
        }

        /** Returns the current monotonic time; a separate method so tests can replace it. */
        @VisibleForTesting
        public long monotonicNow() {
            return Time.monotonicNow();
        }
    }

    /**
     * Pending incremental block report entries for one storage, keyed by
     * block ID so that each block has at most one pending entry.
     */
    private static class PerStoragePendingIncrementalBR {
        private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR = Maps.newHashMap();

        private PerStoragePendingIncrementalBR() {
        }

        /** Returns the number of pending entries. */
        int getBlockInfoCount() {
            return this.pendingIncrementalBR.size();
        }

        /** Removes all pending entries and returns them as an array. */
        ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
            ReceivedDeletedBlockInfo[] target = new ReceivedDeletedBlockInfo[this.getBlockInfoCount()];
            ReceivedDeletedBlockInfo[] drained = this.pendingIncrementalBR.values().toArray(target);
            this.pendingIncrementalBR.clear();
            return drained;
        }

        /**
         * Adds the given entries back, skipping any block ID that already has
         * a pending entry.
         *
         * @param blockArray the entries to add
         * @return the number of entries actually added
         */
        int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
            int added = 0;
            for (int k = 0; k < blockArray.length; ++k) {
                ReceivedDeletedBlockInfo candidate = blockArray[k];
                if (!this.pendingIncrementalBR.containsKey(candidate.getBlock().getBlockId())) {
                    this.pendingIncrementalBR.put(candidate.getBlock().getBlockId(), candidate);
                    ++added;
                }
            }
            return added;
        }

        /** Stores the entry, replacing any pending entry for the same block ID. */
        void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            this.pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
        }

        /**
         * Removes the pending entry for the given block's ID.
         *
         * @return {@code true} if an entry was removed
         */
        boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            ReceivedDeletedBlockInfo previous = this.pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId());
            return previous != null;
        }
    }

    private static interface ActorActionHandler {
        public Object doAction(BPServiceActor var1) throws IOException;
    }
}

