/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.annotations.VisibleForTesting;
import io.hops.common.IDsGeneratorFactory;
import io.hops.exception.StorageException;
import io.hops.exception.TransactionContextException;
import io.hops.metadata.HdfsStorageFactory;
import io.hops.metadata.common.FinderType;
import io.hops.metadata.hdfs.dal.QuotaUpdateDataAccess;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.metadata.hdfs.entity.QuotaUpdate;
import io.hops.transaction.EntityManager;
import io.hops.transaction.handler.HDFSOperationType;
import io.hops.transaction.handler.HopsTransactionalRequestHandler;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.lock.LockFactory;
import io.hops.transaction.lock.SubtreeLockHelper;
import io.hops.transaction.lock.TransactionLockTypes;
import io.hops.transaction.lock.TransactionLocks;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.QuotaCounts;
import org.apache.hadoop.hdfs.server.namenode.QuotaUpdateException;
import org.apache.hadoop.util.Daemon;

/**
 * Applies queued {@link QuotaUpdate} rows to the directories they refer to.
 *
 * <p>A single daemon thread ({@link QuotaUpdateMonitor}) runs while the
 * namesystem is running. On the leader namenode each cycle first serves one
 * prioritized request (see {@link #addPrioritizedUpdates(Iterator)}) and then,
 * unless {@link #pauseAsyncOps} is set, walks all inodes that have pending
 * updates. For every inode the pending deltas are summed, applied to the
 * directory's quota feature (if it has one) and forwarded to the parent as a
 * new {@link QuotaUpdate}.
 */
public class QuotaUpdateManager {
    static final Log LOG = LogFactory.getLog(QuotaUpdateManager.class);
    private final FSNamesystem namesystem;
    /** Length of one monitor cycle in milliseconds ("dfs.namenode.quota.update.interval"). */
    private final int updateInterval;
    /**
     * Value of "dfs.namenode.quota.update.limit"; passed to the quota update
     * lock and finder, presumably as the maximum number of updates handled
     * per transaction (confirm against the data access layer).
     */
    private final int updateLimit;
    private final Daemon updateThread = new Daemon(new QuotaUpdateMonitor());
    /**
     * When true the monitor only serves prioritized updates and skips the
     * regular batch. Volatile because it is written by other threads and read
     * by the monitor thread.
     */
    public volatile boolean pauseAsyncOps = false;
    /** Inode id iterators whose updates must be applied ahead of the regular batch. */
    private final ConcurrentLinkedQueue<Iterator<Long>> prioritizedUpdates =
            new ConcurrentLinkedQueue<Iterator<Long>>();

    /**
     * @param namesystem namesystem whose leader/running state drives the monitor
     * @param conf       source of the update interval (default 1000) and
     *                   update limit (default 5000)
     */
    public QuotaUpdateManager(FSNamesystem namesystem, Configuration conf) {
        this.namesystem = namesystem;
        this.updateInterval = conf.getInt("dfs.namenode.quota.update.interval", 1000);
        this.updateLimit = conf.getInt("dfs.namenode.quota.update.limit", 5000);
    }

    /** Starts the background monitor thread. */
    public void activate() {
        LOG.debug("QuotaUpdateMonitor is running");
        this.updateThread.start();
    }

    /** Interrupts the monitor thread and waits up to three seconds for it to exit. */
    public void close() {
        this.updateThread.interrupt();
        try {
            this.updateThread.join(3000L);
        } catch (InterruptedException e) {
            LOG.error("QuotaUpdateManager Thread Interrupted");
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    private int nextId() throws StorageException {
        return IDsGeneratorFactory.getInstance().getUniqueQuotaUpdateID();
    }

    /** Copies the per-storage-type counters of {@code counts} into a map keyed by the entity enum. */
    private static HashMap<QuotaUpdate.StorageType, Long> toTypeSpaces(QuotaCounts counts) {
        HashMap<QuotaUpdate.StorageType, Long> typeSpaces = new HashMap<QuotaUpdate.StorageType, Long>();
        for (StorageType type : StorageType.asList()) {
            typeSpaces.put(QuotaUpdate.StorageType.valueOf(type.name()), counts.getTypeSpace(type));
        }
        return typeSpaces;
    }

    /** Returns true when namespace, storage space and every type space of {@code counts} are zero. */
    private static boolean isAllZero(QuotaCounts counts) {
        if (counts.getStorageSpace() != 0L || counts.getNameSpace() != 0L) {
            return false;
        }
        for (StorageType type : StorageType.asList()) {
            if (counts.getTypeSpace(type) != 0L) {
                return false;
            }
        }
        return true;
    }

    /**
     * Queues a quota delta for the given inode by adding a new
     * {@link QuotaUpdate} entity through the {@link EntityManager}.
     *
     * @param inodeId inode the delta applies to
     * @param counts  namespace, storage space and per-type deltas
     */
    public void addUpdate(long inodeId, QuotaCounts counts) throws StorageException, TransactionContextException {
        QuotaUpdate update = new QuotaUpdate(this.nextId(), inodeId, counts.getNameSpace(),
                counts.getStorageSpace(), toTypeSpaces(counts));
        EntityManager.add(update);
    }

    /** Returns the count reported by {@link QuotaUpdateDataAccess#getCount()}. */
    private int countPendingQuota() throws IOException {
        LightWeightRequestHandler quotaApplicationChecker = new LightWeightRequestHandler(HDFSOperationType.COUNT_QUOTA_UPDATES){

            public Object performTask() throws StorageException, IOException {
                QuotaUpdateDataAccess da = (QuotaUpdateDataAccess)HdfsStorageFactory.getDataAccess(QuotaUpdateDataAccess.class);
                return da.getCount();
            }
        };
        return (Integer)quotaApplicationChecker.handle();
    }

    /**
     * On the leader, takes one iterator from the prioritized queue, applies
     * all updates for every inode id it yields, then notifies the iterator's
     * monitor so that a thread waiting on it can continue.
     *
     * <p>NOTE(review): if applying an update throws, the iterator is never
     * notified; confirm that waiters use a timeout or otherwise recover.
     */
    private void applyAllPrioritizedUpdates() throws IOException {
        if (!this.namesystem.isLeader()) {
            return;
        }
        Iterator<Long> iterator = this.prioritizedUpdates.poll();
        if (iterator == null) {
            return;
        }
        while (iterator.hasNext()) {
            this.applyBatchedUpdateForINode(iterator.next(), true);
        }
        synchronized (iterator) {
            iterator.notify();
        }
    }

    /** Returns the result of {@link QuotaUpdateDataAccess#getDistinctInodes()}. */
    @SuppressWarnings("unchecked")
    private List<Long> getPendingInodes() throws IOException {
        return (List<Long>)new LightWeightRequestHandler(HDFSOperationType.GET_QUOTA_PENDING_INODES){

            public Object performTask() throws IOException {
                QuotaUpdateDataAccess dataAccess = (QuotaUpdateDataAccess)HdfsStorageFactory.getDataAccess(QuotaUpdateDataAccess.class);
                return dataAccess.getDistinctInodes();
            }
        }.handle();
    }

    /**
     * On the leader, applies pending updates inode by inode, highest inode id
     * first, and stops early as soon as a prioritized request is queued.
     */
    private void processNextUpdateBatch() throws IOException {
        if (!this.namesystem.isLeader()) {
            return;
        }
        List<Long> pendingInodes = this.getPendingInodes();
        // Presumably descendants have larger ids than their ancestors, so
        // descending order lets deltas bubble upwards within one pass - TODO confirm.
        Collections.sort(pendingInodes, Collections.reverseOrder());
        for (Long inodeID : pendingInodes) {
            this.applyBatchedUpdateForINode(inodeID, false);
            if (!this.prioritizedUpdates.isEmpty()) {
                break;
            }
        }
    }

    /**
     * Repeatedly runs a transaction that reads pending updates for one inode,
     * sums them, removes them, applies the sum to the directory's quota
     * feature and queues the same sum for the parent directory.
     *
     * @param inodeID            inode whose pending updates are applied
     * @param thisIsPriorityWork when false the loop yields as soon as a
     *                           prioritized request is queued; when true it
     *                           runs until no updates are left
     */
    private void applyBatchedUpdateForINode(final Long inodeID, boolean thisIsPriorityWork) throws IOException {
        LOG.debug("Applying quota updates for INode ID: " + inodeID + " Priority: " + thisIsPriorityWork);
        HopsTransactionalRequestHandler handler = new HopsTransactionalRequestHandler(HDFSOperationType.APPLY_QUOTA_UPDATE){
            INodeIdentifier iNodeIdentifier;

            @Override
            public void setUp() throws IOException {
                super.setUp();
                this.iNodeIdentifier = new INodeIdentifier(inodeID);
            }

            public void acquireLock(TransactionLocks locks) throws IOException {
                LockFactory lf = LockFactory.getInstance();
                locks.add(lf.getIndividualINodeLock(TransactionLockTypes.INodeLockType.WRITE, this.iNodeIdentifier));
                locks.add(lf.getQuotaUpdateLock(inodeID, QuotaUpdateManager.this.updateLimit));
            }

            /** Returns the number of updates consumed, as an Integer; 0 means nothing was done. */
            public Object performTask() throws IOException {
                Collection<QuotaUpdate> dbUpdates = EntityManager.findList(QuotaUpdate.Finder.ByINodeId,
                        inodeID, QuotaUpdateManager.this.updateLimit);
                LOG.debug("Read " + dbUpdates.size() + " quota updates for INode ID: " + inodeID);
                if (dbUpdates.isEmpty()) {
                    return 0;
                }
                INodeDirectory dir = (INodeDirectory)EntityManager.find(INode.Finder.ByINodeIdFTIS, inodeID);
                if (dir != null
                        && SubtreeLockHelper.isSTOLocked(dir.isSTOLocked(), dir.getSTOLockOwner(),
                                QuotaUpdateManager.this.namesystem.getNameNode().getActiveNameNodes().getActiveNodes())
                        && dir.getSTOLockOwner() != QuotaUpdateManager.this.namesystem.getNamenodeId()) {
                    LOG.warn("Ignoring updates as the subtree lock is set");
                    // Must be an Integer: the caller casts the result. Zero leaves
                    // the updates in place and ends the caller's retry loop.
                    return 0;
                }
                QuotaCounts counts = new QuotaCounts.Builder().build();
                for (QuotaUpdate update : dbUpdates) {
                    counts.addStorageSpace(update.getStorageSpaceDelta());
                    counts.addNameSpace(update.getNamespaceDelta());
                    for (Map.Entry<QuotaUpdate.StorageType, Long> entry : update.getTypeSpaces().entrySet()) {
                        counts.addTypeSpace(StorageType.valueOf(entry.getKey().name()), entry.getValue());
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("handling " + update);
                    }
                    EntityManager.remove(update);
                }
                if (dir == null) {
                    LOG.warn("Dropping update for INode ID: " + inodeID + " because the node has been deleted. Quota " + counts.toString());
                    return dbUpdates.size();
                }
                if (dir.isQuotaSet()) {
                    DirectoryWithQuotaFeature q = dir.getDirectoryWithQuotaFeature();
                    if (q != null) {
                        q.addSpaceConsumed2Cache(counts);
                        LOG.debug("applying aggregated update for directory " + dir.getId() + " with quota " + counts);
                    }
                }
                // Id 1 is presumably the root directory, which has no parent to forward to.
                if (dir.getId() != 1L && !isAllZero(counts)) {
                    QuotaUpdate parentUpdate = new QuotaUpdate(QuotaUpdateManager.this.nextId(), dir.getParentId(),
                            counts.getNameSpace(), counts.getStorageSpace(), toTypeSpaces(counts));
                    EntityManager.add(parentUpdate);
                    LOG.debug("adding parent update " + parentUpdate);
                }
                return dbUpdates.size();
            }
        };
        int processed;
        do {
            processed = (Integer)handler.handle();
        } while ((thisIsPriorityWork || this.prioritizedUpdates.isEmpty()) && processed > 0);
    }

    /**
     * Queues an iterator of inode ids whose updates should be applied ahead of
     * the regular batch and wakes the monitor thread. The monitor calls
     * {@code notify()} on the iterator once it has been drained.
     *
     * @param iterator inode ids to process with priority
     * @throws QuotaUpdateException if this namenode is not the leader
     */
    @VisibleForTesting
    public void addPrioritizedUpdates(Iterator<Long> iterator) throws QuotaUpdateException {
        if (!this.namesystem.isLeader()) {
            throw new QuotaUpdateException("Non leader namenode cannot prioritize quota updates for inodes");
        }
        this.prioritizedUpdates.add(iterator);
        synchronized (this.updateThread) {
            this.updateThread.notify();
        }
    }

    /** Body of the background thread; loops until the namesystem stops or the thread is interrupted. */
    private class QuotaUpdateMonitor
    implements Runnable {
        private QuotaUpdateMonitor() {
        }

        @Override
        public void run() {
            while (QuotaUpdateManager.this.namesystem.isRunning()) {
                try {
                    if (!QuotaUpdateManager.this.namesystem.isLeader()) {
                        Thread.sleep(QuotaUpdateManager.this.updateInterval);
                        continue;
                    }
                    long startTime = System.currentTimeMillis();
                    boolean rerunImmediately = false;
                    QuotaUpdateManager.this.applyAllPrioritizedUpdates();
                    if (!QuotaUpdateManager.this.pauseAsyncOps) {
                        QuotaUpdateManager.this.processNextUpdateBatch();
                        rerunImmediately = QuotaUpdateManager.this.countPendingQuota() > 0
                                || !QuotaUpdateManager.this.prioritizedUpdates.isEmpty();
                    }
                    if (rerunImmediately) {
                        continue;
                    }
                    long sleepDuration = (long)QuotaUpdateManager.this.updateInterval - (System.currentTimeMillis() - startTime);
                    if (sleepDuration <= 0L) {
                        continue;
                    }
                    synchronized (QuotaUpdateManager.this.updateThread) {
                        // Re-check under the monitor: a request queued just before we
                        // got here has already sent its notify and would otherwise be
                        // delayed until the wait times out.
                        if (QuotaUpdateManager.this.prioritizedUpdates.isEmpty()) {
                            // Wait only for the rest of the cycle; addPrioritizedUpdates wakes us early.
                            QuotaUpdateManager.this.updateThread.wait(sleepDuration);
                        }
                    }
                }
                catch (InterruptedException ie) {
                    LOG.warn("QuotaUpdateMonitor thread received InterruptedException.", ie);
                    break;
                }
                catch (StorageException e) {
                    LOG.warn("QuotaUpdateMonitor thread received StorageException.", e);
                }
                catch (Throwable t) {
                    // Keep the monitor alive; the next cycle retries.
                    LOG.error("QuotaUpdateMonitor thread received Runtime exception. ", t);
                }
            }
        }
    }
}

