/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import io.hops.common.IDsGeneratorFactory;
import io.hops.exception.StorageException;
import io.hops.exception.TransactionContextException;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.hdfs.dal.QuotaUpdateDataAccess;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.metadata.hdfs.entity.QuotaUpdate;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.SubtreeLockHelper;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
import org.apache.hadoop.hdfs.server.namenode.QuotaUpdateException;
import org.apache.hadoop.util.Daemon;

public class QuotaUpdateManager {
    /** Logger used by this manager, its request handlers and the monitor thread. */
    static final Log LOG = LogFactory.getLog(QuotaUpdateManager.class);

    /** Namesystem consulted for running/leader state and for the local namenode id. */
    private final FSNamesystem namesystem;

    /** Value of {@code dfs.namenode.quota.update.interval}; the monitor passes it to {@code Thread.sleep}. */
    private final int updateInterval;

    /** Value of {@code dfs.namenode.quota.update.limit}; upper bound handed to {@code findLimited}. */
    private final int updateLimit;

    /** Background thread that runs the {@link QuotaUpdateMonitor} loop. */
    private final Daemon updateThread = new Daemon(new QuotaUpdateMonitor());

    /** Iterators of inode ids whose pending updates are processed ahead of the regular batch. */
    private final ConcurrentLinkedQueue<Iterator<Long>> prioritizedUpdates =
            new ConcurrentLinkedQueue<Iterator<Long>>();

    /** Orders quota updates by ascending inode id so updates of one inode become adjacent. */
    private final Comparator<QuotaUpdate> quotaUpdateComparator = new Comparator<QuotaUpdate>() {
        @Override
        public int compare(QuotaUpdate left, QuotaUpdate right) {
            return Long.compare(left.getInodeId(), right.getInodeId());
        }
    };

    public QuotaUpdateManager(FSNamesystem namesystem, Configuration conf) {
        this.namesystem = namesystem;
        this.updateInterval = conf.getInt("dfs.namenode.quota.update.interval", 1000);
        this.updateLimit = conf.getInt("dfs.namenode.quota.update.limit", 5000);
    }

    /** Logs a debug message and starts the background update thread. */
    public void activate() {
        LOG.debug("QuotaUpdateMonitor is running");
        updateThread.start();
    }

    /**
     * Interrupts the update thread and waits up to three seconds for it to finish.
     * If the calling thread is interrupted while waiting, the interruption is logged
     * and the caller's interrupt flag is restored.
     */
    public void close() {
        if (updateThread == null) {
            return;
        }
        updateThread.interrupt();
        try {
            updateThread.join(3000L);
        } catch (InterruptedException e) {
            LOG.error("QuotaUpdateManager Thread Interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Obtains an id for a new quota update row from the shared id generator.
     *
     * @return the id returned by {@code getUniqueQuotaUpdateID()}
     * @throws StorageException if the id generator fails
     */
    private int nextId() throws StorageException {
        final int freshId = IDsGeneratorFactory.getInstance().getUniqueQuotaUpdateID();
        return freshId;
    }

    /**
     * Records a pending quota update for an inode by adding a new {@link QuotaUpdate}
     * entity through the {@link EntityManager}.
     *
     * @param inodeId id of the inode the update is recorded against
     * @param counts namespace, storage-space and per-storage-type values to record
     * @throws StorageException if id generation or the entity manager fails
     * @throws TransactionContextException if the entity manager rejects the add
     */
    public void addUpdate(long inodeId, QuotaCounts counts) throws StorageException, TransactionContextException {
        final HashMap<QuotaUpdate.StorageType, Long> spacePerType = new HashMap<QuotaUpdate.StorageType, Long>();
        // Translate each HDFS storage type to the same-named QuotaUpdate storage type.
        for (StorageType hdfsType : StorageType.asList()) {
            final QuotaUpdate.StorageType storedType = QuotaUpdate.StorageType.valueOf(hdfsType.name());
            spacePerType.put(storedType, counts.getTypeSpace(hdfsType));
        }
        final int updateId = this.nextId();
        final QuotaUpdate pending =
                new QuotaUpdate(updateId, inodeId, counts.getNameSpace(), counts.getStorageSpace(), spacePerType);
        EntityManager.add(pending);
    }

    /**
     * Loads every pending quota update recorded for one inode and applies them as a
     * single batch.
     *
     * @param id inode id whose updates are fetched; must not be {@code null} because it is unboxed
     * @throws IOException if the lookup or the batched apply fails
     */
    private void processUpdates(final Long id) throws IOException {
        LightWeightRequestHandler findHandler = new LightWeightRequestHandler(HDFSOperationType.GET_UPDATES_FOR_ID) {

            public Object performTask() throws IOException {
                QuotaUpdateDataAccess dataAccess = (QuotaUpdateDataAccess) HdfsStorageFactory.getDataAccess(QuotaUpdateDataAccess.class);
                return dataAccess.findByInodeId(id.longValue());
            }
        };
        // handle() is declared to return Object; the handler above returns the finder's result.
        @SuppressWarnings("unchecked")
        List<QuotaUpdate> quotaUpdates = (List<QuotaUpdate>) findHandler.handle();
        // Only render the (potentially long) list when debug logging is actually enabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("processUpdates for inode id=" + id + " quotaUpdates ids are " + Arrays.toString(quotaUpdates.toArray()));
        }
        this.applyBatchedUpdate(quotaUpdates);
    }

    /**
     * Fetches up to {@code updateLimit} pending quota updates, groups them by inode id
     * and applies each group in its own call to {@code applyBatchedUpdate}.
     *
     * <p>Every group in the fetched page is applied. The result of each call is
     * accumulated with a non-short-circuit OR so that an earlier group returning
     * {@code true} does not cause later groups to be skipped.
     *
     * @return {@code true} if any group's apply returned {@code true} or the fetch
     *         returned exactly {@code updateLimit} rows, i.e. the caller should run
     *         again without sleeping
     * @throws IOException if the fetch or any apply fails; remaining groups are then not applied
     */
    private boolean processNextUpdateBatch() throws IOException {
        LightWeightRequestHandler findHandler = new LightWeightRequestHandler(HDFSOperationType.GET_NEXT_QUOTA_BATCH) {

            public Object performTask() throws IOException {
                QuotaUpdateDataAccess dataAccess = (QuotaUpdateDataAccess) HdfsStorageFactory.getDataAccess(QuotaUpdateDataAccess.class);
                return dataAccess.findLimited(QuotaUpdateManager.this.updateLimit);
            }
        };
        // handle() is declared to return Object; the handler above returns the finder's result.
        @SuppressWarnings("unchecked")
        List<QuotaUpdate> quotaUpdates = (List<QuotaUpdate>) findHandler.handle();
        // Sorting by inode id makes all updates of one inode adjacent.
        Collections.sort(quotaUpdates, this.quotaUpdateComparator);

        boolean rerunImmediately = false;
        List<QuotaUpdate> batch = new ArrayList<QuotaUpdate>();
        for (QuotaUpdate update : quotaUpdates) {
            // A change of inode id closes the current group: flush it and start a new one.
            if (!batch.isEmpty() && batch.get(0).getInodeId() != update.getInodeId()) {
                // '|=' always evaluates the right-hand side, unlike '||'.
                rerunImmediately |= this.applyBatchedUpdate(batch);
                batch = new ArrayList<QuotaUpdate>();
            }
            batch.add(update);
        }
        if (!batch.isEmpty()) {
            rerunImmediately |= this.applyBatchedUpdate(batch);
        }
        // A full page suggests more rows may be waiting.
        if (quotaUpdates.size() == this.updateLimit) {
            rerunImmediately = true;
        }
        return rerunImmediately;
    }

    /**
     * Applies all pending quota updates of one inode inside a single transaction.
     *
     * <p>The inode is identified by the first element of {@code updates}. The transaction
     * takes a write lock on that inode plus a lock on the given updates, sums the deltas
     * of the updates found for the inode, removes those rows, adds the sum to the
     * directory's quota feature (when quota is set) and, unless the directory has id 1,
     * queues the same sum as a new update for the parent directory.
     *
     * @param updates updates that all refer to the same inode; may be empty
     * @return {@code true} if a follow-up update for the parent was queued; {@code false}
     *         if {@code updates} is empty, the directory is subtree-locked by another
     *         namenode, the directory no longer exists, or the aggregated delta is zero
     * @throws IOException if the transaction fails
     */
    private boolean applyBatchedUpdate(final List<QuotaUpdate> updates) throws IOException {
        if (updates.isEmpty()) {
            return false;
        }
        final long inodeId = updates.get(0).getInodeId();
        return (Boolean) new HopsTransactionalRequestHandler(HDFSOperationType.APPLY_QUOTA_UPDATE) {
            INodeIdentifier iNodeIdentifier;

            @Override
            public void setUp() throws IOException {
                super.setUp();
                this.iNodeIdentifier = new INodeIdentifier(Long.valueOf(inodeId));
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.iNodeIdentifier))
                        .add(lf.getQuotaUpdateLock(updates));
            }

            public Object performTask() throws IOException {
                INodeDirectory dir = (INodeDirectory) EntityManager.find(INode.Finder.ByINodeIdFTIS, inodeId);
                Collection<QuotaUpdate> dbUpdates = EntityManager.findList(QuotaUpdate.Finder.ByINodeId, inodeId);

                // Leave the rows untouched while another namenode holds the subtree lock.
                if (dir != null
                        && SubtreeLockHelper.isSTOLocked(dir.isSTOLocked(), dir.getSTOLockOwner(),
                                QuotaUpdateManager.this.namesystem.getNameNode().getActiveNameNodes().getActiveNodes())
                        && dir.getSTOLockOwner() != QuotaUpdateManager.this.namesystem.getNamenodeId()) {
                    LOG.warn("Ignoring updates as the subtree lock is set");
                    return false;
                }

                // Sum all deltas and delete the rows that contributed to the sum.
                QuotaCounts counts = new QuotaCounts.Builder().build();
                for (QuotaUpdate update : dbUpdates) {
                    counts.addStorageSpace(update.getStorageSpaceDelta());
                    counts.addNameSpace(update.getNamespaceDelta());
                    for (Map.Entry<QuotaUpdate.StorageType, Long> entry : update.getTypeSpaces().entrySet()) {
                        counts.addTypeSpace(StorageType.valueOf(entry.getKey().name()), entry.getValue());
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("handling " + update);
                    }
                    EntityManager.remove(update);
                }

                // The inode is gone: the rows were removed above and their sum is discarded.
                if (dir == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("dropping update for " + updates.get(0) + " quota " + counts + " because of deletion");
                    }
                    return false;
                }

                if (dir.isQuotaSet()) {
                    DirectoryWithQuotaFeature quotaFeature = dir.getDirectoryWithQuotaFeature();
                    if (quotaFeature != null) {
                        quotaFeature.addSpaceConsumed2Cache(counts);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("applying aggregated update for directory " + dir.getId() + " with quota " + counts);
                        }
                    }
                }

                // Id 1 is excluded from upward propagation (presumably the root inode - TODO confirm).
                if (dir.getId() == 1L) {
                    return false;
                }

                boolean allZero = counts.getStorageSpace() == 0L && counts.getNameSpace() == 0L;
                HashMap<QuotaUpdate.StorageType, Long> typeSpace = new HashMap<QuotaUpdate.StorageType, Long>();
                for (StorageType type : StorageType.asList()) {
                    typeSpace.put(QuotaUpdate.StorageType.valueOf(type.name()), counts.getTypeSpace(type));
                    allZero = allZero && counts.getTypeSpace(type) == 0L;
                }
                // Nothing to propagate when every aggregated delta is zero.
                if (allZero) {
                    return false;
                }

                QuotaUpdate parentUpdate = new QuotaUpdate(QuotaUpdateManager.this.nextId(), dir.getParentId(),
                        counts.getNameSpace(), counts.getStorageSpace(), typeSpace);
                EntityManager.add(parentUpdate);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("adding parent update " + parentUpdate);
                }
                return true;
            }
        }.handle(this);
    }

    /**
     * Queues an iterator of inode ids whose pending quota updates the monitor thread
     * processes before its regular batch. The monitor calls {@code notify()} on the
     * iterator once it has been drained.
     *
     * @param iterator inode ids to prioritize
     * @throws QuotaUpdateException if this namenode is not the leader
     */
    void addPrioritizedUpdates(Iterator<Long> iterator) throws QuotaUpdateException {
        if (this.namesystem.isLeader()) {
            this.prioritizedUpdates.add(iterator);
            return;
        }
        throw new QuotaUpdateException("Non leader namenode cannot prioritize quota updates for inodes");
    }

    private class QuotaUpdateMonitor
    implements Runnable {
        private QuotaUpdateMonitor() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (QuotaUpdateManager.this.namesystem.isRunning()) {
                long startTime = System.currentTimeMillis();
                try {
                    long sleepDuration;
                    boolean rerunImmediatly = false;
                    if (QuotaUpdateManager.this.namesystem.isLeader()) {
                        if (!QuotaUpdateManager.this.prioritizedUpdates.isEmpty()) {
                            Iterator iterator = (Iterator)QuotaUpdateManager.this.prioritizedUpdates.poll();
                            while (iterator.hasNext()) {
                                QuotaUpdateManager.this.processUpdates((Long)iterator.next());
                            }
                            Iterator iterator2 = iterator;
                            synchronized (iterator2) {
                                iterator.notify();
                            }
                        }
                        rerunImmediatly = QuotaUpdateManager.this.processNextUpdateBatch();
                    }
                    if (rerunImmediatly || (sleepDuration = (long)QuotaUpdateManager.this.updateInterval - (System.currentTimeMillis() - startTime)) <= 0L) continue;
                    Thread.sleep(QuotaUpdateManager.this.updateInterval);
                }
                catch (InterruptedException ie) {
                    LOG.warn((Object)"QuotaUpdateMonitor thread received InterruptedException.", (Throwable)ie);
                    break;
                }
                catch (StorageException e) {
                    LOG.warn((Object)"QuotaUpdateMonitor thread received StorageException.", (Throwable)e);
                }
                catch (Throwable t) {
                    LOG.error((Object)"QuotaUpdateMonitor thread received Runtime exception. ", t);
                }
            }
        }
    }
}

