/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import com.google.common.base.Preconditions;
import io.hops.exception.StorageException;
import io.hops.exception.TransactionContextException;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.HdfsVariables;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.hdfs.dal.CacheDirectiveDataAccess;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.lock.INodeLock;
import io.hops.transaction.lock.Lock;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate(value={"HDFS"})
public class CacheReplicationMonitor
extends Thread
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(CacheReplicationMonitor.class);
    /** Namesystem consulted for leader status, inode lookup and namenode identity. */
    private final FSNamesystem namesystem;
    /** Block manager obtained from {@link #namesystem} in the constructor. */
    private final BlockManager blockManager;
    /** Cache manager used to look up cache pools and existing cached-block records. */
    private final CacheManager cacheManager;
    /** Shared random source used when picking datanodes to cache on or uncache from. */
    private static final Random random = new Random();
    /** Time in milliseconds between periodic rescans, as passed to the constructor. */
    private final long intervalMs;
    /** Lock supplied by the creator; both conditions below are created from it. */
    private final ReentrantLock lock;
    /** Signalled to wake the monitor thread early (rescan request or shutdown). */
    private final Condition doRescan;
    /** Signalled after each completed rescan and on shutdown to release waiters. */
    private final Condition scanFinished;
    /** Set to true by {@link #close()}; read while holding {@link #lock}. */
    private boolean shutdown = false;
    /** Flipped before every rescan; cached blocks whose mark differs were not touched by the current scan. */
    private boolean mark = false;
    /** Number of directives visited by the most recent rescan (reset at the start of each rescan). */
    private int scannedDirectives;
    /** Number of cached blocks visited by the most recent rescan (reset at the start of each rescan). */
    private long scannedBlocks;

    public CacheReplicationMonitor(FSNamesystem namesystem, CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
        this.namesystem = namesystem;
        this.blockManager = namesystem.getBlockManager();
        this.cacheManager = cacheManager;
        this.intervalMs = intervalMs;
        this.lock = lock;
        this.doRescan = this.lock.newCondition();
        this.scanFinished = this.lock.newCondition();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        long startTimeMs = 0L;
        Thread.currentThread().setName("CacheReplicationMonitor(" + System.identityHashCode(this) + ")");
        LOG.info("Starting CacheReplicationMonitor with interval " + this.intervalMs + " milliseconds");
        try {
            long curTimeMs = Time.monotonicNow();
            while (true) {
                block19: {
                    this.lock.lock();
                    try {
                        while (true) {
                            long delta;
                            if (this.namesystem.isLeader()) {
                                if (this.shutdown) {
                                    LOG.debug("Shutting down CacheReplicationMonitor");
                                    return;
                                }
                                if (HdfsVariables.getNeedRescan()) {
                                    LOG.debug("Rescanning because of pending operations");
                                    break block19;
                                }
                            }
                            if ((delta = startTimeMs + this.intervalMs - curTimeMs) <= 0L) {
                                if (this.namesystem.isLeader()) {
                                    LOG.debug("Rescanning after " + (curTimeMs - startTimeMs) + " milliseconds");
                                    break block19;
                                }
                                startTimeMs = curTimeMs;
                                delta = startTimeMs + this.intervalMs - curTimeMs;
                            }
                            this.doRescan.await(delta, TimeUnit.MILLISECONDS);
                            curTimeMs = Time.monotonicNow();
                        }
                    }
                    catch (StorageException ignore) {}
                    continue;
                    finally {
                        this.lock.unlock();
                        continue;
                    }
                }
                startTimeMs = curTimeMs;
                this.mark = !this.mark;
                this.rescan();
                curTimeMs = Time.monotonicNow();
                this.lock.lock();
                try {
                    HdfsVariables.setCompletedAndCurScanCount();
                    this.scanFinished.signalAll();
                }
                finally {
                    this.lock.unlock();
                }
                LOG.debug("Scanned " + this.scannedDirectives + " directive(s) and " + this.scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " millisecond(s).");
            }
        }
        catch (InterruptedException e) {
            LOG.info("Shutting down CacheReplicationMonitor.");
            return;
        }
        catch (Throwable t) {
            LOG.error("Thread exiting", t);
            ExitUtil.terminate(1, t);
            return;
        }
    }

    /**
     * Blocks the caller until a pending rescan has completed.
     *
     * <p>Returns immediately when no rescan is needed. Otherwise, if the current scan
     * count is negative, the monitor thread is woken through {@link #doRescan}; the
     * caller then waits on {@link #scanFinished} until the need-rescan flag is cleared
     * or the monitor is shut down. If the waiting thread is interrupted, the wait is
     * abandoned and the thread's interrupt status is restored so callers can observe it.
     *
     * @throws IllegalArgumentException if the calling thread does not hold the CRM lock
     * @throws RuntimeException         if a rescan is needed but this node is not the leader
     * @throws StorageException         declared for the storage-backed variable and leader lookups
     * @throws TransactionContextException declared for the storage-backed variable and leader lookups
     * @throws IOException              declared for the storage-backed variable and leader lookups
     */
    public void waitForRescanIfNeeded() throws StorageException, TransactionContextException, IOException {
        Preconditions.checkArgument(this.lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan.");
        if (!HdfsVariables.getNeedRescan()) {
            return;
        }
        if (!this.namesystem.isLeader()) {
            throw new RuntimeException("Asked non leading node to rescan cache");
        }
        if (HdfsVariables.getCurScanCount() < 0) {
            this.doRescan.signal();
        }
        while (!this.shutdown && HdfsVariables.getNeedRescan()) {
            try {
                this.scanFinished.await();
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for CacheReplicationMonitor rescan", e);
                // Do not swallow the interruption: re-assert it for code further up the stack.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Flags that a rescan is required by setting the need-rescan variable.
     *
     * @throws IllegalArgumentException if the calling thread does not hold the CRM lock
     */
    public void setNeedsRescan() throws StorageException, TransactionContextException, IOException {
        final boolean heldByCaller = this.lock.isHeldByCurrentThread();
        Preconditions.checkArgument(heldByCaller, "Must hold the CRM lock when setting the needsRescan bit.");
        HdfsVariables.setNeedRescan();
    }

    /**
     * Requests shutdown of the monitor. The first call sets {@link #shutdown} and wakes
     * every thread waiting on either condition; later calls do nothing.
     */
    @Override
    public void close() throws IOException {
        this.lock.lock();
        try {
            if (!this.shutdown) {
                this.shutdown = true;
                // Wake the monitor thread and anyone blocked waiting for a scan to finish.
                this.doRescan.signalAll();
                this.scanFinished.signalAll();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Performs one full rescan: resets the scan counters, records the current scan
     * count, resets all cache pool statistics inside a transaction, rescans every
     * cache directive, rescans the cached-block map, and finally resets the datanode
     * manager's last-caching-directive-sent time.
     *
     * @throws InterruptedException if the monitor was already shut down
     */
    private void rescan() throws InterruptedException, StorageException, TransactionContextException, IOException {
        this.scannedDirectives = 0;
        this.scannedBlocks = 0L;
        this.lock.lock();
        try {
            if (this.shutdown) {
                throw new InterruptedException("CacheReplicationMonitor was shut down.");
            }
            HdfsVariables.setCurScanCount();
        }
        finally {
            this.lock.unlock();
        }
        // Pool statistics are reset under a write lock on the cache pools.
        HopsTransactionalRequestHandler poolStatsReset = new HopsTransactionalRequestHandler(HDFSOperationType.LIST_CACHE_DIRECTIVE){

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory factory = LockFactory.getInstance();
                locks.add(factory.getCachePoolLock(TransactionLockTypes.LockType.WRITE));
            }

            public Object performTask() throws IOException {
                CacheReplicationMonitor.this.resetStatistics();
                return null;
            }
        };
        poolStatsReset.handle();
        this.rescanCacheDirectives();
        this.rescanCachedBlockMap();
        this.blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
    }

    /**
     * Resets the statistics of every cache pool known to the cache manager.
     */
    private void resetStatistics() throws TransactionContextException, StorageException {
        for (final CachePool cachePool : this.cacheManager.getCachePools()) {
            cachePool.resetStatistics();
        }
    }

    /**
     * Visits every cache directive, each in its own transaction, and rescans the files
     * it covers. A directive pointing at a directory covers the files that are direct
     * children of that directory; a directive pointing at a file covers that file.
     * Directives that no longer exist, have expired, or resolve to nothing, a symlink
     * or another inode kind are skipped.
     */
    private void rescanCacheDirectives() throws StorageException, TransactionContextException, IOException {
        final long now = new Date().getTime();
        Collection<CacheDirective> allDirectives = this.getCacheDirectives();
        for (final CacheDirective tmpDirective : allDirectives) {
            ++this.scannedDirectives;
            HopsTransactionalRequestHandler directiveScan = new HopsTransactionalRequestHandler(HDFSOperationType.RESCAN_CACHE_DIRECTIVE){

                public void acquireLock(TransactionLocks locks) throws IOException {
                    LockFactory factory = LockFactory.getInstance();
                    locks.add(factory.getCacheDirectiveLock(tmpDirective.getId())).add(factory.getCachePoolLock(tmpDirective.getPoolName()));
                    INodeLock inodeLock = factory.getINodeLock(TransactionLockTypes.INodeLockType.READ, TransactionLockTypes.INodeResolveType.PATH_AND_IMMEDIATE_CHILDREN, tmpDirective.getPath()).setNameNodeID(CacheReplicationMonitor.this.namesystem.getNamenodeId()).setActiveNameNodes(CacheReplicationMonitor.this.namesystem.getNameNode().getActiveNameNodes().getActiveNodes());
                    locks.add((Lock)inodeLock).add(factory.getBlockLock()).add(factory.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.UC, LockFactory.BLK.CA));
                }

                public Object performTask() throws IOException {
                    // Re-read the directive inside the transaction; it may have been removed meanwhile.
                    CacheDirective directive = (CacheDirective)EntityManager.find((FinderType)CacheDirective.Finder.ById, (Object[])new Object[]{tmpDirective.getId()});
                    if (directive == null) {
                        return null;
                    }
                    directive.resetStatistics();
                    if (directive.getExpiryTime() > 0L && directive.getExpiryTime() <= now) {
                        LOG.debug("Directive {}: the directive expired at {} (now = {})", new Object[]{directive.getId(), directive.getExpiryTime(), now});
                        return null;
                    }
                    String path = directive.getPath();
                    INode node = CacheReplicationMonitor.this.namesystem.getINode(directive.getPath());
                    if (node == null) {
                        LOG.debug("Directive {}: No inode found at {}", directive.getId(), path);
                        return null;
                    }
                    if (node.isSymlink()) {
                        LOG.debug("Directive {}: got UnresolvedLinkException while resolving path {}", directive.getId(), path);
                        return null;
                    }
                    if (node.isDirectory()) {
                        // Only files directly inside the directory are considered.
                        INodeDirectory directory = node.asDirectory();
                        for (INode child : directory.getChildrenList()) {
                            if (child.isFile()) {
                                CacheReplicationMonitor.this.rescanFile(directive, child.asFile());
                            }
                        }
                    } else if (node.isFile()) {
                        CacheReplicationMonitor.this.rescanFile(directive, node.asFile());
                    } else {
                        LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ", directive.getId(), node);
                    }
                    return null;
                }
            };
            directiveScan.handle();
        }
    }

    /**
     * Loads all cache directives through the {@link CacheDirectiveDataAccess} layer.
     *
     * @return every directive returned by {@code findAll()}
     */
    @SuppressWarnings("unchecked")
    private Collection<CacheDirective> getCacheDirectives() throws IOException {
        Object allDirectives = new LightWeightRequestHandler(HDFSOperationType.GET_INODE){

            public Object performTask() throws IOException {
                CacheDirectiveDataAccess dataAccess = (CacheDirectiveDataAccess)HdfsStorageFactory.getDataAccess(CacheDirectiveDataAccess.class);
                return dataAccess.findAll();
            }
        }.handle();
        return (Collection<CacheDirective>)allDirectives;
    }

    /**
     * Accounts one file against a directive and makes sure each of its COMPLETE blocks
     * has a cached-block record carrying the current mark and at least the directive's
     * replication.
     *
     * <p>The directive's files/bytes-needed statistics are always updated. If the pool's
     * bytes needed exceed the pool limit, the file's blocks are not scanned.
     *
     * @param directive directive that covers the file
     * @param file      file whose blocks are examined
     */
    private void rescanFile(CacheDirective directive, INodeFile file) throws StorageException, TransactionContextException {
        BlockInfoContiguous[] fileBlocks = file.getBlocks();
        directive.addFilesNeeded(1L);
        long bytesWanted = file.computeFileSizeNotIncludingLastUcBlock() * (long)directive.getReplication();
        directive.addBytesNeeded(bytesWanted);
        CachePool pool = directive.getPool();
        if (pool.getBytesNeeded() > pool.getLimit()) {
            LOG.debug("Directive {}: not scanning file {} because bytesNeeded for pool {} is {}, but the pool's limit is {}", new Object[]{directive.getId(), file.getFullPathName(), pool.getPoolName(), pool.getBytesNeeded(), pool.getLimit()});
            return;
        }
        long bytesCached = 0L;
        for (BlockInfoContiguous blockInfo : fileBlocks) {
            if (!blockInfo.getBlockUCState().equals((Object)HdfsServerConstants.BlockUCState.COMPLETE)) {
                LOG.trace("Directive {}: can't cache block {} because it is in state {}, not COMPLETE.", new Object[]{directive.getId(), blockInfo, blockInfo.getBlockUCState()});
                continue;
            }
            Block plainBlock = new Block(blockInfo.getBlockId());
            CachedBlock fresh = new CachedBlock(plainBlock.getBlockId(), blockInfo.getInodeId(), directive.getReplication(), this.mark);
            CachedBlock tracked = this.cacheManager.getCachedBlock(fresh);
            if (tracked != null) {
                // Count what is already cached, capped at the replication this directive asks for.
                List<DatanodeDescriptor> holders = tracked.getDatanodes(CachedBlock.Type.CACHED);
                long replicasCounted = (long)Math.min(holders.size(), directive.getReplication());
                bytesCached += replicasCounted * blockInfo.getNumBytes();
                boolean staleMark = this.mark != tracked.getMark();
                if (staleMark || tracked.getReplication() < directive.getReplication()) {
                    tracked.setReplicationAndMark(directive.getReplication(), this.mark);
                    tracked.save();
                }
            } else {
                // First time this block is seen: persist the new record as-is.
                tracked = fresh;
                tracked.save();
            }
            LOG.trace("Directive {}: setting replication for block {} to {}", new Object[]{directive.getId(), blockInfo, tracked.getReplication()});
        }
        directive.addBytesCached(bytesCached);
        if (bytesCached == bytesWanted) {
            directive.addFilesCached(1L);
        }
        LOG.debug("Directive {}: caching {}: {}/{} bytes", new Object[]{directive.getId(), file.getFullPathName(), bytesCached, bytesWanted});
    }

    /**
     * Explains why a cached block should not be cached, if there is such a reason.
     *
     * <p>When the block's mark differs from the monitor's current mark, its replication
     * is set to zero, the current mark is applied, and the record is saved.
     *
     * @param cblock    cached-block record under inspection
     * @param blockInfo stored block for the record, or {@code null} if none was found
     * @return a human-readable reason, or {@code null} when the block should be cached
     */
    private String findReasonForNotCaching(CachedBlock cblock, BlockInfoContiguous blockInfo) throws TransactionContextException, StorageException {
        String reason = null;
        if (blockInfo == null) {
            reason = "not tracked by the BlockManager";
        } else if (!blockInfo.isComplete()) {
            reason = "not complete";
        } else if (cblock.getReplication() == 0) {
            reason = "not needed by any directives";
        } else if (cblock.getMark() != this.mark) {
            // Not touched by the current scan: drop its replication and stamp the new mark.
            cblock.setReplicationAndMark((short)0, this.mark);
            cblock.save();
            reason = "no longer needed by any directives";
        }
        return reason;
    }

    /**
     * Walks every cached-block record in one transaction and reconciles its pending
     * lists with the number of cached replicas it needs:
     * <ul>
     *   <li>enough replicas cached: all PENDING_CACHED entries are removed;</li>
     *   <li>too few replicas cached: all PENDING_UNCACHED entries are switched back to cached;</li>
     *   <li>surplus replicas are scheduled for uncaching, otherwise missing replicas
     *       are scheduled for caching;</li>
     *   <li>a record that needs no replicas and has no pending work is removed.</li>
     * </ul>
     */
    private void rescanCachedBlockMap() throws StorageException, TransactionContextException, IOException {
        HopsTransactionalRequestHandler blockMapScan = new HopsTransactionalRequestHandler(HDFSOperationType.RESCAN_BLOCK_MAP){

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory factory = LockFactory.getInstance();
                locks.add(factory.getAllCachedBlockLocks());
                locks.add(factory.getBlockLock());
                locks.add(factory.getBlockRelated(LockFactory.BLK.RE, LockFactory.BLK.CR));
            }

            public Object performTask() throws IOException {
                Collection<CachedBlock> allCachedBlocks = CachedBlock.getAll(CacheReplicationMonitor.this.blockManager.getDatanodeManager());
                for (CachedBlock cblock : allCachedBlocks) {
                    CacheReplicationMonitor.this.scannedBlocks++;
                    List<DatanodeDescriptor> pendingCached = cblock.getDatanodes(CachedBlock.Type.PENDING_CACHED);
                    List<DatanodeDescriptor> cached = cblock.getDatanodes(CachedBlock.Type.CACHED);
                    List<DatanodeDescriptor> pendingUncached = cblock.getDatanodes(CachedBlock.Type.PENDING_UNCACHED);
                    // The stored block is only looked up for records with a positive inode id.
                    BlockInfoContiguous blockInfo = cblock.getInodeId() > 0L
                            ? CacheReplicationMonitor.this.blockManager.getStoredBlock(new Block(cblock.getBlockId()))
                            : null;
                    String reason = CacheReplicationMonitor.this.findReasonForNotCaching(cblock, blockInfo);
                    short neededCached;
                    if (reason == null) {
                        neededCached = cblock.getReplication();
                    } else {
                        LOG.trace("Block {}: can't cache block because it is {}", cblock.getBlockId(), reason);
                        neededCached = 0;
                    }
                    int numCached = cached.size();
                    if (numCached >= neededCached) {
                        // Already satisfied: cancel every outstanding cache request.
                        for (Iterator<DatanodeDescriptor> it = pendingCached.iterator(); it.hasNext(); ) {
                            DatanodeDescriptor datanode = it.next();
                            cblock.removePending(datanode);
                            it.remove();
                            LOG.trace("Block {}: removing from PENDING_CACHED for node {}because we already have {} cached replicas and we only need {}", new Object[]{cblock.getBlockId(), datanode.getDatanodeUuid(), numCached, (int)neededCached});
                        }
                    } else {
                        // Short of replicas: cancel every outstanding uncache request.
                        for (Iterator<DatanodeDescriptor> it = pendingUncached.iterator(); it.hasNext(); ) {
                            DatanodeDescriptor datanode = it.next();
                            cblock.switchPendingUncachedToCached(datanode);
                            it.remove();
                            LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} because we only have {} cached replicas and we need {}", new Object[]{cblock.getBlockId(), datanode.getDatanodeUuid(), numCached, (int)neededCached});
                        }
                    }
                    int neededUncached = numCached - (pendingUncached.size() + neededCached);
                    if (neededUncached > 0) {
                        CacheReplicationMonitor.this.addNewPendingUncached(neededUncached, cblock, cached, pendingUncached);
                    } else {
                        int additionalCachedNeeded = neededCached - (numCached + pendingCached.size());
                        if (additionalCachedNeeded > 0) {
                            CacheReplicationMonitor.this.addNewPendingCached(additionalCachedNeeded, cblock, cached, pendingCached);
                        }
                    }
                    boolean nothingOutstanding = neededCached == 0 && pendingUncached.isEmpty() && pendingCached.isEmpty();
                    if (nothingOutstanding) {
                        LOG.trace("Block {}: removing from cachedBlocks, since neededCached == 0, and pendingUncached and pendingCached are empty.", cblock.getBlockId());
                        cblock.remove();
                    }
                }
                return null;
            }
        };
        blockMapScan.handle();
    }

    /**
     * Schedules randomly chosen cached replicas of a block for uncaching.
     *
     * @param neededUncached  number of replicas to schedule for uncaching
     * @param cachedBlock     cached-block record being adjusted
     * @param cached          datanodes currently caching the block
     * @param pendingUncached datanodes already scheduled to uncache it; chosen nodes are appended
     */
    private void addNewPendingUncached(int neededUncached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingUncached) throws TransactionContextException, StorageException {
        // Candidates are the caching nodes not already scheduled for uncaching.
        List<DatanodeDescriptor> candidates = new LinkedList<DatanodeDescriptor>();
        for (DatanodeDescriptor holder : cached) {
            if (!pendingUncached.contains(holder)) {
                candidates.add(holder);
            }
        }
        for (int remaining = neededUncached; remaining > 0; --remaining) {
            if (candidates.isEmpty()) {
                LOG.warn("Logic error: we're trying to uncache more replicas than actually exist for " + cachedBlock);
                return;
            }
            int pick = random.nextInt(candidates.size());
            DatanodeDescriptor victim = candidates.remove(pick);
            pendingUncached.add(victim);
            boolean added = cachedBlock.setPendingUncached(victim);
            assert (added);
        }
    }

    /**
     * Schedules additional datanodes to cache a block.
     *
     * <p>Candidates are datanodes that store a replica of the block and are not
     * decommissioned or decommissioning, not holding a corrupt replica, and not already
     * caching or scheduled to cache the block. A candidate must also have enough
     * effective cache capacity: its remaining cache, minus bytes pending caching, plus
     * bytes pending uncaching.
     *
     * @param neededCached  number of additional cached replicas wanted
     * @param cachedBlock   cached-block record being adjusted
     * @param cached        datanodes currently caching the block
     * @param pendingCached datanodes already scheduled to cache it; chosen nodes are appended
     */
    private void addNewPendingCached(int neededCached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingCached) throws StorageException, TransactionContextException {
        BlockInfoContiguous blockInfo = this.blockManager.getStoredBlock(new Block(cachedBlock.getBlockId()));
        if (blockInfo == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Block {}: can't add new cached replicas, because there is no record of this block on the NameNode.", cachedBlock.getBlockId());
            }
            return;
        }
        if (!blockInfo.isComplete()) {
            LOG.debug("Block {}: can't cache this block, because it is not yet complete.", cachedBlock.getBlockId());
            return;
        }
        List<DatanodeDescriptor> candidates = new LinkedList<DatanodeDescriptor>();
        DatanodeStorageInfo[] storages = blockInfo.getStorages(this.blockManager.getDatanodeManager());
        final int replicaCount = storages.length;
        Collection<DatanodeDescriptor> corrupt = this.blockManager.getCorruptReplicas(blockInfo);
        int outOfCapacity = 0;
        for (int idx = 0; idx < replicaCount; ++idx) {
            DatanodeDescriptor datanode = storages[idx].getDatanodeDescriptor();
            if (datanode == null) {
                continue;
            }
            if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
                continue;
            }
            if (corrupt != null && corrupt.contains(datanode)) {
                continue;
            }
            if (pendingCached.contains(datanode) || cached.contains(datanode)) {
                continue;
            }
            // Bytes this node has been asked to cache reduce its effective capacity ...
            long bytesPendingCache = 0L;
            for (CachedBlock pending : datanode.getPendingCached(this.blockManager.getDatanodeManager())) {
                BlockInfoContiguous pendingInfo = this.blockManager.getStoredBlock(new Block(pending.getBlockId()));
                if (pendingInfo != null) {
                    bytesPendingCache += pendingInfo.getNumBytes();
                }
            }
            // ... while bytes it has been asked to uncache increase it.
            long bytesPendingUncache = 0L;
            for (CachedBlock pending : datanode.getPendingUncached(this.blockManager.getDatanodeManager())) {
                BlockInfoContiguous pendingInfo = this.blockManager.getStoredBlock(new Block(pending.getBlockId()));
                if (pendingInfo != null) {
                    bytesPendingUncache += pendingInfo.getNumBytes();
                }
            }
            long pendingBytes = bytesPendingUncache - bytesPendingCache;
            long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
            if (pendingCapacity < blockInfo.getNumBytes()) {
                LOG.trace("Block {}: DataNode {} is not a valid possibility because the block has size {}, but the DataNode only has {}bytes of cache remaining ({} pending bytes, {} already cached.", new Object[]{blockInfo.getBlockId(), datanode.getDatanodeUuid(), blockInfo.getNumBytes(), pendingCapacity, pendingBytes, datanode.getCacheRemaining()});
                ++outOfCapacity;
                continue;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Datanode " + datanode.getDatanodeUuid() + " is a valid possibility for block " + blockInfo.getBlockId() + " of size " + blockInfo.getNumBytes() + " bytes, has " + datanode.getCacheRemaining() + " bytes of cache remaining.");
            }
            candidates.add(datanode);
        }
        List<DatanodeDescriptor> chosen = CacheReplicationMonitor.chooseDatanodesForCaching(candidates, neededCached, this.blockManager.getDatanodeManager().getStaleInterval(), blockInfo.getBlockId());
        for (DatanodeDescriptor target : chosen) {
            LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}", blockInfo.getBlockId(), target.getDatanodeUuid());
            pendingCached.add(target);
            boolean added = cachedBlock.addPendingCached(target);
            assert (added);
        }
        if (neededCached > chosen.size()) {
            LOG.debug("Block {}: we only have {} of {} cached replicas. {} DataNodes have insufficient cache capacity.", new Object[]{blockInfo.getBlockId(), cachedBlock.getReplication() - neededCached + chosen.size(), cachedBlock.getReplication(), outOfCapacity});
        }
    }

    /**
     * Picks up to {@code neededCached} datanodes from the candidates, preferring
     * non-stale nodes. Stale nodes are only drawn from once every non-stale candidate
     * has been used. Each pick is a weighted random draw made by
     * {@link #chooseRandomDatanodeByRemainingCapacity(List)}.
     *
     * @param possibilities candidate datanodes; this list is not modified
     * @param neededCached  maximum number of datanodes to choose
     * @param staleInterval interval passed to {@code DatanodeDescriptor.isStale}
     * @param blockId       id of the block being cached, used for trace logging only
     * @return the chosen datanodes, possibly fewer than {@code neededCached}
     */
    private static List<DatanodeDescriptor> chooseDatanodesForCaching(List<DatanodeDescriptor> possibilities, int neededCached, long staleInterval, long blockId) {
        // Work on a copy so the caller's list is left untouched.
        List<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>(possibilities);
        List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
        List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
        // Move stale nodes aside; they are a fallback only.
        Iterator<DatanodeDescriptor> it = targets.iterator();
        while (it.hasNext()) {
            DatanodeDescriptor d = it.next();
            if (!d.isStale(staleInterval)) {
                continue;
            }
            it.remove();
            stale.add(d);
        }
        while (chosen.size() < neededCached) {
            if (targets.isEmpty()) {
                if (stale.isEmpty()) {
                    break;
                }
                // Out of fresh nodes: continue drawing from the stale ones.
                targets = stale;
            }
            DatanodeDescriptor target = CacheReplicationMonitor.chooseRandomDatanodeByRemainingCapacity(targets);
            if (LOG.isTraceEnabled()) {
                // neededCached is a replica count, not a byte size, so report it as such.
                LOG.trace("Datanode " + target.getDatanodeUuid() + " is chosen for block " + blockId + " needing " + neededCached + " additional cached replica(s), has " + target.getCacheRemaining() + " bytes of cache remaining.");
            }
            chosen.add(target);
            targets.remove(target);
        }
        return chosen;
    }

    /**
     * Draws one datanode at random, weighted by its share of the total remaining cache
     * percentage across all targets. Every target gets a weight of at least 1, so each
     * one can be drawn.
     *
     * @param targets datanodes to draw from
     * @return the drawn datanode
     */
    private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(List<DatanodeDescriptor> targets) {
        float totalPercent = 0.0f;
        for (DatanodeDescriptor candidate : targets) {
            totalPercent += candidate.getCacheRemainingPercent();
        }
        // Each node owns the key range ending at its cumulative weight.
        TreeMap<Integer, DatanodeDescriptor> tickets = new TreeMap<Integer, DatanodeDescriptor>();
        int upperBound = 0;
        for (DatanodeDescriptor candidate : targets) {
            float share = candidate.getCacheRemainingPercent() / totalPercent;
            int weight = Math.max(1, (int)(share * 1000000.0f));
            upperBound += weight;
            tickets.put(upperBound, candidate);
        }
        // A draw in [0, upperBound) maps to the first cumulative weight strictly above it.
        int draw = random.nextInt(upperBound);
        return (DatanodeDescriptor)tickets.higherEntry(draw).getValue();
    }
}

