/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.hops.leader_election.node.ActiveNode;
import io.hops.leader_election.node.ActiveNodePBImpl;
import io.hops.leader_election.node.SortedActiveNodeList;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExceptionCheck;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor;
import org.apache.hadoop.hdfs.server.datanode.BRLoadBalancingException;
import org.apache.hadoop.hdfs.server.datanode.CRLoadBalancingException;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
class BPOfferService
implements Runnable {
    /** Logger shared with {@link DataNode}. */
    static final Log LOG = DataNode.LOG;
    /** Namespace info for this block pool; null until verifyAndSetNamespaceInfo stores it. */
    NamespaceInfo bpNSInfo;
    /** Registration stored by registrationSucceeded; null until then (see isInitialized). */
    volatile DatanodeRegistration bpRegistration;
    /** DataNode that owns this service (constructor argument). */
    private final DataNode dn;
    /** Actor for the leader NN as chosen by updateNNList; cleared by shutdownActor. */
    private BPServiceActor bpServiceToActive = null;
    /** One actor per NameNode address this service talks to. */
    private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();
    // NOTE(review): not referenced in the visible part of this file.
    private long lastActiveClaimTxId = -1L;
    /** Configuration obtained from dn.getDnConf(). */
    private final DNConf dnConf;
    /** Set by notifyNamenodeBlockImmediatelyInt; makes the report loop send an incremental report without waiting. */
    private volatile boolean sendImmediateIBR = false;
    /** Time.now() value recorded when an incremental (received/deleted) report was last scheduled. */
    volatile long lastDeletedReport = 0L;
    /** Reference time compared against dnConf.blockReportInterval to decide whether a full block report is due. */
    private volatile long lastBlockReport = 0L;
    /** When true, scheduleNextBlockReport applies a random offset to the next report time. */
    private boolean resetBlockReportTime = true;
    /** Time.monotonicNow() value recorded when a cache report was last started. */
    volatile long lastCacheReport = 0L;
    /** Actor selected to carry the current block/cache report. */
    private BPServiceActor blkReportHander = null;
    /** Known NameNodes; seeded from the constructor addresses and replaced wholesale by updateNNList. */
    private List<ActiveNode> nnList = Collections.synchronizedList(new ArrayList());
    /** Cleared by updateNNList. NOTE(review): writers are outside the visible part of this file. */
    private List<InetSocketAddress> blackListNN = Collections.synchronizedList(new ArrayList());
    // NOTE(review): the two round-robin indices are not referenced in the visible part of this file.
    private volatile int rpcRoundRobinIndex = 0;
    private volatile int refreshNNRoundRobinIndex = 0;
    /** Upper bound on concurrently scheduled incremental report tasks (dnConf.iBRDispatherTPSize). */
    final int maxNumIncrementalReportThreads;
    /** Fixed-size pool that runs incrementalBRTask. */
    private final ExecutorService incrementalBRExecutor;
    /** Single-thread executor that runs brTask. */
    private final ExecutorService brDispatcher;
    /** Pending incremental report entries per storage; the map object is also the monitor used for wait/notifyAll. */
    private final Map<DatanodeStorage, PerStoragePendingIncrementalBR> pendingIncrementalBRperStorage = Maps.newHashMap();
    /** Thread running this Runnable; created by startWhirlingSufiThread. */
    private Thread blockReportThread = null;
    /** Task submitted to brDispatcher by startBRThread. */
    private BRTask brTask = new BRTask();
    /** Future of the most recently submitted brTask, or null if none is outstanding. */
    private Future futur = null;
    /** Guards incrementalBRCounter. */
    private final Object incrementalBRLock = new Object();
    /** Number of incremental report tasks scheduled; incremented in reportReceivedDeletedBlocks. */
    private int incrementalBRCounter = 0;
    /** Task submitted to incrementalBRExecutor by reportReceivedDeletedBlocks. */
    private final IncrementalBRTask incrementalBRTask = new IncrementalBRTask();
    /** Wall-clock time of the last update permitted by canUpdateNNList. */
    long lastupdate = System.currentTimeMillis();

    BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
        Preconditions.checkArgument((!nnAddrs.isEmpty() ? 1 : 0) != 0, (Object)"Must pass at least one NN.");
        this.dn = dn;
        for (InetSocketAddress addr : nnAddrs) {
            this.bpServices.add(new BPServiceActor(addr, this));
            this.nnList.add((ActiveNode)new ActiveNodePBImpl(0L, "", addr.getAddress().getHostAddress(), addr.getPort(), "", addr.getAddress().getHostAddress(), addr.getPort()));
        }
        this.dnConf = dn.getDnConf();
        this.maxNumIncrementalReportThreads = this.dnConf.iBRDispatherTPSize;
        this.incrementalBRExecutor = Executors.newFixedThreadPool(this.maxNumIncrementalReportThreads);
        this.brDispatcher = Executors.newSingleThreadExecutor();
    }

    /**
     * Reconciles the running actors with a new list of NameNode addresses.
     *
     * Actors whose address is no longer listed are stopped and removed;
     * actors are created and started for addresses that were not served before.
     *
     * @param addrs the NameNode addresses that should be served from now on
     * @throws IOException declared for callers; not thrown by the visible code
     */
    void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
        // Typed sets (the decompiled raw types made the typed for-each loops below ill-typed).
        Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
        for (BPServiceActor actor : this.bpServices) {
            oldAddrs.add(actor.getNNSocketAddress());
        }
        Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);

        for (InetSocketAddress deadNN : Sets.difference(oldAddrs, newAddrs)) {
            BPServiceActor deadActor = this.stopAnActor(deadNN);
            if (deadActor == null) {
                // stopAnActor returns null when no actor matches any more
                // (e.g. it was removed in the meantime); nothing left to do.
                LOG.debug("No actor found for " + deadNN + "; nothing to stop");
                continue;
            }
            this.bpServices.remove(deadActor);
            LOG.debug("Stopped actor for " + deadActor.getNNSocketAddress());
        }

        for (InetSocketAddress newNN : Sets.difference(newAddrs, oldAddrs)) {
            BPServiceActor newActor = this.startAnActor(newNN);
            this.bpServices.add(newActor);
            LOG.debug("Started actor for " + newActor.getNNSocketAddress());
        }
    }

    /**
     * @return true once a registration has been stored for this block pool
     */
    boolean isInitialized() {
        DatanodeRegistration registration = this.bpRegistration;
        return registration != null;
    }

    /**
     * @return true if at least one actor reports itself alive
     */
    boolean isAlive() {
        boolean anyAlive = false;
        for (BPServiceActor candidate : this.bpServices) {
            if (candidate.isAlive()) {
                anyAlive = true;
                break;
            }
        }
        return anyAlive;
    }

    /**
     * Returns the block pool ID taken from the stored namespace info.
     *
     * @return the block pool ID, or null (after logging a warning with a
     *         stack trace) when no namespace info has been stored yet
     */
    synchronized String getBlockPoolId() {
        if (this.bpNSInfo == null) {
            LOG.warn("Block pool ID needed, but service not yet registered with NN", new Exception("trace"));
            return null;
        }
        return this.bpNSInfo.getBlockPoolID();
    }

    /**
     * @return the stored namespace info, or null if none has been stored yet
     */
    synchronized NamespaceInfo getNamespaceInfo() {
        NamespaceInfo current = this.bpNSInfo;
        return current;
    }

    /**
     * Describes this block pool for log messages.
     *
     * While no namespace info is stored the pool is shown as
     * "&lt;registering&gt;", and a missing or empty datanode UUID is shown
     * as "unassigned".
     */
    public synchronized String toString() {
        if (this.bpNSInfo != null) {
            return "Block pool " + this.getBlockPoolId() + " (Datanode Uuid " + this.dn.getDatanodeUuid() + ")";
        }
        String uuid = this.dn.getDatanodeUuid();
        boolean uuidMissing = uuid == null || uuid.isEmpty();
        return "Block pool <registering> (Datanode Uuid " + (uuidMissing ? "unassigned" : uuid) + ")";
    }

    /**
     * Reports a corrupt block on the given storage, using the retrying helper.
     *
     * A failure is logged and not propagated to the caller.
     *
     * @param block       the bad block; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     * @param storageType type of that storage
     */
    void reportBadBlocks(ExtendedBlock block, String storageUuid, StorageType storageType) {
        this.checkBlock(block);
        try {
            this.reportBadBlocksWithRetry(block, storageUuid, storageType);
        }
        catch (Exception e) {
            // Route the stack trace through the logger instead of stderr so it
            // stays attached to the error line.
            LOG.error("Failed to send bad block report to any namenode ", e);
        }
    }

    /**
     * Queues a RECEIVED notification for the block and wakes the report loop.
     *
     * @param block       received block; must belong to this block pool
     * @param delHint     deletion hint; must not be null
     * @param storageUuid UUID of the storage that received the block
     */
    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint, String storageUuid) {
        this.checkBlock(block);
        this.checkDelHint(delHint);
        this.notifyNamenodeBlockImmediatelyInt(
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVED, delHint),
                storageUuid,
                true);
    }

    /**
     * Validates that the block is non-null and belongs to this block pool.
     *
     * @throws IllegalArgumentException if either check fails
     */
    private void checkBlock(ExtendedBlock block) {
        Preconditions.checkArgument(block != null, "block is null");
        boolean samePool = block.getBlockPoolId().equals(this.getBlockPoolId());
        Preconditions.checkArgument(samePool, "block belongs to BP %s instead of BP %s", new Object[]{block.getBlockPoolId(), this.getBlockPoolId()});
    }

    /**
     * Rejects a null deletion hint.
     *
     * @throws IllegalArgumentException if delHint is null
     */
    private void checkDelHint(String delHint) {
        boolean present = delHint != null;
        Preconditions.checkArgument(present, "delHint is null");
    }

    /**
     * Queues a DELETED notification for the block; the report loop is not woken.
     *
     * @param block       deleted block; must belong to this block pool
     * @param storageUuid UUID of the storage the block was deleted from
     */
    void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
        this.checkBlock(block);
        ReceivedDeletedBlockInfo deletedInfo =
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.DELETED, null);
        DatanodeStorage storage = this.dn.getFSDataset().getStorage(storageUuid);
        this.notifyNamenodeDeletedBlockInt(deletedInfo, storage);
    }

    /**
     * Queues a CREATING notification for the block (no explicit wake-up of the report loop).
     *
     * @param block       block being created; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeCreatingBlock(ExtendedBlock block, String storageUuid) {
        this.checkBlock(block);
        this.notifyNamenodeBlockImmediatelyInt(
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.CREATING, null),
                storageUuid,
                false);
    }

    /**
     * Queues an APPENDING notification for the block (no explicit wake-up of the report loop).
     *
     * @param block       block being appended to; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeAppendingBlock(ExtendedBlock block, String storageUuid) {
        this.checkBlock(block);
        this.notifyNamenodeBlockImmediatelyInt(
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.APPENDING, null),
                storageUuid,
                false);
    }

    /**
     * Queues a RECOVERING_APPEND notification for the block and wakes the report loop.
     *
     * @param block       block concerned; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeAppendingRecoveredAppend(ExtendedBlock block, String storageUuid) {
        this.checkBlock(block);
        this.notifyNamenodeBlockImmediatelyInt(
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECOVERING_APPEND, null),
                storageUuid,
                true);
    }

    /**
     * Queues an UPDATE_RECOVERED notification for the block and wakes the report loop.
     *
     * @param block       block concerned; must belong to this block pool
     * @param storageUuid UUID of the storage holding the block
     */
    public void notifyNamenodeUpdateRecoveredBlock(ExtendedBlock block, String storageUuid) {
        this.checkBlock(block);
        this.notifyNamenodeBlockImmediatelyInt(
                new ReceivedDeletedBlockInfo(block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.UPDATE_RECOVERED, null),
                storageUuid,
                true);
    }

    /** Starts every actor of this block pool. */
    void start() {
        for (BPServiceActor each : this.bpServices) {
            each.start();
        }
    }

    /** Stops every actor of this block pool. */
    void stop() {
        for (BPServiceActor each : this.bpServices) {
            each.stop();
        }
    }

    /** Joins every actor of this block pool, one after the other. */
    void join() {
        for (BPServiceActor each : this.bpServices) {
            each.join();
        }
    }

    /**
     * @return the DataNode this service was created for
     */
    DataNode getDataNode() {
        DataNode owner = this.dn;
        return owner;
    }

    /**
     * Records the namespace info on first contact, or verifies it on later contacts.
     *
     * On the first call the info is stored and the block pool is initialised
     * on the DataNode; if that initialisation fails the stored info is
     * cleared again. On later calls the block pool ID, namespace ID and
     * cluster ID must match the stored values.
     *
     * @param nsInfo namespace info received from a NameNode
     * @throws IOException if initialisation fails or any ID does not match
     */
    synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
        if (this.bpNSInfo != null) {
            // Subsequent contact: everything must agree with what we already have.
            BPOfferService.checkNSEquality(this.bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), "Blockpool ID");
            BPOfferService.checkNSEquality(this.bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), "Namespace ID");
            BPOfferService.checkNSEquality(this.bpNSInfo.getClusterID(), nsInfo.getClusterID(), "Cluster ID");
            return;
        }

        // First contact: publish the info before initialising the pool.
        this.bpNSInfo = nsInfo;
        boolean initialised = false;
        try {
            this.dn.initBlockPool(this);
            initialised = true;
        }
        finally {
            if (!initialised) {
                // Roll back so a later attempt starts from scratch.
                this.bpNSInfo = null;
            }
        }
    }

    /**
     * Handles a successful registration of one actor.
     *
     * The first registration is stored; later ones must carry the same
     * namespace and cluster IDs. The DataNode is then told about the (stored)
     * registration and, when block tokens are enabled, the exported keys of
     * {@code reg} are added for this block pool.
     *
     * @param bpServiceActor the actor that registered
     * @param reg            the registration it obtained
     * @throws IOException if the IDs do not match the stored registration
     */
    synchronized void registrationSucceeded(BPServiceActor bpServiceActor, DatanodeRegistration reg) throws IOException {
        if (this.bpRegistration == null) {
            this.bpRegistration = reg;
        } else {
            BPOfferService.checkNSEquality(this.bpRegistration.getStorageInfo().getNamespaceID(), reg.getStorageInfo().getNamespaceID(), "namespace ID");
            BPOfferService.checkNSEquality(this.bpRegistration.getStorageInfo().getClusterID(), reg.getStorageInfo().getClusterID(), "cluster ID");
        }
        this.dn.bpRegistrationSucceeded(this.bpRegistration, this.getBlockPoolId());
        if (!this.dn.isBlockTokenEnabled) {
            return;
        }
        this.dn.blockPoolTokenSecretManager.addKeys(this.getBlockPoolId(), reg.getExportedKeys());
    }

    /**
     * Verifies that two identifiers are equal.
     *
     * @param ourID      the identifier we already hold
     * @param theirID    the identifier just received
     * @param idHelpText human-readable name of the identifier, used in the error message
     * @throws IOException if the identifiers differ
     */
    private static void checkNSEquality(Object ourID, Object theirID, String idHelpText) throws IOException {
        if (ourID.equals(theirID)) {
            return;
        }
        StringBuilder problem = new StringBuilder();
        problem.append(idHelpText).append(" mismatch: previously connected to ")
                .append(idHelpText).append(" ").append(ourID)
                .append(" but now connected to ")
                .append(idHelpText).append(" ").append(theirID);
        throw new IOException(problem.toString());
    }

    /**
     * Asks the DataNode to create a registration for the stored namespace info.
     *
     * @return the new registration
     * @throws IllegalStateException if no namespace info has been stored yet
     */
    synchronized DatanodeRegistration createRegistration() {
        boolean handshakeDone = this.bpNSInfo != null;
        Preconditions.checkState(handshakeDone, "getRegistration() can only be called after initial handshake");
        return this.dn.createBPRegistration(this.bpNSInfo);
    }

    /**
     * Removes an actor from this service.
     *
     * The actor is dropped from the actor list (and as leader actor if it
     * was one), the first NN list entry with the actor's address is removed,
     * and the block pool is shut down on the DataNode once no actor is left.
     *
     * @param actor the actor that is going away
     */
    synchronized void shutdownActor(BPServiceActor actor) {
        if (actor == this.bpServiceToActive) {
            this.bpServiceToActive = null;
        }
        this.bpServices.remove(actor);

        // Locate the first NN entry for this actor, then remove it after the loop.
        ActiveNode matching = null;
        for (ActiveNode candidate : this.nnList) {
            if (candidate.getRpcServerAddressForDatanodes().equals(actor.getNNSocketAddress())) {
                matching = candidate;
                break;
            }
        }
        if (matching != null) {
            this.nnList.remove(matching);
        }

        if (this.bpServices.isEmpty()) {
            this.dn.shutdownBlockPool(this);
        }
    }

    /**
     * Asks every actor to try sending the given error report.
     *
     * @param errCode error code to report
     * @param errMsg  error message to report
     */
    void trySendErrorReport(int errCode, String errMsg) {
        for (BPServiceActor each : this.bpServices) {
            each.trySendErrorReport(errCode, errMsg);
        }
    }

    /**
     * Schedules the next full block report; delegates to scheduleBlockReportInt.
     *
     * @param delay upper bound passed on for the random offset; values &lt;= 0 request an immediate report
     */
    void scheduleBlockReport(long delay) {
        final long requestedDelay = delay;
        this.scheduleBlockReportInt(requestedDelay);
    }

    /**
     * Reports a bad block that lives on another datanode, using the retrying helper.
     *
     * An I/O failure is logged (with its stack trace) and not propagated.
     *
     * @param dnInfo the datanode holding the bad replica
     * @param block  the bad block
     */
    void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
        try {
            this.reportRemoteBadBlockWithRetry(dnInfo, block);
        }
        catch (IOException e) {
            // Pass the exception as the throwable: the old message glued it to
            // the block with no separator and dropped the stack trace.
            LOG.warn("Couldn't report bad block " + block, e);
        }
    }

    /**
     * @return the NameNode proxy of the current leader actor, or null if no leader actor is set
     */
    synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
        BPServiceActor leader = this.bpServiceToActive;
        return leader == null ? null : leader.bpNamenode;
    }

    /**
     * @return a new list holding the current actors; changes to it do not affect this service
     */
    @VisibleForTesting
    List<BPServiceActor> getBPServiceActors() {
        return new ArrayList<BPServiceActor>(this.bpServices);
    }

    /**
     * Switches the dataset's trash handling for this block pool.
     *
     * @param inProgress true enables trash, false restores it
     */
    void signalRollingUpgrade(boolean inProgress) {
        if (!inProgress) {
            this.dn.getFSDataset().restoreTrash(this.getBlockPoolId());
            return;
        }
        this.dn.getFSDataset().enableTrash(this.getBlockPoolId());
    }

    /**
     * @param addr NameNode address to look for
     * @return true if one of the actors serves exactly this address
     */
    boolean containsNN(InetSocketAddress addr) {
        boolean found = false;
        for (BPServiceActor candidate : this.bpServices) {
            if (candidate.getNNSocketAddress().equals(addr)) {
                found = true;
                break;
            }
        }
        return found;
    }

    /**
     * @return the number of actors, i.e. NameNodes currently served
     */
    @VisibleForTesting
    int countNameNodes() {
        final int actorCount = this.bpServices.size();
        return actorCount;
    }

    /**
     * Test hook: delegates to triggerBlockReportForTestsInt.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    @VisibleForTesting
    void triggerBlockReportForTests() throws IOException {
        triggerBlockReportForTestsInt();
    }

    /**
     * Test hook: delegates to triggerDeletionReportForTestsInt.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    @VisibleForTesting
    void triggerDeletionReportForTests() throws IOException {
        triggerDeletionReportForTestsInt();
    }

    /**
     * Test hook: triggers a heartbeat on every actor.
     *
     * @throws IOException if an actor's trigger fails
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        for (BPServiceActor each : this.bpServices) {
            each.triggerHeartbeatForTests();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Entry point for a command an actor received from its NameNode.
     *
     * A null command is a no-op. Action code 4 (logged as DNA_REGISTER) makes
     * the actor re-register and is handled without taking this object's lock.
     * Every other command is processed by processCommandFromActive while
     * holding this object's lock.
     *
     * @param cmd   the command, may be null
     * @param actor the actor that received it
     * @return true in every case handled here; otherwise the result of processCommandFromActive
     * @throws IOException if re-registration or command processing fails
     */
    boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        assert (this.bpServices.contains(actor));
        if (cmd == null) {
            return true;
        }
        if (cmd.getAction() != 4) {
            synchronized (this) {
                return this.processCommandFromActive(cmd, actor);
            }
        }
        LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr);
        actor.reRegister();
        return true;
    }

    /**
     * Renders block IDs as a comma-separated list for logging.
     *
     * At most dn.getMaxNumberOfBlocksToLog() IDs are written; if more exist,
     * "..." is appended directly after the last written ID.
     *
     * @param ids block IDs to render
     * @return the rendered (possibly truncated) list
     */
    private String blockIdArrayToString(long[] ids) {
        long limit = this.dn.getMaxNumberOfBlocksToLog();
        StringBuilder rendered = new StringBuilder();
        int written = 0;
        for (long id : ids) {
            if ((long) written >= limit) {
                rendered.append("...");
                break;
            }
            if (written > 0) {
                rendered.append(", ");
            }
            rendered.append(id);
            ++written;
        }
        return rendered.toString();
    }

    /**
     * Executes a NameNode command on the DataNode, dispatching on the numeric action code.
     *
     * Unknown action codes are logged and ignored.
     *
     * @param cmd   the command to execute; must not be null
     * @param actor the actor that received the command (used to name the NameNode for block recovery)
     * @return always true
     * @throws IOException if the DataNode operation fails
     * @throws UnsupportedOperationException for action code 3 (DNA_SHUTDOWN)
     */
    private boolean processCommandFromActive(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        // Typed views of the command; each is null when cmd is not of that type.
        // NOTE(review): the cases below dereference them without a null check, so they
        // rely on the action code matching the command's class.
        BlockCommand bcmd = cmd instanceof BlockCommand ? (BlockCommand)cmd : null;
        BlockIdCommand blockIdCmd = cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd : null;
        switch (cmd.getAction()) {
            // Transfer the listed blocks to the given targets and count them as replicated.
            case 1: {
                this.dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets(), bcmd.getTargetStorageTypes());
                this.dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
                break;
            }
            // Invalidate the listed blocks: drop them from the block scanner (if any),
            // then from the dataset, and count them as removed.
            case 2: {
                Block[] toDelete = bcmd.getBlocks();
                if (this.dn.blockScanner != null) {
                    this.dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
                }
                this.dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
                this.dn.metrics.incrBlocksRemoved(toDelete.length);
                break;
            }
            // DNA_CACHE: cache the listed block IDs.
            case 9: {
                LOG.info((Object)("DatanodeCommand action: DNA_CACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_UNCACHE: uncache the listed block IDs.
            case 10: {
                LOG.info((Object)("DatanodeCommand action: DNA_UNCACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_SHUTDOWN is not implemented.
            case 3: {
                throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
            }
            // DNA_FINALIZE: finalize the upgrade for the block pool named in the command.
            case 5: {
                String bp = ((FinalizeCommand)cmd).getBlockPoolId();
                assert (this.getBlockPoolId().equals(bp)) : "BP " + this.getBlockPoolId() + " received DNA_FINALIZE for other block pool " + bp;
                this.dn.finalizeUpgradeForPool(bp);
                break;
            }
            // Start recovery of the blocks listed in the command.
            case 6: {
                String who = "NameNode at " + actor.getNNSocketAddress();
                this.dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
                break;
            }
            // DNA_ACCESSKEYUPDATE: add the exported keys, but only when block tokens are enabled.
            case 7: {
                LOG.info((Object)"DatanodeCommand action: DNA_ACCESSKEYUPDATE");
                if (!this.dn.isBlockTokenEnabled) break;
                this.dn.blockPoolTokenSecretManager.addKeys(this.getBlockPoolId(), ((KeyUpdateCommand)cmd).getExportedKeys());
                break;
            }
            // DNA_BALANCERBANDWIDTHUPDATE: apply the new balancer bandwidth; non-positive values are ignored.
            case 8: {
                LOG.info((Object)"DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
                long bandwidth = ((BalancerBandwidthCommand)cmd).getBalancerBandwidthValue();
                if (bandwidth <= 0L) break;
                DataXceiverServer dxcs = (DataXceiverServer)this.dn.dataXceiverServer.getRunnable();
                LOG.info((Object)("Updating balance throttler bandwidth from " + dxcs.balanceThrottler.getBandwidth() + " bytes/s to: " + bandwidth + " bytes/s."));
                dxcs.balanceThrottler.setBandwidth(bandwidth);
                break;
            }
            // Anything else is logged and otherwise ignored.
            default: {
                LOG.warn((Object)("Unknown DatanodeCommand action: " + cmd.getAction()));
            }
        }
        return true;
    }

    /**
     * Stops the actor serving the given address, if there is one.
     *
     * @param address NameNode address
     * @return the stopped actor, or null if no actor serves that address
     */
    private BPServiceActor stopAnActor(InetSocketAddress address) {
        BPServiceActor target = this.getAnActor(address);
        if (target == null) {
            return null;
        }
        target.stop();
        return target;
    }

    /**
     * Creates and starts an actor for the given address. The caller is
     * responsible for adding it to the actor list.
     *
     * @param address NameNode address
     * @return the started actor
     */
    private BPServiceActor startAnActor(InetSocketAddress address) {
        BPServiceActor created = new BPServiceActor(address, this);
        created.start();
        return created;
    }

    /**
     * Looks up the actor serving the given address.
     *
     * @param address NameNode address, may be null
     * @return the first matching actor, or null if the address is null or unknown
     */
    private BPServiceActor getAnActor(InetSocketAddress address) {
        BPServiceActor match = null;
        if (address != null) {
            for (BPServiceActor candidate : this.bpServices) {
                if (candidate.getNNSocketAddress().equals(address)) {
                    match = candidate;
                    break;
                }
            }
        }
        return match;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main loop of the report thread; runs while dn.shouldRun is set.
     *
     * Each round: schedule an incremental report if one was requested or
     * the delete-report interval has elapsed, make sure the block report
     * task is submitted, wait up to one second on the pending-IBR monitor
     * (skipped when an immediate report is pending), then call
     * forwardRRIndex. Any exception is logged and followed by a one-second
     * pause before the next round.
     *
     * @throws Exception declared by the signature; exceptions raised inside the loop are caught and logged
     */
    private void whirlingLikeASufi() throws Exception {
        while (this.dn.shouldRun) {
            try {
                long roundStart = Time.now();
                boolean incrementalDue = this.sendImmediateIBR
                        || roundStart - this.lastDeletedReport > this.dnConf.deleteReportInterval;
                if (incrementalDue) {
                    this.reportReceivedDeletedBlocks();
                    this.lastDeletedReport = roundStart;
                }
                this.startBRThread();
                synchronized (this.pendingIncrementalBRperStorage) {
                    // Skip the wait when an immediate report was requested in the meantime.
                    if (!this.sendImmediateIBR) {
                        try {
                            this.pendingIncrementalBRperStorage.wait(1000L);
                        }
                        catch (InterruptedException ie) {
                            LOG.warn("BPOfferService for " + this + " interrupted");
                        }
                    }
                }
                this.forwardRRIndex();
            }
            catch (Exception re) {
                LOG.warn("Exception in whirlingLikeASufi", re);
                try {
                    // Back off briefly before the next round.
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Submits the block report task unless a previous submission is still running.
     *
     * A finished previous submission is reaped first: its result is fetched
     * (re-throwing any failure) and the stored future is cleared either way.
     *
     * @throws InterruptedException if interrupted while fetching the previous result
     * @throws ExecutionException   if the previous run failed
     */
    private void startBRThread() throws InterruptedException, ExecutionException {
        if (this.futur != null) {
            if (!this.futur.isDone()) {
                // Previous run still in flight; nothing to do this round.
                return;
            }
            try {
                this.futur.get();
            }
            finally {
                this.futur = null;
            }
        }
        this.futur = this.brDispatcher.submit(this.brTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules one incremental block report task, unless the number of
     * scheduled tasks has already reached maxNumIncrementalReportThreads.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void reportReceivedDeletedBlocks() throws IOException {
        synchronized (this.incrementalBRLock) {
            if (this.incrementalBRCounter >= this.maxNumIncrementalReportThreads) {
                return;
            }
            ++this.incrementalBRCounter;
            this.incrementalBRExecutor.submit(this.incrementalBRTask);
        }
    }

    /**
     * Returns the pending incremental report container for a storage,
     * creating and registering an empty one on first use.
     *
     * @param storage the storage to look up
     * @return the existing or newly created container
     */
    private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(DatanodeStorage storage) {
        PerStoragePendingIncrementalBR pending = this.pendingIncrementalBRperStorage.get(storage);
        if (pending != null) {
            return pending;
        }
        pending = new PerStoragePendingIncrementalBR();
        this.pendingIncrementalBRperStorage.put(storage, pending);
        return pending;
    }

    /**
     * Records a block notification as pending for the given storage.
     *
     * Any existing pending entry for the same block is removed first (the
     * search stops at the first storage that had one), then the new entry
     * is stored under {@code storage}.
     *
     * @param bInfo   the notification to queue
     * @param storage the storage it belongs to
     * @return true if no pending entry for the block existed before
     */
    boolean addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo, DatanodeStorage storage) {
        boolean replacedExisting = false;
        for (PerStoragePendingIncrementalBR perStorage : this.pendingIncrementalBRperStorage.values()) {
            if (perStorage.removeBlockInfo(bInfo)) {
                replacedExisting = true;
                break;
            }
        }
        this.getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
        return !replacedExisting;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a block notification and flags that an incremental report
     * should be sent as soon as possible.
     *
     * @param bInfo       the notification to queue
     * @param storageUuid UUID of the storage the block lives on
     * @param now         if true, threads waiting on the pending-IBR monitor are woken immediately
     */
    void notifyNamenodeBlockImmediatelyInt(ReceivedDeletedBlockInfo bInfo, String storageUuid, boolean now) {
        synchronized (this.pendingIncrementalBRperStorage) {
            DatanodeStorage storage = this.dn.getFSDataset().getStorage(storageUuid);
            this.addPendingReplicationBlockInfo(bInfo, storage);
            this.sendImmediateIBR = true;
            if (now) {
                this.pendingIncrementalBRperStorage.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a deletion notification under the pending-IBR monitor. Unlike
     * the immediate variant, this neither sets the immediate flag nor wakes waiters.
     *
     * @param bInfo   the notification to queue
     * @param storage the storage the block was deleted from
     */
    void notifyNamenodeDeletedBlockInt(ReceivedDeletedBlockInfo bInfo, DatanodeStorage storage) {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.addPendingReplicationBlockInfo(bInfo, storage);
        }
    }

    /**
     * Sends a full block report if one is due.
     *
     * Steps: schedule an incremental report first, build one report per
     * storage, pick a NameNode via nextNNForBlkReport, send everything in a
     * single call when the total block count is below
     * dnConf.blockReportSplitThreshold (otherwise one call per storage),
     * record metrics and schedule the next report.
     *
     * @return the commands received, or null when no report was due, no
     *         usable NameNode/actor was found, or no command came back
     * @throws IOException if building or sending the report fails
     */
    List<DatanodeCommand> blockReport() throws IOException {
        long startTime = Time.now();
        boolean reportDue = startTime - this.lastBlockReport > this.dnConf.blockReportInterval;
        if (!reportDue) {
            return null;
        }
        ArrayList<DatanodeCommand> received = new ArrayList<DatanodeCommand>();

        // Flush incremental changes before taking the full snapshot.
        this.reportReceivedDeletedBlocks();
        this.lastDeletedReport = startTime;

        long brCreateStartTime = Time.now();
        Map<DatanodeStorage, BlockReport> perVolumeBlockLists = this.dn.getFSDataset().getBlockReports(this.getBlockPoolId());
        int storageCount = 0;
        int totalBlockCount = 0;
        StorageBlockReport[] reports = new StorageBlockReport[perVolumeBlockLists.size()];
        for (Map.Entry<DatanodeStorage, BlockReport> perVolume : perVolumeBlockLists.entrySet()) {
            BlockReport blockList = perVolume.getValue();
            reports[storageCount] = new StorageBlockReport(perVolume.getKey(), blockList);
            ++storageCount;
            totalBlockCount += blockList.getNumberOfBlocks();
        }

        ActiveNode an = this.nextNNForBlkReport(totalBlockCount);
        if (an == null) {
            LOG.warn("Unable to send block report");
            return null;
        }
        this.blkReportHander = this.getAnActor(an.getRpcServerAddressForDatanodes());
        if (this.blkReportHander == null || !this.blkReportHander.isInitialized()) {
            return null;
        }

        long brSendStartTime = Time.now();
        int numReportsSent;
        if ((long) totalBlockCount < this.dnConf.blockReportSplitThreshold) {
            // Small enough: all storages in one RPC.
            numReportsSent = 1;
            DatanodeCommand reply = this.blkReportHander.blockReport(this.bpRegistration, this.getBlockPoolId(), reports);
            if (reply != null) {
                received.add(reply);
            }
        } else {
            // Too large: one RPC per storage.
            numReportsSent = storageCount;
            for (StorageBlockReport report : reports) {
                StorageBlockReport[] single = { report };
                DatanodeCommand reply = this.blkReportHander.blockReport(this.bpRegistration, this.getBlockPoolId(), single);
                if (reply != null) {
                    received.add(reply);
                }
            }
        }

        long brSendCost = Time.now() - brSendStartTime;
        long brCreateCost = brSendStartTime - brCreateStartTime;
        this.dn.getMetrics().addBlockReport(brSendCost);
        LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount
                + " blocks total. Took " + brCreateCost + " msec to generate and "
                + brSendCost + " msecs for RPC and NN processing.  Got back commands "
                + (received.size() == 0 ? "none" : Joiner.on("; ").join(received)));
        this.scheduleNextBlockReport(startTime);
        if (received.size() == 0) {
            return null;
        }
        return received;
    }

    /**
     * Updates lastBlockReport after a report has been sent.
     *
     * When a reset was requested, the reference time becomes the previous
     * start time minus a random offset below the report interval, and the
     * reset flag is cleared. Otherwise the reference time is advanced by
     * the largest whole number of intervals that fits into the time elapsed
     * since it.
     *
     * @param previousReportStartTime start time of the report that was just sent
     */
    private void scheduleNextBlockReport(long previousReportStartTime) {
        if (!this.resetBlockReportTime) {
            this.lastBlockReport += (Time.now() - this.lastBlockReport) / this.dnConf.blockReportInterval * this.dnConf.blockReportInterval;
            return;
        }
        int randomOffset = DFSUtil.getRandom().nextInt((int) this.dnConf.blockReportInterval);
        this.lastBlockReport = previousReportStartTime - (long) randomOffset;
        this.resetBlockReportTime = false;
    }

    /**
     * Sends a cache report if the dataset has cache capacity and a report is due.
     *
     * @param hasHandler if true the actor already stored in blkReportHander is
     *                   reused; if false a NameNode is picked via
     *                   nextNNForCacheReport and its actor is stored first
     * @return the command returned for the report, or null when nothing was
     *         sent (no capacity, not due, no usable NameNode/actor)
     * @throws IOException if building or sending the report fails
     */
    DatanodeCommand cacheReport(boolean hasHandler) throws IOException {
        if (this.dn.getFSDataset().getCacheCapacity() == 0L) {
            return null;
        }
        long startTime = Time.monotonicNow();
        boolean reportDue = startTime - this.lastCacheReport > this.dnConf.cacheReportInterval;
        if (!reportDue) {
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending cacheReport from service actor: " + this);
        }
        // The attempt time is recorded up front, before an actor is selected.
        this.lastCacheReport = startTime;

        String bpid = this.getBlockPoolId();
        List<Long> blockIds = this.dn.getFSDataset().getCacheReport(bpid);
        long createTime = Time.monotonicNow();

        if (!hasHandler) {
            ActiveNode an = this.nextNNForCacheReport(blockIds.size());
            if (an == null) {
                LOG.warn("Unable to send cache report");
                return null;
            }
            this.blkReportHander = this.getAnActor(an.getRpcServerAddressForDatanodes());
            if (this.blkReportHander == null || !this.blkReportHander.isInitialized()) {
                return null;
            }
        }

        DatanodeCommand reply = this.blkReportHander.cacheReport(this.bpRegistration, bpid, blockIds);
        long sendTime = Time.monotonicNow();
        long createCost = createTime - startTime;
        long sendCost = sendTime - createTime;
        this.dn.getMetrics().addCacheReport(sendCost);
        LOG.debug("CacheReport of " + blockIds.size() + " block(s) took " + createCost
                + " msec to generate and " + sendCost + " msecs for RPC and NN processing");
        return reply;
    }

    /**
     * Sets the reference time for the next full block report.
     *
     * With a positive delay the reference time is set to now minus
     * (report interval minus a random value below {@code delay}). With a
     * non-positive delay it is set to 0. In both cases the reset flag
     * consumed by scheduleNextBlockReport is set.
     *
     * @param delay exclusive upper bound of the random offset
     */
    void scheduleBlockReportInt(long delay) {
        if (delay > 0L) {
            this.lastBlockReport = Time.now() - (this.dnConf.blockReportInterval - (long) DFSUtil.getRandom().nextInt((int) delay));
        } else {
            this.lastBlockReport = 0L;
        }
        this.resetBlockReportTime = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Test hook: forces a full block report and blocks until lastBlockReport
     * has been changed from 0.
     *
     * The reference time is reset to 0, waiters on the pending-IBR monitor
     * are woken, and the method polls every 100 ms. If the calling thread is
     * interrupted the method returns early with the interrupt flag restored.
     */
    void triggerBlockReportForTestsInt() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastBlockReport = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastBlockReport == 0L) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                }
                catch (InterruptedException e) {
                    // Restore the flag so the caller can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Test hook: forces an incremental (deletion) report and blocks until
     * lastDeletedReport has been changed from 0.
     *
     * The timestamp is reset to 0, waiters on the pending-IBR monitor are
     * woken, and the method polls every 100 ms. If the calling thread is
     * interrupted the method returns early with the interrupt flag restored.
     */
    void triggerDeletionReportForTestsInt() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastDeletedReport = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastDeletedReport == 0L) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                }
                catch (InterruptedException e) {
                    // Restore the flag so the caller can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Exposes the current value of the {@code sendImmediateIBR} flag.
     *
     * @return {@code true} while an immediate incremental block report is flagged
     */
    @VisibleForTesting
    boolean hasPendingIBR() {
        final boolean immediateReportFlagged = this.sendImmediateIBR;
        return immediateReportFlagged;
    }

    /**
     * Replaces the locally cached namenode list with the supplied one.
     *
     * <p>Steps, in order: collect the datanode-facing RPC address of every active
     * node and pass them to {@code refreshNNList}; if the list names a leader, point
     * {@code bpServiceToActive} at the actor for the leader's address; overwrite
     * {@code nnList} with the active nodes; finally empty the namenode blacklist.
     *
     * @param list the namenode list to adopt
     * @throws IOException propagated from {@code refreshNNList}
     */
    synchronized void updateNNList(SortedActiveNodeList list) throws IOException {
        ArrayList<InetSocketAddress> datanodeRpcAddresses = new ArrayList<InetSocketAddress>();
        for (ActiveNode node : list.getActiveNodes()) {
            InetSocketAddress rpcAddress = node.getRpcServerAddressForDatanodes();
            datanodeRpcAddresses.add(rpcAddress);
        }
        this.refreshNNList(datanodeRpcAddresses);

        if (list.getLeader() != null) {
            InetSocketAddress leaderAddress = list.getLeader().getRpcServerAddressForDatanodes();
            this.bpServiceToActive = this.getAnActor(leaderAddress);
        }

        // Swap in the new membership and forget previously blacklisted namenodes.
        this.nnList.clear();
        this.nnList.addAll(list.getActiveNodes());
        this.blackListNN.clear();
    }

    /**
     * Decides whether the namenode list may be refreshed right now.
     *
     * <p>A refresh is always allowed while no namenodes are known. Otherwise it is
     * allowed only when more than 2000 ms have passed since {@code lastupdate}, in
     * which case {@code lastupdate} is moved to the current time.
     *
     * @param address not consulted by this method
     * @return {@code true} if the caller may refresh the namenode list
     */
    synchronized boolean canUpdateNNList(InetSocketAddress address) {
        boolean noKnownNamenodes = this.nnList == null || this.nnList.size() == 0;
        if (noKnownNamenodes) {
            return true;
        }
        boolean throttleWindowElapsed = System.currentTimeMillis() - this.lastupdate > 2000L;
        if (!throttleWindowElapsed) {
            return false;
        }
        this.lastupdate = System.currentTimeMillis();
        return true;
    }

    /**
     * Starts the daemon thread named {@code "BlkReportThread"} that runs this
     * object, unless such a thread already exists and is still alive.
     */
    public void startWhirlingSufiThread() {
        boolean alreadyRunning = this.blockReportThread != null && this.blockReportThread.isAlive();
        if (alreadyRunning) {
            return;
        }
        Thread worker = new Thread((Runnable)this, "BlkReportThread");
        this.blockReportThread = worker;
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Thread entry point: delegates to {@code whirlingLikeASufi()} and logs, at
     * warn level, any exception that escapes it instead of letting it propagate.
     */
    @Override
    public void run() {
        try {
            this.whirlingLikeASufi();
        }
        catch (Exception ex) {
            String message = "Unexpected exception in BPOfferService " + this;
            LOG.warn((Object)message, (Throwable)ex);
        }
    }

    /**
     * Reports a bad block through {@code doActorActionWithRetry}, which chooses
     * the actor(s) the report is attempted on.
     *
     * @param block       the block being reported
     * @param storageUuid passed through to the actor's {@code reportBadBlocks}
     * @param storageType passed through to the actor's {@code reportBadBlocks}
     * @throws IOException if the retrying call ultimately fails
     */
    private void reportBadBlocksWithRetry(final ExtendedBlock block, final String storageUuid, final StorageType storageType) throws IOException {
        ActorActionHandler reportAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor namenodeActor) throws IOException {
                namenodeActor.reportBadBlocks(block, storageUuid, storageType);
                return null;
            }
        };
        this.doActorActionWithRetry(reportAction);
    }

    /**
     * Sends the given received/deleted block reports through
     * {@code doActorActionWithRetry}, logging the reported blocks at info level
     * first.
     *
     * <p>The log line lists, per storage, a bracketed group holding each block's id
     * immediately followed by the block info's {@code toString()}. The text is
     * assembled with a {@link StringBuilder}, and only when info logging is
     * enabled, so the send path does not pay for repeated string copying.
     *
     * @param receivedAndDeletedBlocks per-storage reports to send
     * @throws IOException if the retrying call ultimately fails
     */
    private void blockReceivedAndDeletedWithRetry(final StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
        // NOTE(review): this datanode-side code logs through NameNode.LOG; kept as-is
        // so the log destination does not change.
        if (NameNode.LOG.isInfoEnabled()) {
            StringBuilder blocks = new StringBuilder();
            for (StorageReceivedDeletedBlocks srdb : receivedAndDeletedBlocks) {
                blocks.append("[");
                for (ReceivedDeletedBlockInfo b : srdb.getBlocks()) {
                    blocks.append(" ").append(b.getBlock().getBlockId()).append(b.toString());
                }
                blocks.append("]");
            }
            NameNode.LOG.info((Object)("sending blockReceivedAndDeletedWithRetry for blocks " + blocks));
        }
        this.doActorActionWithRetry(new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor actor) throws IOException {
                actor.blockReceivedAndDeleted(BPOfferService.this.bpRegistration, BPOfferService.this.getBlockPoolId(), receivedAndDeletedBlocks);
                return null;
            }
        });
    }

    /**
     * Reports a bad block held by another datanode through
     * {@code doActorActionWithRetry}.
     *
     * @param dnInfo the datanode the bad block was seen on
     * @param block  the block being reported
     * @throws IOException if the retrying call ultimately fails
     */
    private void reportRemoteBadBlockWithRetry(final DatanodeInfo dnInfo, final ExtendedBlock block) throws IOException {
        ActorActionHandler reportAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor namenodeActor) throws IOException {
                namenodeActor.reportRemoteBadBlock(dnInfo, block);
                return null;
            }
        };
        this.doActorActionWithRetry(reportAction);
    }

    /**
     * Fetches the data of a small file from a namenode through
     * {@code doActorActionWithRetry}.
     *
     * @param id identifier handed to the actor's {@code getSmallFileDataFromNN}
     * @return the bytes returned by the actor, or {@code null} when the retrying
     *         call produced no result
     * @throws IOException if the retrying call ultimately fails
     */
    public byte[] getSmallFileDataFromNN(final int id) throws IOException {
        ActorActionHandler fetchAction = new ActorActionHandler(){

            @Override
            public Object doAction(BPServiceActor namenodeActor) throws IOException {
                return namenodeActor.getSmallFileDataFromNN(id);
            }
        };
        return (byte[])this.doActorActionWithRetry(fetchAction);
    }

    /**
     * Runs {@code handler} against namenode actors until one attempt succeeds.
     *
     * <p>Up to {@code nnList.size() + 1} attempts are made. Each attempt asks
     * {@code nextNNForNonBlkReportRPC()} for an actor; attempts for which no actor
     * is available are skipped. A failure ends the loop immediately unless
     * {@code ExceptionCheck} classifies it as a local connect exception or as
     * retriable. After a local connect exception the actor's namenode address is
     * added to the blacklist before the next attempt.
     *
     * @param handler the action to perform on the chosen actor
     * @return the value produced by the first successful attempt, or {@code null}
     *         if no attempt ran and nothing failed
     * @throws IOException the last failure, when every attempt failed. An unchecked
     *         failure is rethrown unchanged rather than being cast to
     *         {@code IOException}, which previously raised a
     *         {@code ClassCastException} that hid the real cause.
     */
    private Object doActorActionWithRetry(ActorActionHandler handler) throws IOException {
        Exception lastFailure = null;
        BPServiceActor actor = null;
        final int maxRetries = this.nnList.size();
        for (int attempt = 0; attempt <= maxRetries; ++attempt) {
            try {
                actor = this.nextNNForNonBlkReportRPC();
                if (actor == null) {
                    continue;
                }
                return handler.doAction(actor);
            }
            catch (Exception e) {
                lastFailure = e;
                boolean localConnectFailure = ExceptionCheck.isLocalConnectException(e);
                if (!localConnectFailure && !ExceptionCheck.isRetriableException(e)) {
                    break;
                }
                LOG.debug((Object)("RPC faild. NN used was " + actor.getNNSocketAddress() + ", retries left (" + (maxRetries - attempt) + ")  Exception " + e));
                if (localConnectFailure) {
                    // Skip this namenode on subsequent selections.
                    this.blackListNN.add(actor.getNNSocketAddress());
                }
            }
        }
        if (lastFailure == null) {
            return null;
        }
        if (lastFailure instanceof IOException) {
            // Covers RemoteException as well, which is thrown as-is.
            throw (IOException)lastFailure;
        }
        if (lastFailure instanceof RuntimeException) {
            throw (RuntimeException)lastFailure;
        }
        // Not expected given doAction's signature; kept so no failure is ever dropped.
        throw new IOException(lastFailure);
    }

    /**
     * Picks the next namenode actor, round-robin, for an RPC that is not a block
     * report.
     *
     * <p>Advances {@code rpcRoundRobinIndex} through {@code nnList} at most ten
     * times, skipping namenodes whose address is blacklisted or for which
     * {@code getAnActor} yields no actor. An exception raised while examining one
     * candidate is logged at debug level and the next candidate is tried.
     *
     * @return the selected actor, or {@code null} if no namenodes are known or no
     *         usable actor was found within the attempts
     */
    private synchronized BPServiceActor nextNNForNonBlkReportRPC() {
        if (this.nnList == null || this.nnList.isEmpty()) {
            return null;
        }
        final int maxAttempts = 10;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            try {
                ++this.rpcRoundRobinIndex;
                this.rpcRoundRobinIndex %= this.nnList.size();
                ActiveNode candidate = this.nnList.get(this.rpcRoundRobinIndex);
                InetSocketAddress candidateAddress = candidate.getRpcServerAddressForDatanodes();
                if (this.blackListNN.contains(candidateAddress)) {
                    continue;
                }
                BPServiceActor actor = this.getAnActor(candidateAddress);
                if (actor != null) {
                    return actor;
                }
            }
            catch (Exception e) {
                // Best effort: keep trying other candidates, but leave a trace of the failure.
                LOG.debug((Object)"Failed to select a namenode actor for a non block report RPC; trying the next candidate", (Throwable)e);
            }
        }
        return null;
    }

    /**
     * Asks the leader actor which namenode should receive a block report of the
     * given size.
     *
     * @param noOfBlks number of blocks, forwarded to the leader actor
     * @return the node named by the leader actor, or {@code null} when no
     *         namenodes are known or no leader actor is available
     * @throws IOException any {@code RemoteException} from the leader actor is
     *         rethrown; it is additionally logged at warn level when its class name
     *         is that of {@code BRLoadBalancingException}
     */
    private ActiveNode nextNNForBlkReport(long noOfBlks) throws IOException {
        boolean noKnownNamenodes = this.nnList == null || this.nnList.isEmpty();
        if (noKnownNamenodes) {
            return null;
        }
        BPServiceActor leader = this.getLeaderActor();
        if (leader == null) {
            return null;
        }
        try {
            return leader.nextNNForBlkReport(noOfBlks, this.bpRegistration);
        }
        catch (RemoteException e) {
            boolean isBrLoadBalancingException = e.getClassName().equals(BRLoadBalancingException.class.getName());
            if (isBrLoadBalancingException) {
                LOG.warn((Object)e);
            }
            throw e;
        }
    }

    /**
     * Asks the leader actor which namenode should receive a cache report of the
     * given size.
     *
     * @param noOfBlks number of blocks, forwarded to the leader actor
     * @return the node named by the leader actor, or {@code null} when no
     *         namenodes are known or no leader actor is available
     * @throws IOException any {@code RemoteException} from the leader actor is
     *         rethrown; it is additionally logged at warn level when its class name
     *         is that of {@code CRLoadBalancingException}
     */
    private ActiveNode nextNNForCacheReport(long noOfBlks) throws IOException {
        boolean noKnownNamenodes = this.nnList == null || this.nnList.isEmpty();
        if (noKnownNamenodes) {
            return null;
        }
        BPServiceActor leader = this.getLeaderActor();
        if (leader == null) {
            return null;
        }
        try {
            return leader.nextNNForCacheReport(noOfBlks, this.bpRegistration);
        }
        catch (RemoteException e) {
            boolean isCrLoadBalancingException = e.getClassName().equals(CRLoadBalancingException.class.getName());
            if (isCrLoadBalancingException) {
                LOG.warn((Object)e);
            }
            throw e;
        }
    }

    /**
     * Resolves the actor for the node in {@code nnList} with the smallest id; on
     * equal ids the node encountered first wins.
     *
     * @return the actor returned by {@code getAnActor} for that node's
     *         datanode-facing RPC address, or {@code null} when {@code nnList} is
     *         empty
     */
    private BPServiceActor getLeaderActor() {
        if (this.nnList.size() <= 0) {
            return null;
        }
        ActiveNode lowestIdNode = null;
        for (ActiveNode candidate : this.nnList) {
            if (lowestIdNode == null || candidate.getId() < lowestIdNode.getId()) {
                lowestIdNode = candidate;
            }
        }
        return this.getAnActor(lowestIdNode.getRpcServerAddressForDatanodes());
    }

    /**
     * Advances {@code refreshNNRoundRobinIndex} to the next namenode that is not
     * blacklisted.
     *
     * <p>When no namenodes are known the index is set to {@code -1}. Otherwise the
     * index is stepped, wrapping around {@code nnList}, at most ten times and the
     * method stops at the first non-blacklisted entry; if all ten steps land on
     * blacklisted entries the index stays at the last one visited.
     */
    private synchronized void forwardRRIndex() {
        if (this.nnList == null || this.nnList.isEmpty()) {
            this.refreshNNRoundRobinIndex = -1;
            return;
        }
        int step = 0;
        while (step < 10) {
            ++this.refreshNNRoundRobinIndex;
            this.refreshNNRoundRobinIndex %= this.nnList.size();
            ActiveNode candidate = this.nnList.get(this.refreshNNRoundRobinIndex);
            boolean blacklisted = this.blackListNN.contains(candidate.getRpcServerAddressForDatanodes());
            if (!blacklisted) {
                return;
            }
            ++step;
        }
    }

    /**
     * Pending incremental block report entries for one storage, keyed by block id
     * so that at most one entry is held per block.
     *
     * <p>The class performs no locking of its own.
     */
    private static class PerStoragePendingIncrementalBR {
        /** Block id mapped to the pending entry for that block. */
        private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR = Maps.newHashMap();

        private PerStoragePendingIncrementalBR() {
        }

        /**
         * @return the number of entries currently pending
         */
        int getBlockInfoCount() {
            return this.pendingIncrementalBR.size();
        }

        /**
         * Removes every pending entry and hands them back.
         *
         * @return the entries that were pending before the call
         */
        ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
            int pendingCount = this.getBlockInfoCount();
            ReceivedDeletedBlockInfo[] target = new ReceivedDeletedBlockInfo[pendingCount];
            ReceivedDeletedBlockInfo[] drained = this.pendingIncrementalBR.values().toArray(target);
            this.pendingIncrementalBR.clear();
            return drained;
        }

        /**
         * Adds the given entries, leaving alone any block that already has a
         * pending entry.
         *
         * @param blockArray entries to add where missing
         * @return how many entries were actually added
         */
        int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
            int added = 0;
            for (ReceivedDeletedBlockInfo candidate : blockArray) {
                long blockId = candidate.getBlock().getBlockId();
                if (!this.pendingIncrementalBR.containsKey(blockId)) {
                    this.pendingIncrementalBR.put(blockId, candidate);
                    ++added;
                }
            }
            return added;
        }

        /**
         * Stores the entry, replacing any entry already pending for the same block.
         *
         * @param blockInfo entry to store
         */
        void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            long blockId = blockInfo.getBlock().getBlockId();
            this.pendingIncrementalBR.put(blockId, blockInfo);
        }

        /**
         * Drops the pending entry for the block referenced by {@code blockInfo}.
         *
         * @param blockInfo identifies the block whose entry should be dropped
         * @return {@code true} if an entry was pending for that block
         */
        boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
            long blockId = blockInfo.getBlock().getBlockId();
            ReceivedDeletedBlockInfo removed = this.pendingIncrementalBR.remove(blockId);
            return removed != null;
        }
    }

    private static interface ActorActionHandler {
        public Object doAction(BPServiceActor var1) throws IOException;
    }

    /**
     * Task that sends all currently pending incremental block report entries.
     *
     * <p>Each run drains the per-storage pending entries, sends them through
     * {@code blockReceivedAndDeletedWithRetry}, and, if sending fails, puts back
     * the entries that were not superseded in the meantime and raises
     * {@code sendImmediateIBR} again.
     *
     * <p>{@code incrementalBRCounter} is decremented exactly once per run, on every
     * exit path including a failed send. Previously a failed send propagated its
     * exception without the decrement.
     * NOTE(review): the counter presumably tracks in-flight tasks and is
     * incremented by whoever submits this task — confirm against the submitter.
     */
    public class IncrementalBRTask
    implements Callable {
        /**
         * Drains and sends the pending reports.
         *
         * @return always {@code null}
         * @throws Exception if sending the reports fails
         */
        @Override
        public Object call() throws Exception {
            try {
                List<StorageReceivedDeletedBlocks> reports = this.drainPendingReports();
                if (reports.size() != 0) {
                    this.sendOrRequeue(reports);
                }
            }
            finally {
                synchronized (BPOfferService.this.incrementalBRLock) {
                    BPOfferService.this.incrementalBRCounter--;
                }
            }
            return null;
        }

        /** Collects one report per storage that has pending entries and clears the immediate-send flag. */
        private List<StorageReceivedDeletedBlocks> drainPendingReports() {
            List<StorageReceivedDeletedBlocks> reports = new ArrayList<StorageReceivedDeletedBlocks>(BPOfferService.this.pendingIncrementalBRperStorage.size());
            synchronized (BPOfferService.this.pendingIncrementalBRperStorage) {
                for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry : BPOfferService.this.pendingIncrementalBRperStorage.entrySet()) {
                    PerStoragePendingIncrementalBR perStorage = entry.getValue();
                    if (perStorage.getBlockInfoCount() <= 0) {
                        continue;
                    }
                    ReceivedDeletedBlockInfo[] drained = perStorage.dequeueBlockInfos();
                    reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), drained));
                }
                BPOfferService.this.sendImmediateIBR = false;
            }
            return reports;
        }

        /** Sends the reports outside the lock; on failure puts the missing entries back and re-raises the immediate-send flag. */
        private void sendOrRequeue(List<StorageReceivedDeletedBlocks> reports) throws IOException {
            boolean success = false;
            try {
                BPOfferService.this.blockReceivedAndDeletedWithRetry(reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
                success = true;
            }
            finally {
                if (!success) {
                    synchronized (BPOfferService.this.pendingIncrementalBRperStorage) {
                        for (StorageReceivedDeletedBlocks report : reports) {
                            PerStoragePendingIncrementalBR perStorage = BPOfferService.this.pendingIncrementalBRperStorage.get(report.getStorage());
                            // Only re-add blocks that have no newer pending entry.
                            perStorage.putMissingBlockInfos(report.getBlocks());
                            BPOfferService.this.sendImmediateIBR = true;
                        }
                    }
                }
            }
        }
    }

    /**
     * Task that sends a block report followed by a cache report and passes the
     * commands returned by each to {@code blkReportHander}.
     */
    private class BRTask
    implements Callable {
        private BRTask() {
        }

        /**
         * Runs the block report, then the cache report, then registers the block
         * pool with the datanode's block scanner when one is present.
         *
         * <p>Commands are processed only while {@code blkReportHander} is set; the
         * check is applied to both {@code processCommand} calls.
         *
         * @return always {@code null}
         * @throws Exception if either report fails
         */
        @Override
        public Object call() throws Exception {
            List<DatanodeCommand> cmds = BPOfferService.this.blockReport();
            boolean blockReportReturnedCommands = cmds != null;
            if (blockReportReturnedCommands && BPOfferService.this.blkReportHander != null) {
                BPOfferService.this.blkReportHander.processCommand(cmds.toArray(new DatanodeCommand[cmds.size()]));
            }
            DatanodeCommand cmd = BPOfferService.this.cacheReport(blockReportReturnedCommands);
            if (BPOfferService.this.blkReportHander != null) {
                // cmd may be null here; it is forwarded unchanged, as before.
                BPOfferService.this.blkReportHander.processCommand(new DatanodeCommand[]{cmd});
            }
            if (BPOfferService.this.dn.blockScanner != null) {
                BPOfferService.this.dn.blockScanner.addBlockPool(BPOfferService.this.getBlockPoolId());
            }
            return null;
        }
    }
}

