/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import io.hops.leader_election.node.ActiveNode;
import io.hops.leader_election.node.SortedActiveNodeList;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.HdfsVariables;
import io.hops.metadata.hdfs.dal.ActiveBlockReportsDataAccess;
import io.hops.metadata.hdfs.entity.ActiveBlockReport;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.LightWeightRequestHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.blockmanagement.BRLoadBalancingOverloadException;

public class BRTrackingService {
    public static final Log LOG = LogFactory.getLog(BRTrackingService.class);
    private final long DB_VAR_UPDATE_THRESHOLD;
    private final long BR_MAX_PROCESSING_TIME;
    private int rrIndex = 0;
    private long lastChecked = 0L;
    private long cachedMaxConcurrentBRPerNN = 0L;

    public BRTrackingService(long DB_VAR_UPDATE_THRESHOLD, long MAX_CONCURRENT_BRS, long BR_MAX_PROCESSING_TIME) {
        this.DB_VAR_UPDATE_THRESHOLD = DB_VAR_UPDATE_THRESHOLD;
        this.BR_MAX_PROCESSING_TIME = BR_MAX_PROCESSING_TIME;
        this.cachedMaxConcurrentBRPerNN = MAX_CONCURRENT_BRS;
    }

    private int getRRIndex(SortedActiveNodeList nnList) {
        if (this.rrIndex < 0 || this.rrIndex >= nnList.size()) {
            this.rrIndex = 0;
        }
        return this.rrIndex++ % nnList.size();
    }

    private boolean canProcessMoreBR(SortedActiveNodeList nnList, List<ActiveBlockReport> retActiveBRs) throws IOException {
        List<ActiveBlockReport> allActiveBRs = this.getAllActiveBlockReports();
        Iterator<ActiveBlockReport> itr = allActiveBRs.iterator();
        while (itr.hasNext()) {
            ActiveBlockReport abr = itr.next();
            if (System.currentTimeMillis() - abr.getStartTime() <= this.BR_MAX_PROCESSING_TIME) continue;
            LOG.warn((Object)("block report timed out dn: " + abr.getDnAddress() + " on NN: " + abr.getNnId()));
            this.removeActiveBlockReport(abr);
            itr.remove();
        }
        retActiveBRs.addAll(allActiveBRs);
        return (long)allActiveBRs.size() < this.getBrLbMaxConcurrentBRs(nnList);
    }

    private ActiveNode getLeastLoadedNode(SortedActiveNodeList nnList, List<ActiveBlockReport> retActiveBRs) throws IOException {
        class Tuple {
            ActiveNode an;
            Integer counter;

            Tuple(ActiveNode an, Integer count) {
                this.an = an;
                this.counter = count;
            }
        }
        HashMap<String, Tuple> usage = new HashMap<String, Tuple>();
        for (ActiveNode an : nnList.getActiveNodes()) {
            usage.put(an.getHttpAddress(), new Tuple(an, new Integer(0)));
        }
        for (ActiveBlockReport abr : retActiveBRs) {
            Tuple entry = (Tuple)usage.get(abr.getNnAddress());
            if (entry == null) continue;
            Tuple tuple = entry;
            Integer.valueOf(tuple.counter + 1);
            tuple.counter = tuple.counter;
        }
        ActiveNode leastLoaded = null;
        int count = Integer.MAX_VALUE;
        for (String key : usage.keySet()) {
            Tuple tuple = (Tuple)usage.get(key);
            if (tuple.counter >= count) continue;
            leastLoaded = tuple.an;
            count = tuple.counter;
        }
        if ((long)count >= this.cachedMaxConcurrentBRPerNN) {
            return null;
        }
        return leastLoaded;
    }

    private long getBrLbMaxConcurrentBRs(SortedActiveNodeList nnList) throws IOException {
        if (System.currentTimeMillis() - this.lastChecked > this.DB_VAR_UPDATE_THRESHOLD) {
            long value = HdfsVariables.getMaxConcurrentBrs();
            if (value != this.cachedMaxConcurrentBRPerNN) {
                this.cachedMaxConcurrentBRPerNN = value;
                LOG.info((Object)("BRTrackingService param update. Processing " + this.cachedMaxConcurrentBRPerNN + " concurrent block reports"));
            }
            this.lastChecked = System.currentTimeMillis();
        }
        return this.cachedMaxConcurrentBRPerNN * (long)nnList.size();
    }

    public synchronized ActiveNode assignWork(final SortedActiveNodeList nnList, final String dnAddress, final long noOfBlks) throws IOException {
        return (ActiveNode)new LightWeightRequestHandler(HDFSOperationType.BR_LB_GET_ALL){

            public Object performTask() throws IOException {
                boolean isActive = connector.isTransactionActive();
                if (!isActive) {
                    connector.beginTransaction();
                    connector.writeLock();
                }
                try {
                    ActiveNode an;
                    ArrayList retActiveBRs = new ArrayList();
                    if (BRTrackingService.this.canProcessMoreBR(nnList, retActiveBRs) && (an = BRTrackingService.this.getLeastLoadedNode(nnList, retActiveBRs)) != null) {
                        ActiveBlockReport abr = new ActiveBlockReport(dnAddress, an.getId(), an.getHttpAddress(), System.currentTimeMillis(), noOfBlks);
                        BRTrackingService.this.addActiveBlockReport(abr);
                        LOG.info((Object)("Block report from " + dnAddress + " containing " + noOfBlks + " blocks is assigned to NN [ID: " + an.getId() + ", IP: " + an.getRpcServerIpAddress() + "]"));
                        ActiveNode activeNode = an;
                        return activeNode;
                    }
                    String msg = "Work (" + noOfBlks + " blks) could not be assigned. System is fully loaded now. At most " + BRTrackingService.this.getBrLbMaxConcurrentBRs(nnList) + " concurrent block reports can be processed.";
                    LOG.info((Object)msg);
                    throw new BRLoadBalancingOverloadException(msg);
                }
                finally {
                    if (!isActive) {
                        connector.commit();
                    }
                }
            }
        }.handle();
    }

    public synchronized void blockReportCompleted(String dnAddress) throws IOException {
        ActiveBlockReport abr = new ActiveBlockReport(dnAddress, 0L, "", 0L, 0L);
        LOG.info((Object)("Block report from " + dnAddress + " has completed"));
        this.removeActiveBlockReport(abr);
    }

    private List<ActiveBlockReport> getAllActiveBlockReports() throws IOException {
        LightWeightRequestHandler handler = new LightWeightRequestHandler(HDFSOperationType.BR_LB_GET_ALL){

            public Object performTask() throws IOException {
                ActiveBlockReportsDataAccess da = (ActiveBlockReportsDataAccess)HdfsStorageFactory.getDataAccess(ActiveBlockReportsDataAccess.class);
                return da.getAll();
            }
        };
        return (List)handler.handle();
    }

    private void addActiveBlockReport(final ActiveBlockReport abr) throws IOException {
        LightWeightRequestHandler handler = new LightWeightRequestHandler(HDFSOperationType.BR_LB_ADD){

            public Object performTask() throws IOException {
                boolean isActive = connector.isTransactionActive();
                if (!isActive) {
                    connector.beginTransaction();
                    connector.writeLock();
                }
                ActiveBlockReportsDataAccess da = (ActiveBlockReportsDataAccess)HdfsStorageFactory.getDataAccess(ActiveBlockReportsDataAccess.class);
                da.addActiveReport(abr);
                if (!isActive) {
                    connector.commit();
                }
                return null;
            }
        };
        handler.handle();
    }

    private void removeActiveBlockReport(final ActiveBlockReport abr) throws IOException {
        LightWeightRequestHandler handler = new LightWeightRequestHandler(HDFSOperationType.BR_LB_REMOVE){

            public Object performTask() throws IOException {
                ActiveBlockReportsDataAccess da;
                ActiveBlockReport inDB;
                boolean isActive = connector.isTransactionActive();
                if (!isActive) {
                    connector.beginTransaction();
                    connector.writeLock();
                }
                if ((inDB = (da = (ActiveBlockReportsDataAccess)HdfsStorageFactory.getDataAccess(ActiveBlockReportsDataAccess.class)).getActiveBlockReport(abr)) != null) {
                    da.removeActiveReport(inDB);
                }
                if (!isActive) {
                    connector.commit();
                }
                return null;
            }
        };
        handler.handle();
    }
}

