/*
 * Decompiled with CFR 0.152.
 */
package io.hops.yarn.server.resourcemanager.quota;

import io.hops.exception.StorageException;
import io.hops.metadata.yarn.dal.quota.ProjectQuotaDataAccess;
import io.hops.metadata.yarn.dal.quota.ProjectsDailyCostDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.quota.PriceMultiplicator;
import io.hops.metadata.yarn.entity.quota.ProjectDailyCost;
import io.hops.metadata.yarn.entity.quota.ProjectQuota;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.HopsWorksHelper;
import io.hops.util.RMStorageFactory;
import io.hops.yarn.server.resourcemanager.quota.QuotaComputeEvent;
import io.hops.yarn.server.resourcemanager.quota.QuotaContainerFinishEvent;
import io.hops.yarn.server.resourcemanager.quota.QuotaContainerStartEvent;
import io.hops.yarn.server.resourcemanager.quota.QuotaContainerUpdateEvent;
import io.hops.yarn.server.resourcemanager.quota.QuotaEvent;
import io.hops.yarn.server.resourcemanager.quota.QuotaEventType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

public class QuotaService
extends CompositeService {
    private static final Log LOG = LogFactory.getLog(QuotaService.class);
    private Dispatcher dispatcher;
    boolean quotaServiceEnabled;
    Map<ContainerId, ContainerStartData> runningContainers = new ConcurrentHashMap<ContainerId, ContainerStartData>();
    Map<ContainerId, Long> containerQuotaTime = new ConcurrentHashMap<ContainerId, Long>();
    private int minVcores;
    private int minMemory;
    private int minGpus;
    private float basePriceGeneral;
    private float basePriceGpu;
    private long minRunTime;
    private Thread quotaSchedulingThread;
    private volatile boolean stopped = false;
    private long quotaSchedulingPeriod;
    private RMContext rmContext;
    private Map<PriceMultiplicator.MultiplicatorType, Float> currentMultiplicators = new HashMap<PriceMultiplicator.MultiplicatorType, Float>();

    public QuotaService(RMContext rmContext) {
        super(QuotaService.class.getName());
        this.rmContext = rmContext;
    }

    protected synchronized void serviceInit(Configuration conf) throws Exception {
        this.minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 1);
        assert (this.minVcores > 0) : "yarn.scheduler.minimum-allocation-vcores should not be null or negative";
        this.minGpus = Math.max(1, conf.getInt("yarn.resourcemanager.quota.minimum.charged.gpu", YarnConfiguration.DEFAULT_QUOTA_MINIMUM_CHARGED_GPUS));
        assert (this.minGpus > 0) : "yarn.resourcemanager.quota.minimum.charged.gpu should not be null or negative";
        this.minMemory = conf.getInt("yarn.resourcemanager.quota.minimum.charged.mb", YarnConfiguration.DEFAULT_QUOTA_MINIMUM_CHARGED_MB);
        assert (this.minMemory > 0) : "yarn.resourcemanager.quota.minimum.charged.mb should not be null or negative";
        this.basePriceGeneral = conf.getFloat("yarn.resourcemanager.quota.price.base.general", YarnConfiguration.DEFAULT_QUOTA_BASE_PRICE_GPU);
        this.basePriceGpu = conf.getFloat("yarn.resourcemanager.quota.price.base.gpu", YarnConfiguration.DEFAULT_QUOTA_BASE_PRICE_GPU);
        this.minRunTime = conf.getInt("yarn.resourcemanager.quota.min.runtime", YarnConfiguration.DEFAULT_QUOTA_MIN_RUN_TIME);
        this.quotaServiceEnabled = conf.getBoolean("yarn.resourcemanager.quota.enabled", false);
        this.quotaSchedulingPeriod = conf.getLong("yarn.resourcemanager.quota.scheduling.period", 10000L);
        if (this.quotaServiceEnabled) {
            this.dispatcher = this.createDispatcher(conf);
            this.dispatcher.register(QuotaEventType.class, (EventHandler)new ForwardingEventHandler());
            this.addIfService(this.dispatcher);
        }
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        assert (!this.stopped) : "starting when already stopped";
        LOG.info((Object)"Starting a new quota schedular service");
        if (this.quotaServiceEnabled) {
            this.quotaSchedulingThread = new Thread(new WorkingThread());
            this.quotaSchedulingThread.setName("Quota scheduling");
            this.quotaSchedulingThread.start();
        }
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        this.stopped = true;
        if (this.quotaSchedulingThread != null) {
            this.quotaSchedulingThread.interrupt();
        }
        super.serviceStop();
        LOG.info((Object)"Stopped the quota scheduling service.");
    }

    protected Dispatcher createDispatcher(Configuration conf) {
        MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt("yarn.resourcemanager.quota.multi-threaded-dispatcher.pool-size", 10));
        dispatcher.setDrainEventsOnStop();
        return dispatcher;
    }

    protected void handleQuotaEvent(QuotaEvent event) throws IOException {
        switch ((QuotaEventType)event.getType()) {
            case CONTAINER_START: {
                QuotaContainerStartEvent qcsEvent = (QuotaContainerStartEvent)event;
                this.addToRunningContainers(qcsEvent);
                break;
            }
            case CONTAINER_UPDATE: {
                QuotaContainerUpdateEvent qcuEvent = (QuotaContainerUpdateEvent)event;
                this.processContainerUpdate(qcuEvent);
                break;
            }
            case CONTAINER_FINISH: {
                QuotaContainerFinishEvent qcfEvent = (QuotaContainerFinishEvent)event;
                this.processContainerFinish(qcfEvent);
                break;
            }
            case QUOTA_COMPUTE: {
                QuotaComputeEvent qcEvent = (QuotaComputeEvent)event;
                this.processQuotaCompute(qcEvent);
                break;
            }
            default: {
                LOG.error((Object)("Unknown QuotaEvent type: " + event.getType()));
            }
        }
    }

    public void containerStarted(RMContainer container) {
        if (this.quotaServiceEnabled) {
            this.dispatcher.getEventHandler().handle((Event)new QuotaContainerStartEvent(container.getContainerId(), ContainerStartData.newInstance((ContainerId)container.getContainerId(), (Resource)container.getAllocatedResource(), (NodeId)container.getAllocatedNode(), (Priority)container.getAllocatedPriority(), (long)container.getCreationTime())));
        }
    }

    public void containerUpdated(RMContainer container, long updateTime) {
        if (this.quotaServiceEnabled) {
            this.dispatcher.getEventHandler().handle((Event)new QuotaContainerUpdateEvent(container.getContainerId(), ContainerStartData.newInstance((ContainerId)container.getContainerId(), (Resource)container.getAllocatedResource(), (NodeId)container.getAllocatedNode(), (Priority)container.getAllocatedPriority(), (long)container.getCreationTime()), updateTime));
        }
    }

    public void containerFinished(RMContainer container) {
        if (this.quotaServiceEnabled) {
            this.dispatcher.getEventHandler().handle((Event)new QuotaContainerFinishEvent(container.getContainerId(), ContainerFinishData.newInstance((ContainerId)container.getContainerId(), (long)container.getFinishTime(), (String)container.getDiagnosticsInfo(), (int)container.getContainerExitStatus(), (ContainerState)container.getContainerState())));
        }
    }

    public void computeQuota(ContainerId containerId, long computeTime) {
        if (this.quotaServiceEnabled) {
            this.dispatcher.getEventHandler().handle((Event)new QuotaComputeEvent(containerId, computeTime));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addToRunningContainers(QuotaContainerStartEvent event) {
        ContainerStartData containerStartData = event.getContainerStartData();
        synchronized (containerStartData) {
            this.runningContainers.put(event.getContainerId(), event.getContainerStartData());
            this.containerQuotaTime.put(event.getContainerId(), event.getContainerStartData().getStartTime());
        }
    }

    private void processContainerFinish(QuotaContainerFinishEvent event) throws IOException {
        ContainerStartData startData = this.runningContainers.remove(event.getContainerId());
        if (startData != null) {
            this.computeAndApplyCharge(startData, event.getContainerFinishData().getFinishTime(), true);
        } else {
            LOG.error((Object)("Container finished before starting: " + event.getContainerId()));
        }
    }

    private void processContainerUpdate(QuotaContainerUpdateEvent event) throws IOException {
        ContainerStartData startData = this.runningContainers.put(event.getContainerId(), event.getContainerStartData());
        if (startData != null) {
            this.computeAndApplyCharge(startData, event.getUpdateTime(), false);
        } else {
            LOG.error((Object)("Container updated before starting: " + event.getContainerId()));
        }
    }

    private void processQuotaCompute(QuotaComputeEvent event) throws IOException {
        ContainerStartData startData = this.runningContainers.get(event.getContainerId());
        if (startData != null) {
            this.computeAndApplyCharge(startData, event.getComputeTime(), false);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"comput event received after container finished");
        }
    }

    private void computeAndApplyCharge(final ContainerStartData startData, long endTime, boolean remove) throws IOException {
        Long quotaTime = new Long(0L);
        if (remove) {
            quotaTime = this.containerQuotaTime.remove(startData.getContainerId());
            if (endTime - startData.getStartTime() < this.minRunTime) {
                endTime = startData.getStartTime() + this.minRunTime - quotaTime;
            }
        } else {
            quotaTime = this.containerQuotaTime.get(startData.getContainerId());
        }
        if (quotaTime == null) {
            LOG.error((Object)"No container should reach this point without having a quotaTime");
            return;
        }
        final HashMap<String, Long> resourcesMap = new HashMap<String, Long>();
        final long usedMillis = endTime - quotaTime;
        Resource resource = startData.getAllocatedResource();
        for (ResourceInformation entry : resource.getResources()) {
            resourcesMap.put(entry.getName(), entry.getValue());
        }
        LightWeightRequestHandler quotaSchedulerHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.OTHER){

            public Object performTask() throws IOException {
                connector.beginTransaction();
                connector.writeLock();
                QuotaService.this.computeAndApplyChargeInt(startData.getContainerId(), resourcesMap, usedMillis);
                connector.commit();
                return null;
            }
        };
        quotaSchedulerHandler.handle();
        if (!remove) {
            this.containerQuotaTime.put(startData.getContainerId(), endTime);
        }
    }

    private void computeAndApplyChargeInt(ContainerId containerId, Map<String, Long> resourcesMap, long usedMillis) throws StorageException {
        ProjectQuotaDataAccess pqDA = (ProjectQuotaDataAccess)RMStorageFactory.getDataAccess(ProjectQuotaDataAccess.class);
        ProjectsDailyCostDataAccess pdcDA = (ProjectsDailyCostDataAccess)RMStorageFactory.getDataAccess(ProjectsDailyCostDataAccess.class);
        long curentDay = TimeUnit.DAYS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId();
        RMApp app = (RMApp)this.rmContext.getRMApps().get(appId);
        if (app == null) {
            LOG.error((Object)("Application not found: " + appId.toString() + " for container " + containerId));
            return;
        }
        String appOwner = app.getUser();
        String projectName = HopsWorksHelper.getProjectName((String)appOwner);
        String user = HopsWorksHelper.getUserName((String)appOwner);
        ProjectQuota projectQuota = (ProjectQuota)pqDA.get(projectName);
        ProjectDailyCost projectDailyCost = (ProjectDailyCost)pdcDA.get(projectName, user, curentDay);
        float charge = this.computeCharge(resourcesMap, usedMillis);
        this.chargeProjectQuota(projectQuota, projectName, user, containerId, charge);
        this.chargeProjectDailyCost(projectDailyCost, projectName, user, curentDay, charge, appId);
        if (projectQuota != null) {
            pqDA.add((Object)projectQuota);
        }
        if (projectDailyCost != null) {
            pdcDA.add((Object)projectDailyCost);
        }
    }

    private float computeCharge(Map<String, Long> resourcesMap, long usedMillis) {
        float multiplicator = this.rmContext.getPriceMultiplicatorService().getMultiplicator(PriceMultiplicator.MultiplicatorType.GENERAL);
        float vcoresUsage = (float)resourcesMap.get("vcores").longValue() / (float)this.minVcores;
        float vcoresCredit = (float)usedMillis / 1000.0f * vcoresUsage * multiplicator * this.basePriceGeneral;
        float memoryUsage = (float)resourcesMap.get("memory-mb").longValue() / (float)this.minMemory;
        float memoryCredit = (float)usedMillis / 1000.0f * memoryUsage * multiplicator * this.basePriceGeneral;
        Long nbGpus = resourcesMap.get("yarn.io/gpu");
        float gpuCredit = 0.0f;
        if (nbGpus != null && nbGpus != 0L) {
            float gpuUsage = (float)nbGpus.longValue() / (float)this.minGpus;
            multiplicator = this.rmContext.getPriceMultiplicatorService().getMultiplicator(PriceMultiplicator.MultiplicatorType.GPU);
            gpuCredit = (float)usedMillis / 1000.0f * gpuUsage * multiplicator * this.basePriceGpu;
        }
        float credit = Math.max(gpuCredit, Math.max(vcoresCredit, memoryCredit));
        return credit;
    }

    private void chargeProjectQuota(ProjectQuota projectQuota, String projectName, String user, ContainerId containerId, float charge) {
        LOG.info((Object)("Quota: project " + projectName + " user " + user + " has been charged " + charge + " for container: " + containerId));
        if (projectQuota != null) {
            projectQuota.decrementQuota(charge);
        } else {
            LOG.error((Object)("Project not found: " + projectName));
        }
    }

    private void chargeProjectDailyCost(ProjectDailyCost projectDailyCost, String projectName, String user, long day, float charge, ApplicationId appId) {
        LOG.debug((Object)("Quota: project " + projectName + " user " + user + " has used " + charge + " credits, on day: " + day));
        if (projectDailyCost == null) {
            projectDailyCost = new ProjectDailyCost(projectName, user, day, 0.0f, appId.toString());
        }
        projectDailyCost.incrementCharge(charge, appId.toString());
    }

    private class WorkingThread
    implements Runnable {
        private WorkingThread() {
        }

        @Override
        public void run() {
            LOG.info((Object)"Quota Scheduler started");
            while (!QuotaService.this.stopped && !Thread.currentThread().isInterrupted()) {
                long currentLoopTime = System.currentTimeMillis();
                for (Map.Entry<ContainerId, ContainerStartData> entry : QuotaService.this.runningContainers.entrySet()) {
                    QuotaService.this.computeQuota(entry.getKey(), currentLoopTime);
                }
                long duration = System.currentTimeMillis() - currentLoopTime;
                try {
                    Thread.sleep(Math.max(1L, QuotaService.this.quotaSchedulingPeriod - duration));
                }
                catch (InterruptedException ex) {
                    LOG.error((Object)ex, (Throwable)ex);
                }
            }
        }
    }

    protected static class MultiThreadedDispatcher
    extends CompositeService
    implements Dispatcher {
        private List<AsyncDispatcher> dispatchers = new ArrayList<AsyncDispatcher>();

        public MultiThreadedDispatcher(int num) {
            super(MultiThreadedDispatcher.class.getName());
            for (int i = 0; i < num; ++i) {
                AsyncDispatcher dispatcher = this.createDispatcher();
                this.dispatchers.add(dispatcher);
                this.addIfService(dispatcher);
            }
        }

        public EventHandler<Event> getEventHandler() {
            return new CompositEventHandler();
        }

        public void register(Class<? extends Enum> eventType, EventHandler handler) {
            for (AsyncDispatcher dispatcher : this.dispatchers) {
                dispatcher.register(eventType, handler);
            }
        }

        public void setDrainEventsOnStop() {
            for (AsyncDispatcher dispatcher : this.dispatchers) {
                dispatcher.setDrainEventsOnStop();
            }
        }

        protected AsyncDispatcher createDispatcher() {
            return new AsyncDispatcher("RM ApplicationQuota dispatcher");
        }

        private class CompositEventHandler
        implements EventHandler<Event> {
            private CompositEventHandler() {
            }

            public void handle(Event event) {
                int index = (event.hashCode() & Integer.MAX_VALUE) % MultiThreadedDispatcher.this.dispatchers.size();
                ((AsyncDispatcher)MultiThreadedDispatcher.this.dispatchers.get(index)).getEventHandler().handle(event);
            }
        }
    }

    private final class ForwardingEventHandler
    implements EventHandler<QuotaEvent> {
        private ForwardingEventHandler() {
        }

        public void handle(QuotaEvent event) {
            try {
                QuotaService.this.handleQuotaEvent(event);
            }
            catch (IOException ex) {
                LOG.error((Object)"error handling quota event", (Throwable)ex);
            }
        }
    }
}

