package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.hops.leader_election.node.ActiveNode;
import io.hops.leader_election.node.ActiveNodePBImpl;
import io.hops.leader_election.node.SortedActiveNodeList;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExceptionCheck;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPOfferService.class */
public class BPOfferService implements Runnable {
    static final Log LOG;
    NamespaceInfo bpNSInfo;
    volatile DatanodeRegistration bpRegistration;
    private final DataNode dn;
    private final DNConf dnConf;
    final int maxNumIncrementalReportThreads;
    private final ExecutorService incrementalBRExecutor;
    private final ExecutorService brDispatcher;
    static final /* synthetic */ boolean $assertionsDisabled;
    private BPServiceActor bpServiceToActive = null;
    private List<BPServiceActor> bpServices = new CopyOnWriteArrayList();
    private long lastActiveClaimTxId = -1;
    private volatile boolean sendImmediateIBR = false;
    volatile long lastDeletedReport = 0;
    private volatile long lastBlockReport = 0;
    private boolean resetBlockReportTime = true;
    volatile long lastCacheReport = 0;
    private BPServiceActor blkReportHander = null;
    private List<ActiveNode> nnList = Collections.synchronizedList(new ArrayList());
    private List<InetSocketAddress> blackListNN = Collections.synchronizedList(new ArrayList());
    private volatile int rpcRoundRobinIndex = 0;
    private volatile int refreshNNRoundRobinIndex = 0;
    private final Map<DatanodeStorage, PerStoragePendingIncrementalBR> pendingIncrementalBRperStorage = Maps.newHashMap();
    private Thread blockReportThread = null;
    private BRTask brTask = new BRTask();
    private Future futur = null;
    private final Object incrementalBRLock = new Object();
    private int incrementalBRCounter = 0;
    private final IncrementalBRTask incrementalBRTask = new IncrementalBRTask();
    long lastupdate = System.currentTimeMillis();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPOfferService$ActorActionHandler.class */
    public interface ActorActionHandler {
        Object doAction(BPServiceActor bPServiceActor) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPOfferService$BRTask.class */
    public class BRTask implements Callable {
        private BRTask() {
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            List<DatanodeCommand> blockReport = BPOfferService.this.blockReport();
            if (blockReport != null && BPOfferService.this.blkReportHander != null) {
                BPOfferService.this.blkReportHander.processCommand(blockReport == null ? null : (DatanodeCommand[]) blockReport.toArray(new DatanodeCommand[blockReport.size()]));
            }
            DatanodeCommand cacheReport = BPOfferService.this.cacheReport(blockReport != null);
            if (cacheReport != null && BPOfferService.this.blkReportHander != null) {
                BPOfferService.this.blkReportHander.processCommand(new DatanodeCommand[]{cacheReport});
            }
            if (BPOfferService.this.dn.blockScanner == null) {
                return null;
            }
            BPOfferService.this.dn.blockScanner.addBlockPool(BPOfferService.this.getBlockPoolId());
            return null;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPOfferService$IncrementalBRTask.class */
    public class IncrementalBRTask implements Callable {
        public IncrementalBRTask() {
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            ArrayList<StorageReceivedDeletedBlocks> arrayList = new ArrayList(BPOfferService.this.pendingIncrementalBRperStorage.size());
            synchronized (BPOfferService.this.pendingIncrementalBRperStorage) {
                for (Map.Entry entry : BPOfferService.this.pendingIncrementalBRperStorage.entrySet()) {
                    DatanodeStorage datanodeStorage = (DatanodeStorage) entry.getKey();
                    PerStoragePendingIncrementalBR perStoragePendingIncrementalBR = (PerStoragePendingIncrementalBR) entry.getValue();
                    if (perStoragePendingIncrementalBR.getBlockInfoCount() > 0) {
                        arrayList.add(new StorageReceivedDeletedBlocks(datanodeStorage, perStoragePendingIncrementalBR.dequeueBlockInfos()));
                    }
                }
                BPOfferService.this.sendImmediateIBR = false;
            }
            if (arrayList.size() == 0) {
                synchronized (BPOfferService.this.incrementalBRLock) {
                    BPOfferService.access$610(BPOfferService.this);
                }
                return null;
            }
            boolean z = false;
            try {
                BPOfferService.this.blockReceivedAndDeletedWithRetry((StorageReceivedDeletedBlocks[]) arrayList.toArray(new StorageReceivedDeletedBlocks[arrayList.size()]));
                z = true;
                if (1 == 0) {
                    synchronized (BPOfferService.this.pendingIncrementalBRperStorage) {
                        for (StorageReceivedDeletedBlocks storageReceivedDeletedBlocks : arrayList) {
                            ((PerStoragePendingIncrementalBR) BPOfferService.this.pendingIncrementalBRperStorage.get(storageReceivedDeletedBlocks.getStorage())).putMissingBlockInfos(storageReceivedDeletedBlocks.getBlocks());
                            BPOfferService.this.sendImmediateIBR = true;
                        }
                    }
                }
                synchronized (BPOfferService.this.incrementalBRLock) {
                    BPOfferService.access$610(BPOfferService.this);
                }
                return null;
            } catch (Throwable th) {
                if (!z) {
                    synchronized (BPOfferService.this.pendingIncrementalBRperStorage) {
                        for (StorageReceivedDeletedBlocks storageReceivedDeletedBlocks2 : arrayList) {
                            ((PerStoragePendingIncrementalBR) BPOfferService.this.pendingIncrementalBRperStorage.get(storageReceivedDeletedBlocks2.getStorage())).putMissingBlockInfos(storageReceivedDeletedBlocks2.getBlocks());
                            BPOfferService.this.sendImmediateIBR = true;
                        }
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPOfferService$PerStoragePendingIncrementalBR.class */
    public static class PerStoragePendingIncrementalBR {
        private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR;

        private PerStoragePendingIncrementalBR() {
            this.pendingIncrementalBR = Maps.newHashMap();
        }

        int getBlockInfoCount() {
            return this.pendingIncrementalBR.size();
        }

        ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
            ReceivedDeletedBlockInfo[] receivedDeletedBlockInfoArr = (ReceivedDeletedBlockInfo[]) this.pendingIncrementalBR.values().toArray(new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
            this.pendingIncrementalBR.clear();
            return receivedDeletedBlockInfoArr;
        }

        int putMissingBlockInfos(ReceivedDeletedBlockInfo[] receivedDeletedBlockInfoArr) {
            int i = 0;
            for (ReceivedDeletedBlockInfo receivedDeletedBlockInfo : receivedDeletedBlockInfoArr) {
                if (!this.pendingIncrementalBR.containsKey(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()))) {
                    this.pendingIncrementalBR.put(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()), receivedDeletedBlockInfo);
                    i++;
                }
            }
            return i;
        }

        void putBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
            this.pendingIncrementalBR.put(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId()), receivedDeletedBlockInfo);
        }

        boolean removeBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
            return this.pendingIncrementalBR.remove(Long.valueOf(receivedDeletedBlockInfo.getBlock().getBlockId())) != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BPOfferService(List<InetSocketAddress> list, DataNode dataNode) {
        Preconditions.checkArgument(!list.isEmpty(), "Must pass at least one NN.");
        this.dn = dataNode;
        for (InetSocketAddress inetSocketAddress : list) {
            this.bpServices.add(new BPServiceActor(inetSocketAddress, this));
            this.nnList.add(new ActiveNodePBImpl(0L, "", inetSocketAddress.getAddress().getHostAddress(), inetSocketAddress.getPort(), "", inetSocketAddress.getAddress().getHostAddress(), inetSocketAddress.getPort()));
        }
        this.dnConf = dataNode.getDnConf();
        this.maxNumIncrementalReportThreads = this.dnConf.iBRDispatherTPSize;
        this.incrementalBRExecutor = Executors.newFixedThreadPool(this.maxNumIncrementalReportThreads);
        this.brDispatcher = Executors.newSingleThreadExecutor();
    }

    void refreshNNList(ArrayList<InetSocketAddress> arrayList) throws IOException {
        HashSet newHashSet = Sets.newHashSet();
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            newHashSet.add(it.next().getNNSocketAddress());
        }
        HashSet newHashSet2 = Sets.newHashSet(arrayList);
        Sets.SetView difference = Sets.difference(newHashSet, newHashSet2);
        Sets.SetView difference2 = Sets.difference(newHashSet2, newHashSet);
        if (difference.size() != 0) {
            Iterator it2 = difference.iterator();
            while (it2.hasNext()) {
                BPServiceActor stopAnActor = stopAnActor((InetSocketAddress) it2.next());
                this.bpServices.remove(stopAnActor);
                LOG.debug("Stopped actor for " + stopAnActor.getNNSocketAddress());
            }
        }
        if (difference2.size() != 0) {
            Iterator it3 = difference2.iterator();
            while (it3.hasNext()) {
                BPServiceActor startAnActor = startAnActor((InetSocketAddress) it3.next());
                this.bpServices.add(startAnActor);
                LOG.debug("Started actor for " + startAnActor.getNNSocketAddress());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isInitialized() {
        return this.bpRegistration != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isAlive() {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            if (it.next().isAlive()) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized String getBlockPoolId() {
        if (this.bpNSInfo != null) {
            return this.bpNSInfo.getBlockPoolID();
        }
        LOG.warn("Block pool ID needed, but service not yet registered with NN", new Exception("trace"));
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized NamespaceInfo getNamespaceInfo() {
        return this.bpNSInfo;
    }

    public synchronized String toString() {
        if (this.bpNSInfo != null) {
            return "Block pool " + getBlockPoolId() + " (Datanode Uuid " + this.dn.getDatanodeUuid() + ")";
        }
        String datanodeUuid = this.dn.getDatanodeUuid();
        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
            datanodeUuid = "unassigned";
        }
        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportBadBlocks(ExtendedBlock extendedBlock, String str, StorageType storageType) {
        checkBlock(extendedBlock);
        try {
            reportBadBlocksWithRetry(extendedBlock, str, storageType);
        } catch (Exception e) {
            LOG.error("Failed to send bad block report to any namenode ");
            e.printStackTrace();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyNamenodeReceivedBlock(ExtendedBlock extendedBlock, String str, String str2) {
        checkBlock(extendedBlock);
        checkDelHint(str);
        notifyNamenodeBlockImmediatelyInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVED, str), str2, true);
    }

    private void checkBlock(ExtendedBlock extendedBlock) {
        Preconditions.checkArgument(extendedBlock != null, "block is null");
        Preconditions.checkArgument(extendedBlock.getBlockPoolId().equals(getBlockPoolId()), "block belongs to BP %s instead of BP %s", new Object[]{extendedBlock.getBlockPoolId(), getBlockPoolId()});
    }

    private void checkDelHint(String str) {
        Preconditions.checkArgument(str != null, "delHint is null");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyNamenodeDeletedBlock(ExtendedBlock extendedBlock, String str) {
        checkBlock(extendedBlock);
        notifyNamenodeDeletedBlockInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.DELETED, null), this.dn.getFSDataset().getStorage(str));
    }

    public void notifyNamenodeCreatingBlock(ExtendedBlock extendedBlock, String str) {
        checkBlock(extendedBlock);
        notifyNamenodeBlockImmediatelyInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.CREATING, null), str, false);
    }

    public void notifyNamenodeAppendingBlock(ExtendedBlock extendedBlock, String str) {
        checkBlock(extendedBlock);
        notifyNamenodeBlockImmediatelyInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.APPENDING, null), str, false);
    }

    public void notifyNamenodeAppendingRecoveredAppend(ExtendedBlock extendedBlock, String str) {
        checkBlock(extendedBlock);
        notifyNamenodeBlockImmediatelyInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECOVERING_APPEND, null), str, true);
    }

    public void notifyNamenodeUpdateRecoveredBlock(ExtendedBlock extendedBlock, String str) {
        checkBlock(extendedBlock);
        notifyNamenodeBlockImmediatelyInt(new ReceivedDeletedBlockInfo(extendedBlock.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.UPDATE_RECOVERED, null), str, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void join() {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            it.next().join();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataNode getDataNode() {
        return this.dn;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void verifyAndSetNamespaceInfo(NamespaceInfo namespaceInfo) throws IOException {
        if (this.bpNSInfo != null) {
            checkNSEquality(this.bpNSInfo.getBlockPoolID(), namespaceInfo.getBlockPoolID(), "Blockpool ID");
            checkNSEquality(Integer.valueOf(this.bpNSInfo.getNamespaceID()), Integer.valueOf(namespaceInfo.getNamespaceID()), "Namespace ID");
            checkNSEquality(this.bpNSInfo.getClusterID(), namespaceInfo.getClusterID(), "Cluster ID");
            return;
        }
        this.bpNSInfo = namespaceInfo;
        boolean z = false;
        try {
            this.dn.initBlockPool(this);
            z = true;
            if (1 == 0) {
                this.bpNSInfo = null;
            }
        } catch (Throwable th) {
            if (!z) {
                this.bpNSInfo = null;
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void registrationSucceeded(BPServiceActor bPServiceActor, DatanodeRegistration datanodeRegistration) throws IOException {
        if (this.bpRegistration != null) {
            checkNSEquality(Integer.valueOf(this.bpRegistration.getStorageInfo().getNamespaceID()), Integer.valueOf(datanodeRegistration.getStorageInfo().getNamespaceID()), "namespace ID");
            checkNSEquality(this.bpRegistration.getStorageInfo().getClusterID(), datanodeRegistration.getStorageInfo().getClusterID(), "cluster ID");
        } else {
            this.bpRegistration = datanodeRegistration;
        }
        this.dn.bpRegistrationSucceeded(this.bpRegistration, getBlockPoolId());
        if (this.dn.isBlockTokenEnabled) {
            this.dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), datanodeRegistration.getExportedKeys());
        }
    }

    private static void checkNSEquality(Object obj, Object obj2, String str) throws IOException {
        if (!obj.equals(obj2)) {
            throw new IOException(str + " mismatch: previously connected to " + str + " " + obj + " but now connected to " + str + " " + obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized DatanodeRegistration createRegistration() {
        Preconditions.checkState(this.bpNSInfo != null, "getRegistration() can only be called after initial handshake");
        return this.dn.createBPRegistration(this.bpNSInfo);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void shutdownActor(BPServiceActor bPServiceActor) {
        if (this.bpServiceToActive == bPServiceActor) {
            this.bpServiceToActive = null;
        }
        this.bpServices.remove(bPServiceActor);
        Iterator<ActiveNode> it = this.nnList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ActiveNode next = it.next();
            if (next.getRpcServerAddressForDatanodes().equals(bPServiceActor.getNNSocketAddress())) {
                this.nnList.remove(next);
                break;
            }
        }
        if (this.bpServices.isEmpty()) {
            this.dn.shutdownBlockPool(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void trySendErrorReport(int i, String str) {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            it.next().trySendErrorReport(i, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleBlockReport(long j) {
        scheduleBlockReportInt(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reportRemoteBadBlock(DatanodeInfo datanodeInfo, ExtendedBlock extendedBlock) {
        try {
            reportRemoteBadBlockWithRetry(datanodeInfo, extendedBlock);
        } catch (IOException e) {
            LOG.warn("Couldn't report bad block " + extendedBlock + "" + e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
        if (this.bpServiceToActive != null) {
            return this.bpServiceToActive.bpNamenode;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public List<BPServiceActor> getBPServiceActors() {
        return Lists.newArrayList(this.bpServices);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void signalRollingUpgrade(boolean z) {
        if (z) {
            this.dn.getFSDataset().enableTrash(getBlockPoolId());
        } else {
            this.dn.getFSDataset().restoreTrash(getBlockPoolId());
        }
    }

    boolean containsNN(InetSocketAddress inetSocketAddress) {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            if (it.next().getNNSocketAddress().equals(inetSocketAddress)) {
                return true;
            }
        }
        return false;
    }

    @VisibleForTesting
    int countNameNodes() {
        return this.bpServices.size();
    }

    @VisibleForTesting
    void triggerBlockReportForTests() throws IOException {
        triggerBlockReportForTestsInt();
    }

    @VisibleForTesting
    void triggerDeletionReportForTests() throws IOException {
        triggerDeletionReportForTestsInt();
    }

    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        Iterator<BPServiceActor> it = this.bpServices.iterator();
        while (it.hasNext()) {
            it.next().triggerHeartbeatForTests();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean processCommandFromActor(DatanodeCommand datanodeCommand, BPServiceActor bPServiceActor) throws IOException {
        boolean processCommandFromActive;
        if (!$assertionsDisabled && !this.bpServices.contains(bPServiceActor)) {
            throw new AssertionError();
        }
        if (datanodeCommand == null) {
            return true;
        }
        if (4 == datanodeCommand.getAction()) {
            LOG.info("DatanodeCommand action : DNA_REGISTER from " + bPServiceActor.nnAddr);
            bPServiceActor.reRegister();
            return true;
        }
        synchronized (this) {
            processCommandFromActive = processCommandFromActive(datanodeCommand, bPServiceActor);
        }
        return processCommandFromActive;
    }

    private String blockIdArrayToString(long[] jArr) {
        long maxNumberOfBlocksToLog = this.dn.getMaxNumberOfBlocksToLog();
        StringBuilder sb = new StringBuilder();
        String str = "";
        int i = 0;
        while (true) {
            if (i >= jArr.length) {
                break;
            }
            if (i >= maxNumberOfBlocksToLog) {
                sb.append("...");
                break;
            }
            sb.append(str).append(jArr[i]);
            str = ", ";
            i++;
        }
        return sb.toString();
    }

    private boolean processCommandFromActive(DatanodeCommand datanodeCommand, BPServiceActor bPServiceActor) throws IOException {
        BlockCommand blockCommand = datanodeCommand instanceof BlockCommand ? (BlockCommand) datanodeCommand : null;
        BlockIdCommand blockIdCommand = datanodeCommand instanceof BlockIdCommand ? (BlockIdCommand) datanodeCommand : null;
        switch (datanodeCommand.getAction()) {
            case 1:
                this.dn.transferBlocks(blockCommand.getBlockPoolId(), blockCommand.getBlocks(), blockCommand.getTargets(), blockCommand.getTargetStorageTypes());
                this.dn.metrics.incrBlocksReplicated(blockCommand.getBlocks().length);
                return true;
            case 2:
                Block[] blocks = blockCommand.getBlocks();
                try {
                    if (this.dn.blockScanner != null) {
                        this.dn.blockScanner.deleteBlocks(blockCommand.getBlockPoolId(), blocks);
                    }
                    this.dn.getFSDataset().invalidate(blockCommand.getBlockPoolId(), blocks);
                    this.dn.metrics.incrBlocksRemoved(blocks.length);
                    return true;
                } catch (IOException e) {
                    throw e;
                }
            case 3:
                throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
            case 4:
            default:
                LOG.warn("Unknown DatanodeCommand action: " + datanodeCommand.getAction());
                return true;
            case 5:
                String blockPoolId = ((FinalizeCommand) datanodeCommand).getBlockPoolId();
                if (!$assertionsDisabled && !getBlockPoolId().equals(blockPoolId)) {
                    throw new AssertionError("BP " + getBlockPoolId() + " received DNA_FINALIZE for other block pool " + blockPoolId);
                }
                this.dn.finalizeUpgradeForPool(blockPoolId);
                return true;
            case 6:
                this.dn.recoverBlocks("NameNode at " + bPServiceActor.getNNSocketAddress(), ((BlockRecoveryCommand) datanodeCommand).getRecoveringBlocks());
                return true;
            case 7:
                LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
                if (!this.dn.isBlockTokenEnabled) {
                    return true;
                }
                this.dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), ((KeyUpdateCommand) datanodeCommand).getExportedKeys());
                return true;
            case 8:
                LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
                long balancerBandwidthValue = ((BalancerBandwidthCommand) datanodeCommand).getBalancerBandwidthValue();
                if (balancerBandwidthValue <= 0) {
                    return true;
                }
                DataXceiverServer dataXceiverServer = (DataXceiverServer) this.dn.dataXceiverServer.getRunnable();
                LOG.info("Updating balance throttler bandwidth from " + dataXceiverServer.balanceThrottler.getBandwidth() + " bytes/s to: " + balancerBandwidthValue + " bytes/s.");
                dataXceiverServer.balanceThrottler.setBandwidth(balancerBandwidthValue);
                return true;
            case 9:
                LOG.info("DatanodeCommand action: DNA_CACHE for " + blockIdCommand.getBlockPoolId() + " of [" + blockIdArrayToString(blockIdCommand.getBlockIds()) + "]");
                this.dn.getFSDataset().cache(blockIdCommand.getBlockPoolId(), blockIdCommand.getBlockIds());
                return true;
            case 10:
                LOG.info("DatanodeCommand action: DNA_UNCACHE for " + blockIdCommand.getBlockPoolId() + " of [" + blockIdArrayToString(blockIdCommand.getBlockIds()) + "]");
                this.dn.getFSDataset().uncache(blockIdCommand.getBlockPoolId(), blockIdCommand.getBlockIds());
                return true;
        }
    }

    private BPServiceActor stopAnActor(InetSocketAddress inetSocketAddress) {
        BPServiceActor anActor = getAnActor(inetSocketAddress);
        if (anActor == null) {
            return null;
        }
        anActor.stop();
        return anActor;
    }

    private BPServiceActor startAnActor(InetSocketAddress inetSocketAddress) {
        BPServiceActor bPServiceActor = new BPServiceActor(inetSocketAddress, this);
        bPServiceActor.start();
        return bPServiceActor;
    }

    private BPServiceActor getAnActor(InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            return null;
        }
        for (BPServiceActor bPServiceActor : this.bpServices) {
            if (bPServiceActor.getNNSocketAddress().equals(inetSocketAddress)) {
                return bPServiceActor;
            }
        }
        return null;
    }

    private void whirlingLikeASufi() throws Exception {
        while (this.dn.shouldRun) {
            try {
                long now = Time.now();
                if (this.sendImmediateIBR || now - this.lastDeletedReport > this.dnConf.deleteReportInterval) {
                    reportReceivedDeletedBlocks();
                    this.lastDeletedReport = now;
                }
                startBRThread();
                synchronized (this.pendingIncrementalBRperStorage) {
                    if (1000 > 0) {
                        if (!this.sendImmediateIBR) {
                            try {
                                this.pendingIncrementalBRperStorage.wait(1000L);
                            } catch (InterruptedException e) {
                                LOG.warn("BPOfferService for " + this + " interrupted");
                            }
                        }
                    }
                }
                forwardRRIndex();
            } catch (Exception e2) {
                LOG.warn("Exception in whirlingLikeASufi", e2);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void startBRThread() throws InterruptedException, ExecutionException {
        if (this.futur == null || this.futur.isDone()) {
            if (this.futur != null) {
                try {
                    this.futur.get();
                } finally {
                    this.futur = null;
                }
            }
            this.futur = this.brDispatcher.submit(this.brTask);
        }
    }

    private void reportReceivedDeletedBlocks() throws IOException {
        synchronized (this.incrementalBRLock) {
            if (this.incrementalBRCounter < this.maxNumIncrementalReportThreads) {
                this.incrementalBRCounter++;
                this.incrementalBRExecutor.submit(this.incrementalBRTask);
            }
        }
    }

    private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(DatanodeStorage datanodeStorage) {
        PerStoragePendingIncrementalBR perStoragePendingIncrementalBR = this.pendingIncrementalBRperStorage.get(datanodeStorage);
        if (perStoragePendingIncrementalBR == null) {
            perStoragePendingIncrementalBR = new PerStoragePendingIncrementalBR();
            this.pendingIncrementalBRperStorage.put(datanodeStorage, perStoragePendingIncrementalBR);
        }
        return perStoragePendingIncrementalBR;
    }

    boolean addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, DatanodeStorage datanodeStorage) {
        boolean z = true;
        Iterator<Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR>> it = this.pendingIncrementalBRperStorage.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().getValue().removeBlockInfo(receivedDeletedBlockInfo)) {
                z = false;
                break;
            }
        }
        getIncrementalBRMapForStorage(datanodeStorage).putBlockInfo(receivedDeletedBlockInfo);
        return z;
    }

    void notifyNamenodeBlockImmediatelyInt(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, String str, boolean z) {
        synchronized (this.pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(receivedDeletedBlockInfo, this.dn.getFSDataset().getStorage(str));
            this.sendImmediateIBR = true;
            if (z) {
                this.pendingIncrementalBRperStorage.notifyAll();
            }
        }
    }

    void notifyNamenodeDeletedBlockInt(ReceivedDeletedBlockInfo receivedDeletedBlockInfo, DatanodeStorage datanodeStorage) {
        synchronized (this.pendingIncrementalBRperStorage) {
            addPendingReplicationBlockInfo(receivedDeletedBlockInfo, datanodeStorage);
        }
    }

    List<DatanodeCommand> blockReport() throws IOException {
        int i;
        long now = Time.now();
        if (now - this.lastBlockReport <= this.dnConf.blockReportInterval) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        reportReceivedDeletedBlocks();
        this.lastDeletedReport = now;
        long now2 = Time.now();
        Map<DatanodeStorage, BlockReport> blockReports = this.dn.getFSDataset().getBlockReports(getBlockPoolId());
        int i2 = 0;
        int i3 = 0;
        StorageBlockReport[] storageBlockReportArr = new StorageBlockReport[blockReports.size()];
        for (Map.Entry<DatanodeStorage, BlockReport> entry : blockReports.entrySet()) {
            BlockReport value = entry.getValue();
            int i4 = i2;
            i2++;
            storageBlockReportArr[i4] = new StorageBlockReport(entry.getKey(), value);
            i3 += value.getNumberOfBlocks();
        }
        ActiveNode nextNNForBlkReport = nextNNForBlkReport(i3);
        if (nextNNForBlkReport == null) {
            LOG.warn("Unable to send block report");
            return null;
        }
        this.blkReportHander = getAnActor(nextNNForBlkReport.getRpcServerAddressForDatanodes());
        if (this.blkReportHander == null || !this.blkReportHander.isInitialized()) {
            return null;
        }
        long now3 = Time.now();
        if (i3 < this.dnConf.blockReportSplitThreshold) {
            i = 1;
            DatanodeCommand blockReport = this.blkReportHander.blockReport(this.bpRegistration, getBlockPoolId(), storageBlockReportArr);
            if (blockReport != null) {
                arrayList.add(blockReport);
            }
        } else {
            i = i2;
            for (StorageBlockReport storageBlockReport : storageBlockReportArr) {
                DatanodeCommand blockReport2 = this.blkReportHander.blockReport(this.bpRegistration, getBlockPoolId(), new StorageBlockReport[]{storageBlockReport});
                if (blockReport2 != null) {
                    arrayList.add(blockReport2);
                }
            }
        }
        long now4 = Time.now() - now3;
        long j = now3 - now2;
        this.dn.getMetrics().addBlockReport(now4);
        LOG.info("Sent " + i + " blockreports " + i3 + " blocks total. Took " + j + " msec to generate and " + now4 + " msecs for RPC and NN processing.  Got back commands " + (arrayList.size() == 0 ? "none" : Joiner.on("; ").join(arrayList)));
        scheduleNextBlockReport(now);
        if (arrayList.size() == 0) {
            return null;
        }
        return arrayList;
    }

    private void scheduleNextBlockReport(long j) {
        if (!this.resetBlockReportTime) {
            this.lastBlockReport += ((Time.now() - this.lastBlockReport) / this.dnConf.blockReportInterval) * this.dnConf.blockReportInterval;
        } else {
            this.lastBlockReport = j - DFSUtil.getRandom().nextInt((int) this.dnConf.blockReportInterval);
            this.resetBlockReportTime = false;
        }
    }

    DatanodeCommand cacheReport(boolean z) throws IOException {
        if (this.dn.getFSDataset().getCacheCapacity() == 0) {
            return null;
        }
        DatanodeCommand datanodeCommand = null;
        long monotonicNow = Time.monotonicNow();
        if (monotonicNow - this.lastCacheReport > this.dnConf.cacheReportInterval) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending cacheReport from service actor: " + this);
            }
            this.lastCacheReport = monotonicNow;
            String blockPoolId = getBlockPoolId();
            List<Long> cacheReport = this.dn.getFSDataset().getCacheReport(blockPoolId);
            long monotonicNow2 = Time.monotonicNow();
            if (!z) {
                ActiveNode nextNNForCacheReport = nextNNForCacheReport(cacheReport.size());
                if (nextNNForCacheReport == null) {
                    LOG.warn("Unable to send cache report");
                    return null;
                }
                this.blkReportHander = getAnActor(nextNNForCacheReport.getRpcServerAddressForDatanodes());
                if (this.blkReportHander == null || !this.blkReportHander.isInitialized()) {
                    return null;
                }
            }
            datanodeCommand = this.blkReportHander.cacheReport(this.bpRegistration, blockPoolId, cacheReport);
            long j = monotonicNow2 - monotonicNow;
            long monotonicNow3 = Time.monotonicNow() - monotonicNow2;
            this.dn.getMetrics().addCacheReport(monotonicNow3);
            LOG.debug("CacheReport of " + cacheReport.size() + " block(s) took " + j + " msec to generate and " + monotonicNow3 + " msecs for RPC and NN processing");
        }
        return datanodeCommand;
    }

    void scheduleBlockReportInt(long j) {
        if (j > 0) {
            this.lastBlockReport = Time.now() - (this.dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int) j));
        } else {
            this.lastBlockReport = 0L;
        }
        this.resetBlockReportTime = true;
    }

    void triggerBlockReportForTestsInt() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastBlockReport = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastBlockReport == 0) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    void triggerDeletionReportForTestsInt() {
        synchronized (this.pendingIncrementalBRperStorage) {
            this.lastDeletedReport = 0L;
            this.pendingIncrementalBRperStorage.notifyAll();
            while (this.lastDeletedReport == 0) {
                try {
                    this.pendingIncrementalBRperStorage.wait(100L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    @VisibleForTesting
    boolean hasPendingIBR() {
        return this.sendImmediateIBR;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void updateNNList(SortedActiveNodeList sortedActiveNodeList) throws IOException {
        ArrayList<InetSocketAddress> arrayList = new ArrayList<>();
        Iterator it = sortedActiveNodeList.getActiveNodes().iterator();
        while (it.hasNext()) {
            arrayList.add(((ActiveNode) it.next()).getRpcServerAddressForDatanodes());
        }
        refreshNNList(arrayList);
        if (sortedActiveNodeList.getLeader() != null) {
            this.bpServiceToActive = getAnActor(sortedActiveNodeList.getLeader().getRpcServerAddressForDatanodes());
        }
        this.nnList.clear();
        this.nnList.addAll(sortedActiveNodeList.getActiveNodes());
        this.blackListNN.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean canUpdateNNList(InetSocketAddress inetSocketAddress) {
        if (this.nnList == null || this.nnList.size() == 0) {
            return true;
        }
        if (System.currentTimeMillis() - this.lastupdate <= HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL) {
            return false;
        }
        this.lastupdate = System.currentTimeMillis();
        return true;
    }

    public void startWhirlingSufiThread() {
        if (this.blockReportThread == null || !this.blockReportThread.isAlive()) {
            this.blockReportThread = new Thread(this, "BlkReportThread");
            this.blockReportThread.setDaemon(true);
            this.blockReportThread.start();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            whirlingLikeASufi();
        } catch (Exception e) {
            LOG.warn("Unexpected exception in BPOfferService " + this, e);
        }
    }

    private void reportBadBlocksWithRetry(final ExtendedBlock extendedBlock, final String str, final StorageType storageType) throws IOException {
        doActorActionWithRetry(new ActorActionHandler() { // from class: org.apache.hadoop.hdfs.server.datanode.BPOfferService.1
            @Override // org.apache.hadoop.hdfs.server.datanode.BPOfferService.ActorActionHandler
            public Object doAction(BPServiceActor bPServiceActor) throws IOException {
                bPServiceActor.reportBadBlocks(extendedBlock, str, storageType);
                return null;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void blockReceivedAndDeletedWithRetry(final StorageReceivedDeletedBlocks[] storageReceivedDeletedBlocksArr) throws IOException {
        String str = "";
        for (StorageReceivedDeletedBlocks storageReceivedDeletedBlocks : storageReceivedDeletedBlocksArr) {
            String str2 = str + "[";
            for (ReceivedDeletedBlockInfo receivedDeletedBlockInfo : storageReceivedDeletedBlocks.getBlocks()) {
                str2 = str2 + " " + receivedDeletedBlockInfo.getBlock().getBlockId() + receivedDeletedBlockInfo.toString();
            }
            str = str2 + "]";
        }
        NameNode.LOG.info("sending blockReceivedAndDeletedWithRetry for blocks " + str);
        doActorActionWithRetry(new ActorActionHandler() { // from class: org.apache.hadoop.hdfs.server.datanode.BPOfferService.2
            @Override // org.apache.hadoop.hdfs.server.datanode.BPOfferService.ActorActionHandler
            public Object doAction(BPServiceActor bPServiceActor) throws IOException {
                bPServiceActor.blockReceivedAndDeleted(BPOfferService.this.bpRegistration, BPOfferService.this.getBlockPoolId(), storageReceivedDeletedBlocksArr);
                return null;
            }
        });
    }

    private void reportRemoteBadBlockWithRetry(final DatanodeInfo datanodeInfo, final ExtendedBlock extendedBlock) throws IOException {
        doActorActionWithRetry(new ActorActionHandler() { // from class: org.apache.hadoop.hdfs.server.datanode.BPOfferService.3
            @Override // org.apache.hadoop.hdfs.server.datanode.BPOfferService.ActorActionHandler
            public Object doAction(BPServiceActor bPServiceActor) throws IOException {
                bPServiceActor.reportRemoteBadBlock(datanodeInfo, extendedBlock);
                return null;
            }
        });
    }

    public byte[] getSmallFileDataFromNN(final int i) throws IOException {
        return (byte[]) doActorActionWithRetry(new ActorActionHandler() { // from class: org.apache.hadoop.hdfs.server.datanode.BPOfferService.4
            @Override // org.apache.hadoop.hdfs.server.datanode.BPOfferService.ActorActionHandler
            public Object doAction(BPServiceActor bPServiceActor) throws IOException {
                return bPServiceActor.getSmallFileDataFromNN(i);
            }
        });
    }

    private Object doActorActionWithRetry(ActorActionHandler actorActionHandler) throws IOException {
        Exception exc = null;
        boolean z = false;
        BPServiceActor bPServiceActor = null;
        int size = this.nnList.size();
        for (int i = 0; i <= size; i++) {
            try {
                bPServiceActor = nextNNForNonBlkReportRPC();
            } catch (Exception e) {
                exc = e;
                if (!ExceptionCheck.isLocalConnectException(e) && !ExceptionCheck.isRetriableException(e)) {
                    break;
                }
                LOG.debug("RPC faild. NN used was " + bPServiceActor.getNNSocketAddress() + ", retries left (" + (size - i) + ")  Exception " + e);
                if (ExceptionCheck.isLocalConnectException(e)) {
                    this.blackListNN.add(bPServiceActor.getNNSocketAddress());
                }
            }
            if (bPServiceActor != null) {
                z = true;
                return actorActionHandler.doAction(bPServiceActor);
            }
            continue;
        }
        if (z || exc == null) {
            return null;
        }
        if (exc instanceof RemoteException) {
            throw ((RemoteException) exc);
        }
        throw ((IOException) exc);
    }

    private synchronized BPServiceActor nextNNForNonBlkReportRPC() {
        BPServiceActor anActor;
        if (this.nnList == null || this.nnList.isEmpty()) {
            return null;
        }
        for (int i = 0; i < 10; i++) {
            try {
                int i2 = this.rpcRoundRobinIndex + 1;
                this.rpcRoundRobinIndex = i2;
                this.rpcRoundRobinIndex = i2 % this.nnList.size();
                ActiveNode activeNode = this.nnList.get(this.rpcRoundRobinIndex);
                if (!this.blackListNN.contains(activeNode.getRpcServerAddressForDatanodes()) && (anActor = getAnActor(activeNode.getRpcServerAddressForDatanodes())) != null) {
                    return anActor;
                }
            } catch (Exception e) {
            }
        }
        return null;
    }

    private ActiveNode nextNNForBlkReport(long j) throws IOException {
        if (this.nnList == null || this.nnList.isEmpty()) {
            return null;
        }
        ActiveNode activeNode = null;
        BPServiceActor leaderActor = getLeaderActor();
        if (leaderActor != null) {
            try {
                activeNode = leaderActor.nextNNForBlkReport(j, this.bpRegistration);
            } catch (RemoteException e) {
                if (!e.getClassName().equals(BRLoadBalancingException.class.getName())) {
                    throw e;
                }
                LOG.warn(e);
            }
        }
        return activeNode;
    }

    private ActiveNode nextNNForCacheReport(long j) throws IOException {
        if (this.nnList == null || this.nnList.isEmpty()) {
            return null;
        }
        ActiveNode activeNode = null;
        BPServiceActor leaderActor = getLeaderActor();
        if (leaderActor != null) {
            try {
                activeNode = leaderActor.nextNNForCacheReport(j, this.bpRegistration);
            } catch (RemoteException e) {
                if (!e.getClassName().equals(CRLoadBalancingException.class.getName())) {
                    throw e;
                }
                LOG.warn(e);
            }
        }
        return activeNode;
    }

    private BPServiceActor getLeaderActor() {
        if (this.nnList.size() <= 0) {
            return null;
        }
        ActiveNode activeNode = null;
        for (ActiveNode activeNode2 : this.nnList) {
            if (activeNode == null) {
                activeNode = activeNode2;
            }
            if (activeNode.getId() > activeNode2.getId()) {
                activeNode = activeNode2;
            }
        }
        return getAnActor(activeNode.getRpcServerAddressForDatanodes());
    }

    private synchronized void forwardRRIndex() {
        if (this.nnList == null || this.nnList.isEmpty()) {
            this.refreshNNRoundRobinIndex = -1;
            return;
        }
        for (int i = 0; i < 10; i++) {
            int i2 = this.refreshNNRoundRobinIndex + 1;
            this.refreshNNRoundRobinIndex = i2;
            this.refreshNNRoundRobinIndex = i2 % this.nnList.size();
            if (!this.blackListNN.contains(this.nnList.get(this.refreshNNRoundRobinIndex).getRpcServerAddressForDatanodes())) {
                return;
            }
        }
    }

    static /* synthetic */ int access$610(BPOfferService bPOfferService) {
        int i = bPOfferService.incrementalBRCounter;
        bPOfferService.incrementalBRCounter = i - 1;
        return i;
    }

    static {
        $assertionsDisabled = !BPOfferService.class.desiredAssertionStatus();
        LOG = DataNode.LOG;
    }
}
