/*
 * Decompiled with CFR 0.152.
 */
package io.hops.yarn.server.resourcemanager.quota;

import io.hops.exception.StorageException;
import io.hops.metadata.yarn.dal.quota.PriceMultiplicatorDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.quota.PriceMultiplicator;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.RMStorageFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

public class PriceMultiplicatorService
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(PriceMultiplicatorService.class);
    private boolean isVariablePrice;
    private final RMContext rmContext;
    private volatile boolean stopped = false;
    private Thread priceCalculationThread;
    private Map<PriceMultiplicator.MultiplicatorType, Float> tippingPoints = new HashMap<PriceMultiplicator.MultiplicatorType, Float>();
    private Map<PriceMultiplicator.MultiplicatorType, Float> incrementFactors = new HashMap<PriceMultiplicator.MultiplicatorType, Float>();
    private long priceMultiplicationFactorCalculationInterval;
    private Map<PriceMultiplicator.MultiplicatorType, Float> currentMultiplicators = new ConcurrentHashMap<PriceMultiplicator.MultiplicatorType, Float>();
    private PriceMultiplicatorDataAccess priceMultiplicatorDA;

    public PriceMultiplicatorService(RMContext rmctx) {
        super("Price multiplication factor service");
        this.rmContext = rmctx;
    }

    public void serviceInit(Configuration conf) throws Exception {
        LOG.info((Object)"Initializing price estimation service");
        this.tippingPoints.put(PriceMultiplicator.MultiplicatorType.GENERAL, Float.valueOf(conf.getFloat("yarn.resourcemanager.quota.multiplicator.threshold.general", YarnConfiguration.DEFAULT_QUOTA_MULTIPLICATOR_THRESHOLD_GENERAL)));
        this.tippingPoints.put(PriceMultiplicator.MultiplicatorType.GPU, Float.valueOf(conf.getFloat("yarn.resourcemanager.quota.multiplicator.threshold.gpu", YarnConfiguration.DEFAULT_QUOTA_MULTIPLICATOR_THRESHOLD_GPU)));
        this.incrementFactors.put(PriceMultiplicator.MultiplicatorType.GENERAL, Float.valueOf(conf.getFloat("yarn.resourcemanager.quota.multiplicator.increment.general", YarnConfiguration.DEFAULT_QUOTA_INCREMENT_FACTOR_GENERAL)));
        this.incrementFactors.put(PriceMultiplicator.MultiplicatorType.GPU, Float.valueOf(conf.getFloat("yarn.resourcemanager.quota.multiplicator.increment.gpu", YarnConfiguration.DEFAULT_QUOTA_INCREMENT_FACTOR_GPU)));
        this.priceMultiplicationFactorCalculationInterval = conf.getLong("yarn.resourcemanager.quota.multiplicator.interval", YarnConfiguration.DEFAULT_QUOTA_PRICE_MULTIPLICATOR_INTERVAL);
        this.isVariablePrice = conf.getBoolean("yarn.resourcemanager.quota.variable.price.enabled", YarnConfiguration.DEFAULT_QUOTA_VARIABLE_PRICE_ENABLED);
        if (this.isVariablePrice) {
            for (PriceMultiplicator.MultiplicatorType type : PriceMultiplicator.MultiplicatorType.values()) {
                this.currentMultiplicators.put(type, new Float(1.0f));
            }
        }
        super.serviceInit(conf);
    }

    private void recover() throws IOException {
        this.priceMultiplicatorDA = (PriceMultiplicatorDataAccess)RMStorageFactory.getDataAccess(PriceMultiplicatorDataAccess.class);
        Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator> currentMultiplicators = this.getCurrentMultiplicator();
        for (PriceMultiplicator.MultiplicatorType type : PriceMultiplicator.MultiplicatorType.values()) {
            if (currentMultiplicators.get(type) == null) continue;
            this.currentMultiplicators.put(type, Float.valueOf(currentMultiplicators.get(type).getValue()));
        }
    }

    private Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator> getCurrentMultiplicator() throws IOException {
        LightWeightRequestHandler currentPriceHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.OTHER){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.readCommitted();
                Map currentPrices = PriceMultiplicatorService.this.priceMultiplicatorDA.getAll();
                connector.commit();
                return currentPrices;
            }
        };
        return (Map)currentPriceHandler.handle();
    }

    protected void serviceStart() throws Exception {
        assert (!this.stopped) : "starting when already stopped";
        LOG.info((Object)"Starting a new price estimation service.");
        if (this.isVariablePrice) {
            this.recover();
            this.priceCalculationThread = new Thread(new WorkingThread());
            this.priceCalculationThread.setName("Price estimation service");
            this.priceCalculationThread.setDaemon(true);
            this.priceCalculationThread.start();
        }
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        this.stopped = true;
        if (this.priceCalculationThread != null) {
            this.priceCalculationThread.interrupt();
        }
        super.serviceStop();
        LOG.info((Object)"Stopping the price estimation service.");
    }

    protected void computeNewGpuPrice() {
        QueueMetrics metrics = this.rmContext.getScheduler().getRootQueueMetrics();
        float incrementBase = this.getPercenUsedGpus() - this.tippingPoints.get(PriceMultiplicator.MultiplicatorType.GPU).floatValue();
        incrementBase = Math.max(incrementBase, 0.0f);
        float multiplicator = 1.0f + incrementBase * this.incrementFactors.get(PriceMultiplicator.MultiplicatorType.GPU).floatValue();
        multiplicator = Math.max(multiplicator, this.currentMultiplicators.get(PriceMultiplicator.MultiplicatorType.GENERAL).floatValue());
        this.currentMultiplicators.put(PriceMultiplicator.MultiplicatorType.GPU, Float.valueOf(multiplicator));
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("New multiplicator: " + this.currentMultiplicators + " (mem: " + this.getPercenUsedMB(metrics) + ", vcores: " + this.getPercenUsedCores(metrics) + ", gpus: " + this.getPercenUsedGpus() + ")"));
        }
    }

    private float getPercenUsedGpus() {
        List nodes = ((AbstractYarnScheduler)this.rmContext.getScheduler()).getAllNodes();
        long totalGPUs = 0L;
        long usedGPUs = 0L;
        for (SchedulerNode node : nodes) {
            totalGPUs += node.getTotalResource().getResourceValue("yarn.io/gpu");
            usedGPUs += node.getAllocatedResource().getResourceValue("yarn.io/gpu");
        }
        return totalGPUs == 0L ? 0.0f : (float)usedGPUs / (float)totalGPUs;
    }

    private float getPercenUsedCores(QueueMetrics metrics) {
        int totalCores = metrics.getAllocatedVirtualCores() + metrics.getAvailableVirtualCores();
        int usedCores = metrics.getAllocatedVirtualCores() + metrics.getPendingVirtualCores();
        return totalCores == 0 ? 0.0f : (float)usedCores / (float)totalCores;
    }

    private float getPercenUsedMB(QueueMetrics metrics) {
        long totalMB = metrics.getAllocatedMB() + metrics.getAvailableMB();
        long usedMB = metrics.getAllocatedMB() + metrics.getPendingMB();
        return totalMB == 0L ? 0.0f : (float)usedMB / (float)totalMB;
    }

    protected void computeNewGeneralPrice() throws IOException {
        QueueMetrics metrics = this.rmContext.getScheduler().getRootQueueMetrics();
        float incrementBase = Math.max(this.getPercenUsedCores(metrics), this.getPercenUsedMB(metrics)) - this.tippingPoints.get(PriceMultiplicator.MultiplicatorType.GENERAL).floatValue();
        incrementBase = Math.max(incrementBase, 0.0f);
        this.currentMultiplicators.put(PriceMultiplicator.MultiplicatorType.GENERAL, Float.valueOf(1.0f + incrementBase * this.incrementFactors.get(PriceMultiplicator.MultiplicatorType.GENERAL).floatValue()));
        LOG.debug((Object)("New multiplicator: " + this.currentMultiplicators + " (mem: " + this.getPercenUsedMB(metrics) + ", vcores: " + this.getPercenUsedCores(metrics) + ")"));
    }

    private void persistMultiplicators() throws IOException {
        LightWeightRequestHandler prepareHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.OTHER){

            public Object performTask() throws IOException {
                connector.beginTransaction();
                connector.writeLock();
                for (Map.Entry entry : PriceMultiplicatorService.this.currentMultiplicators.entrySet()) {
                    PriceMultiplicatorService.this.priceMultiplicatorDA.add((Object)new PriceMultiplicator((PriceMultiplicator.MultiplicatorType)entry.getKey(), ((Float)entry.getValue()).floatValue()));
                }
                connector.commit();
                LOG.debug((Object)("Commited new multiplicator: " + PriceMultiplicatorService.this.currentMultiplicators + "for VARIABLE"));
                return null;
            }
        };
        prepareHandler.handle();
    }

    public float getMultiplicator(PriceMultiplicator.MultiplicatorType type) {
        return this.currentMultiplicators.get(type).floatValue();
    }

    private class WorkingThread
    implements Runnable {
        private WorkingThread() {
        }

        @Override
        public void run() {
            LOG.info((Object)"Price estimation service started");
            while (!PriceMultiplicatorService.this.stopped && !Thread.currentThread().isInterrupted()) {
                try {
                    long start = System.currentTimeMillis();
                    PriceMultiplicatorService.this.computeNewGeneralPrice();
                    PriceMultiplicatorService.this.computeNewGpuPrice();
                    PriceMultiplicatorService.this.persistMultiplicators();
                    long duration = System.currentTimeMillis() - start;
                    Thread.sleep(Math.max(PriceMultiplicatorService.this.priceMultiplicationFactorCalculationInterval - duration, 1L));
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                catch (IOException ex) {
                    LOG.error((Object)ex, (Throwable)ex);
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException ex1) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            LOG.info((Object)"Quota scheduler thread is exiting gracefully");
        }
    }
}

