/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.quota;

import io.hops.exception.StorageException;
import io.hops.metadata.yarn.dal.quota.ContainersCheckPointsDataAccess;
import io.hops.metadata.yarn.dal.quota.ContainersLogsDataAccess;
import io.hops.metadata.yarn.dal.quota.ProjectQuotaDataAccess;
import io.hops.metadata.yarn.dal.quota.ProjectsDailyCostDataAccess;
import io.hops.metadata.yarn.dal.rmstatestore.ApplicationStateDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.quota.ContainerCheckPoint;
import io.hops.metadata.yarn.entity.quota.ContainerLog;
import io.hops.metadata.yarn.entity.quota.ProjectDailyCost;
import io.hops.metadata.yarn.entity.quota.ProjectDailyId;
import io.hops.metadata.yarn.entity.quota.ProjectQuota;
import io.hops.metadata.yarn.entity.rmstatestore.ApplicationState;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.HopsWorksHelper;
import io.hops.util.RMStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class QuotaService
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(QuotaService.class);
    private Thread quotaSchedulingThread;
    private volatile boolean stopped = false;
    private long minNumberOfTicks = 1L;
    private long batchTime;
    private int batchSize;
    private int minVcores;
    private int minMemory;
    private int minGpus;
    private float basePriceGeneral;
    private float basePriceGpu;
    ApplicationStateDataAccess appStatDS = (ApplicationStateDataAccess)RMStorageFactory.getDataAccess(ApplicationStateDataAccess.class);
    Map<String, String> applicationOwnerCache = new HashMap<String, String>();
    Map<String, ContainerCheckPoint> containersCheckPoints;
    Set<String> recovered = new HashSet<String>();
    BlockingQueue<ContainerLog> eventContainersLogs = new LinkedBlockingQueue<ContainerLog>();
    Map<ProjectDailyId, ProjectDailyCost> projectsDailyCostCache;
    long cashDay = -1L;

    public QuotaService() {
        super("quota scheduler service");
    }

    protected void serviceStart() throws Exception {
        assert (!this.stopped) : "starting when already stopped";
        LOG.info((Object)"Starting a new quota schedular service");
        this.recover();
        this.quotaSchedulingThread = new Thread(new WorkingThread());
        this.quotaSchedulingThread.setName("Quota scheduling service");
        this.quotaSchedulingThread.start();
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        this.stopped = true;
        if (this.quotaSchedulingThread != null) {
            this.quotaSchedulingThread.interrupt();
        }
        super.serviceStop();
        LOG.info((Object)"Stopped the quota schedular service.");
    }

    public void serviceInit(Configuration conf) throws Exception {
        this.minNumberOfTicks = conf.getInt("yarn.resourcemanager.quota.minTicksCharge", YarnConfiguration.DEFAULT_QUOTA_MIN_TICKS_CHARGE);
        this.batchTime = conf.getLong("yarn.resourcemanager.quota.batch.time", YarnConfiguration.DEFAULT_QUOTA_BATCH_TIME);
        this.batchSize = conf.getInt("yarn.resourcemanager.quota.batch.size", YarnConfiguration.DEFAULT_QUOTA_BATCH_SIZE);
        this.minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 1);
        this.minGpus = Math.max(1, conf.getInt("yarn.scheduler.minimum-allocation-gpus", 0));
        this.minMemory = conf.getInt("yarn.resourcemanager.quota.minimum.charged.mb", YarnConfiguration.DEFAULT_QUOTA_MINIMUM_CHARGED_MB);
        this.basePriceGeneral = conf.getFloat("yarn.resourcemanager.quota.price.base.general", YarnConfiguration.DEFAULT_QUOTA_BASE_PRICE_GPU);
        this.basePriceGeneral = conf.getFloat("yarn.resourcemanager.quota.price.base.general", YarnConfiguration.DEFAULT_QUOTA_BASE_PRICE_GENERAL);
    }

    public void insertEvents(Collection<ContainerLog> containersLogs) {
        for (ContainerLog cl : containersLogs) {
            this.eventContainersLogs.add(cl);
        }
    }

    protected void computeAndApplyCharge(final Collection<ContainerLog> ContainersLogs, final boolean isRecover) throws IOException {
        LightWeightRequestHandler quotaSchedulerHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws IOException {
                connector.beginTransaction();
                connector.writeLock();
                QuotaService.this.computeAndApplyChargeInt(ContainersLogs, isRecover);
                connector.commit();
                return null;
            }
        };
        quotaSchedulerHandler.handle();
    }

    private void computeAndApplyChargeInt(Collection<ContainerLog> ContainersLogs, boolean isRecover) throws StorageException {
        ProjectQuotaDataAccess pqDA = (ProjectQuotaDataAccess)RMStorageFactory.getDataAccess(ProjectQuotaDataAccess.class);
        Map projectsQuotaMap = pqDA.getAll();
        long curentDay = TimeUnit.DAYS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        HashMap<String, ProjectQuota> chargedProjects = new HashMap<String, ProjectQuota>();
        HashMap<ProjectDailyId, ProjectDailyCost> chargedProjectsDailyCost = new HashMap<ProjectDailyId, ProjectDailyCost>();
        ArrayList<ContainerLog> toBeRemovedContainersLogs = new ArrayList<ContainerLog>();
        ArrayList<ContainerCheckPoint> toBePercistedContainerCheckPoint = new ArrayList<ContainerCheckPoint>();
        ArrayList<ContainerCheckPoint> toBeRemovedContainerCheckPoint = new ArrayList<ContainerCheckPoint>();
        for (ContainerLog containerLog : ContainersLogs) {
            ContainerCheckPoint newCheckpoint;
            long nbRunningTicks;
            Object containerId;
            ApplicationId appId;
            String appOwner;
            if (!isRecover && this.recovered.remove(containerLog.getContainerid())) continue;
            if (isRecover) {
                this.recovered.add(containerLog.getContainerid());
            }
            if ((appOwner = this.applicationOwnerCache.get((appId = (containerId = ConverterUtils.toContainerId((String)containerLog.getContainerid())).getApplicationAttemptId().getApplicationId()).toString())) == null) {
                ApplicationState appState = (ApplicationState)this.appStatDS.findByApplicationId(appId.toString());
                if (appState == null) {
                    LOG.error((Object)("Application not found: " + appId.toString() + " for container " + containerLog.getContainerid()));
                    continue;
                }
                if (this.applicationOwnerCache.size() > 100000) {
                    this.applicationOwnerCache = new HashMap<String, String>();
                }
                appOwner = appState.getUser();
                this.applicationOwnerCache.put(appId.toString(), appOwner);
            }
            String projectName = HopsWorksHelper.getProjectName((String)appOwner);
            String user = HopsWorksHelper.getUserName((String)appOwner);
            Long checkpoint = containerLog.getStart();
            float currentMultiplicator = containerLog.getMultiplicator();
            ContainerCheckPoint lastCheckPoint = this.containersCheckPoints.get(containerLog.getContainerid());
            if (lastCheckPoint != null) {
                checkpoint = lastCheckPoint.getCheckPoint();
                currentMultiplicator = lastCheckPoint.getMultiplicator();
            }
            if ((nbRunningTicks = containerLog.getStop() - checkpoint) > 0L) {
                if (containerLog.getExitstatus() == -201) {
                    newCheckpoint = new ContainerCheckPoint(containerLog.getContainerid(), containerLog.getStop(), currentMultiplicator);
                    this.containersCheckPoints.put(containerLog.getContainerid(), newCheckpoint);
                    toBePercistedContainerCheckPoint.add(newCheckpoint);
                    LOG.debug((Object)("charging project still running " + projectName + " for container " + containerLog.getContainerid() + " current ticks " + nbRunningTicks + "(" + containerLog.getStart() + ", " + containerLog.getStop() + ", " + checkpoint + ") current multiplicator " + currentMultiplicator));
                    float charge = this.computeCharge(nbRunningTicks, currentMultiplicator, containerLog.getNbVcores(), containerLog.getMemoryUsed(), containerLog.getGpuUsed());
                    this.chargeProjectQuota(chargedProjects, projectsQuotaMap, projectName, user, containerLog.getContainerid(), charge);
                    this.chargeProjectDailyCost(chargedProjectsDailyCost, projectName, user, curentDay, charge, containerId.getApplicationAttemptId().getApplicationId());
                    continue;
                }
                toBeRemovedContainersLogs.add(containerLog);
                if (this.containersCheckPoints.remove(containerLog.getContainerid()) != null) {
                    toBeRemovedContainerCheckPoint.add(new ContainerCheckPoint(containerLog.getContainerid()));
                }
                LOG.debug((Object)("charging project finished " + projectName + " for container " + containerLog.getContainerid() + " current ticks " + nbRunningTicks + " current multiplicator " + currentMultiplicator));
                float charge = this.computeCharge(nbRunningTicks, currentMultiplicator, containerLog.getNbVcores(), containerLog.getMemoryUsed(), containerLog.getGpuUsed());
                this.chargeProjectQuota(chargedProjects, projectsQuotaMap, projectName, user, containerLog.getContainerid(), charge);
                this.chargeProjectDailyCost(chargedProjectsDailyCost, projectName, user, curentDay, charge, containerId.getApplicationAttemptId().getApplicationId());
                continue;
            }
            if (checkpoint.longValue() == containerLog.getStart() && containerLog.getExitstatus() == -201) {
                newCheckpoint = new ContainerCheckPoint(containerLog.getContainerid(), containerLog.getStart(), currentMultiplicator);
                this.containersCheckPoints.put(containerLog.getContainerid(), newCheckpoint);
                toBePercistedContainerCheckPoint.add(newCheckpoint);
                continue;
            }
            if (containerLog.getExitstatus() == -201) continue;
            toBeRemovedContainersLogs.add(containerLog);
            if (this.containersCheckPoints.remove(containerLog.getContainerid()) == null) continue;
            toBeRemovedContainerCheckPoint.add(new ContainerCheckPoint(containerLog.getContainerid()));
        }
        ContainersLogsDataAccess csDA = (ContainersLogsDataAccess)RMStorageFactory.getDataAccess(ContainersLogsDataAccess.class);
        csDA.removeAll(toBeRemovedContainersLogs);
        ContainersCheckPointsDataAccess ccpDA = (ContainersCheckPointsDataAccess)RMStorageFactory.getDataAccess(ContainersCheckPointsDataAccess.class);
        ccpDA.addAll(toBePercistedContainerCheckPoint);
        ccpDA.removeAll(toBeRemovedContainerCheckPoint);
        if (LOG.isDebugEnabled()) {
            for (ProjectQuota _cpq : chargedProjects.values()) {
                LOG.debug((Object)("RIZ:: Charged projects: " + _cpq.toString() + " charge amount:" + _cpq.getTotalUsedQuota()));
            }
        }
        pqDA.addAll(chargedProjects.values());
        ProjectsDailyCostDataAccess pdcDA = (ProjectsDailyCostDataAccess)RMStorageFactory.getDataAccess(ProjectsDailyCostDataAccess.class);
        pdcDA.addAll(chargedProjectsDailyCost.values());
    }

    private void chargeProjectQuota(Map<String, ProjectQuota> chargedProjectsQuota, Map<String, ProjectQuota> projectsQuotaMap, String projectid, String user, String containerId, float charge) {
        LOG.info((Object)("Quota: project " + projectid + " user " + user + " has been charged " + charge + " for container: " + containerId));
        ProjectQuota projectQuota = projectsQuotaMap.get(projectid);
        if (projectQuota != null) {
            projectQuota.decrementQuota(charge);
            chargedProjectsQuota.put(projectid, projectQuota);
        } else {
            LOG.error((Object)("Project not found: " + projectid));
        }
    }

    private void chargeProjectDailyCost(Map<ProjectDailyId, ProjectDailyCost> chargedProjectsDailyCost, String projectid, String user, long day, float charge, ApplicationId appId) {
        ProjectDailyId key;
        ProjectDailyCost projectDailyCost;
        LOG.debug((Object)("Quota: project " + projectid + " user " + user + " has used " + charge + " credits, on day: " + day));
        if (this.cashDay != day) {
            this.projectsDailyCostCache = new HashMap<ProjectDailyId, ProjectDailyCost>();
            this.cashDay = day;
        }
        if ((projectDailyCost = this.projectsDailyCostCache.get(key = new ProjectDailyId(projectid, user, day))) == null) {
            projectDailyCost = new ProjectDailyCost(projectid, user, day, 0.0f, appId.toString());
            this.projectsDailyCostCache.put(key, projectDailyCost);
        }
        projectDailyCost.incrementCharge(charge, appId.toString());
        chargedProjectsDailyCost.put(key, projectDailyCost);
    }

    private float computeCharge(long ticks, float multiplicator, int nbVcores, long memoryUsed, int nbGpus) {
        if (ticks < this.minNumberOfTicks) {
            ticks = this.minNumberOfTicks;
        }
        float vcoresUsage = (float)nbVcores / (float)this.minVcores;
        float memoryUsage = (float)memoryUsed / (float)this.minMemory;
        float gpuUsage = (float)nbGpus / (float)this.minGpus;
        float basePrice = this.basePriceGeneral;
        if (gpuUsage != 0.0f) {
            basePrice = this.basePriceGpu;
        }
        float credit = (float)ticks * Math.max(gpuUsage, Math.max(vcoresUsage, memoryUsage)) * multiplicator * basePrice;
        return credit;
    }

    public void recover() throws IOException {
        long day;
        this.cashDay = day = TimeUnit.DAYS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        LightWeightRequestHandler recoveryHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws IOException {
                connector.beginTransaction();
                connector.writeLock();
                ProjectsDailyCostDataAccess pdcDA = (ProjectsDailyCostDataAccess)RMStorageFactory.getDataAccess(ProjectsDailyCostDataAccess.class);
                QuotaService.this.projectsDailyCostCache = pdcDA.getByDay(day);
                ContainersCheckPointsDataAccess ccpDA = (ContainersCheckPointsDataAccess)RMStorageFactory.getDataAccess(ContainersCheckPointsDataAccess.class);
                QuotaService.this.containersCheckPoints = ccpDA.getAll();
                ContainersLogsDataAccess csDA = (ContainersLogsDataAccess)RMStorageFactory.getDataAccess(ContainersLogsDataAccess.class);
                Map hopContainersLogs = csDA.getAll();
                connector.commit();
                return hopContainersLogs;
            }
        };
        Map hopContainersLogs = (Map)recoveryHandler.handle();
        this.computeAndApplyCharge(hopContainersLogs.values(), true);
    }

    private class WorkingThread
    implements Runnable {
        private WorkingThread() {
        }

        @Override
        public void run() {
            LOG.info((Object)"Quota Scheduler started");
            while (!QuotaService.this.stopped && !Thread.currentThread().isInterrupted()) {
                try {
                    ArrayList<ContainerLog> containersLogs = new ArrayList<ContainerLog>();
                    Long start = System.currentTimeMillis();
                    long duration = 0L;
                    do {
                        ContainerLog log;
                        if ((log = QuotaService.this.eventContainersLogs.poll(Math.max(1L, QuotaService.this.batchTime - duration), TimeUnit.MILLISECONDS)) == null) continue;
                        containersLogs.add(log);
                    } while ((duration = System.currentTimeMillis() - start) < QuotaService.this.batchTime && containersLogs.size() < QuotaService.this.batchSize);
                    QuotaService.this.computeAndApplyCharge(containersLogs, false);
                }
                catch (IOException | InterruptedException ex) {
                    LOG.error((Object)ex, (Throwable)ex);
                }
            }
            LOG.info((Object)"Quota scheduler thread is exiting gracefully");
        }
    }
}

