/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.quota;

import io.hops.exception.StorageException;
import io.hops.metadata.common.entity.LongVariable;
import io.hops.metadata.common.entity.Variable;
import io.hops.metadata.hdfs.dal.VariableDataAccess;
import io.hops.metadata.yarn.dal.quota.ContainersLogsDataAccess;
import io.hops.metadata.yarn.dal.quota.PriceMultiplicatorDataAccess;
import io.hops.metadata.yarn.dal.util.YARNOperationType;
import io.hops.metadata.yarn.entity.ContainerStatus;
import io.hops.metadata.yarn.entity.quota.ContainerLog;
import io.hops.metadata.yarn.entity.quota.PriceMultiplicator;
import io.hops.transaction.handler.LightWeightRequestHandler;
import io.hops.transaction.handler.RequestHandler;
import io.hops.util.RMStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.quota.QuotaService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Resource manager service that keeps a per-container log of start/stop ticks.
 *
 * <p>A background "tick" thread wakes up every {@code monitorInterval}
 * milliseconds, drains the container status events queued through
 * {@link #insertEvent(List)}, updates the in-memory {@link ContainerLog}
 * entries, persists the changed entries together with the tick counter, and
 * forwards the changed entries to the {@link QuotaService} (when one is
 * registered on the {@link RMContext}).
 *
 * <p>Threading: {@link #insertEvent(List)} may be called from any thread (it
 * only touches the thread-safe event queue). The container maps are mutated by
 * {@link #recover()} (before the tick thread is started) and afterwards by the
 * tick thread through {@link #processTick()}.
 */
public class ContainersLogsService
extends CompositeService {
    private static final Log LOG = LogFactory.getLog(ContainersLogsService.class);
    Configuration conf;
    private Thread tickThread;
    /** Set by {@link #serviceStop()}; polled by the tick thread to terminate its loop. */
    private volatile boolean stopped;
    /** Length of one tick, in milliseconds. */
    private long monitorInterval;
    private boolean checkpointEnabled;
    /** Number of ticks between two checkpoints of a running container. */
    private int checkpointInterval;
    private final RMContext rMContext;
    /** Current price multiplicator per type; seeded with 1.0 for every type in {@link #serviceInit}. */
    private Map<PriceMultiplicator.MultiplicatorType, Float> currentMultiplicators = new HashMap<PriceMultiplicator.MultiplicatorType, Float>();
    private long multiplicatorPeriod;
    ContainersLogsDataAccess containersLogsDA;
    VariableDataAccess variableDA;
    /** Containers that have been seen but have not yet reported COMPLETE, keyed by container id. */
    Map<String, ContainerLog> activeContainers = new HashMap<String, ContainerLog>();
    /** Logs changed since the last successful persist, keyed by container id. */
    Map<String, ContainerLog> updateContainers = new HashMap<String, ContainerLog>();
    /** Status events waiting to be consumed by the next tick. */
    LinkedBlockingQueue<ContainerStatus> eventContainers = new LinkedBlockingQueue<ContainerStatus>();
    LongVariable tickCounter = new LongVariable(Variable.Finder.QuotaTicksCounter, 0L);
    boolean recovered = true;

    /**
     * @param rMContext resource manager context used to look up the scheduler
     *                  and the quota service
     */
    public ContainersLogsService(RMContext rMContext) {
        super(ContainersLogsService.class.getName());
        this.rMContext = rMContext;
    }

    /**
     * Reads the quota/containers-log settings, seeds every multiplicator type
     * with 1.0, resolves the data-access objects and creates (but does not
     * start) the daemon tick thread.
     *
     * @param conf configuration to read the settings from
     * @throws Exception propagated from {@code super.serviceInit}
     */
    public void serviceInit(Configuration conf) throws Exception {
        LOG.info("Initializing containers logs service");
        this.conf = conf;
        this.monitorInterval = this.conf.getLong("yarn.resourcemanager.quota.containers.log.period", YarnConfiguration.DEFAULT_QUOTA_CONTAINERS_LOGS_MONITOR_INTERVAL);
        this.checkpointEnabled = this.conf.getBoolean("yarn.resourcemanager.quota.containers.log.checkpoints.enabled", YarnConfiguration.DEFAULT_QUOTA_CONTAINERS_LOGS_CHECKPOINTS_ENABLED);
        // The checkpoint period is configured in units of "minimum charged ticks".
        this.checkpointInterval = this.conf.getInt("yarn.resourcemanager.quota.containers.log.checkpoints.period", YarnConfiguration.DEFAULT_QUOTA_CONTAINERS_LOGS_CHECKPOINTS_MINTICKS) * this.conf.getInt("yarn.resourcemanager.quota.minTicksCharge", YarnConfiguration.DEFAULT_QUOTA_MIN_TICKS_CHARGE);
        this.multiplicatorPeriod = this.conf.getLong("yarn.resourcemanager.quota.multiplicator.fixed.period", YarnConfiguration.DEFAULT_QUOTA_FIXED_MULTIPLICATOR_PERIOD) * (long)this.checkpointInterval;
        for (PriceMultiplicator.MultiplicatorType type : PriceMultiplicator.MultiplicatorType.values()) {
            this.currentMultiplicators.put(type, Float.valueOf(1.0f));
        }
        this.containersLogsDA = (ContainersLogsDataAccess)RMStorageFactory.getDataAccess(ContainersLogsDataAccess.class);
        this.variableDA = (VariableDataAccess)RMStorageFactory.getDataAccess(VariableDataAccess.class);
        this.tickThread = new Thread(new TickThread());
        this.tickThread.setName("ContainersLogs Tick Thread");
        this.tickThread.setDaemon(true);
        super.serviceInit(conf);
    }

    /**
     * Runs {@link #recover()} and then starts the tick thread.
     *
     * @throws Exception propagated from {@code super.serviceStart}
     */
    protected void serviceStart() throws Exception {
        LOG.info("Starting containers logs service");
        this.recover();
        this.tickThread.start();
        super.serviceStart();
    }

    /**
     * Flags the service as stopped and interrupts the tick thread so that it
     * leaves its sleep immediately.
     *
     * @throws Exception propagated from {@code super.serviceStop}
     */
    protected void serviceStop() throws Exception {
        LOG.info("Stopping containers logs service");
        this.stopped = true;
        if (this.tickThread != null) {
            this.tickThread.interrupt();
        }
        super.serviceStop();
    }

    /**
     * Queues container status changes for processing by the next tick.
     *
     * @param changedContainerStatuses statuses to enqueue
     */
    public void insertEvent(List<ContainerStatus> changedContainerStatuses) {
        LOG.debug("CL :: New event, size: " + changedContainerStatuses.size());
        for (ContainerStatus cs : changedContainerStatuses) {
            // The queue is unbounded, so offer() never blocks. Unlike put(), it
            // cannot throw InterruptedException, so an interrupted caller neither
            // loses the event nor has its interrupt status swallowed.
            if (!this.eventContainers.offer(cs)) {
                LOG.warn("Unable to insert container status: " + cs.toString() + " inside event queue");
            }
        }
    }

    /**
     * Replaces the current price multiplicators. The map is adopted by
     * reference, not copied. Lookups for the GPU and GENERAL types unbox the
     * stored value directly, so the map must contain a non-null entry for both.
     *
     * @param currentPrices new multiplicator per type
     */
    public synchronized void setCurrentPrices(Map<PriceMultiplicator.MultiplicatorType, Float> currentPrices) {
        this.currentMultiplicators = currentPrices;
    }

    /**
     * Removes and returns every event currently waiting in the queue.
     *
     * @return the drained events, in queue order; empty when none were pending
     */
    private List<ContainerStatus> getLatestEvents() {
        List<ContainerStatus> oldEvents = new ArrayList<ContainerStatus>();
        this.eventContainers.drainTo(oldEvents);
        return oldEvents;
    }

    /**
     * Restores state from storage: loads the tick counter, the stored container
     * logs and the stored multiplicators, then closes every stored log at the
     * recovered tick and persists the result without advancing the tick.
     * Failures are logged and do not propagate.
     */
    public void recover() {
        LOG.info("Starting containers logs recovery");
        try {
            this.tickCounter = this.getTickCounter();
            this.activeContainers = this.getContainersLogs();
            Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator> storedMultiplicators = this.getCurrentMultiplicator();
            // A missing multiplicator table must not prevent the stale logs below
            // from being closed; the defaults seeded in serviceInit stay in effect.
            if (storedMultiplicators != null) {
                for (PriceMultiplicator.MultiplicatorType type : PriceMultiplicator.MultiplicatorType.values()) {
                    PriceMultiplicator stored = storedMultiplicators.get(type);
                    if (stored == null) {
                        continue;
                    }
                    this.currentMultiplicators.put(type, Float.valueOf(stored.getValue()));
                }
            }
            this.finishLogging();
            this.updateContainersLogs(false);
            LOG.info("Finished containers logs recovery");
        }
        catch (Exception ex) {
            LOG.warn("Unable to finish containers logs recovery", ex);
        }
    }

    /**
     * Closes every active log at the current tick with exit status -100 and
     * moves it to the pending-update map.
     */
    private void finishLogging() {
        for (ContainerLog log : this.activeContainers.values()) {
            log.setStop(this.tickCounter.getValue().longValue());
            // NOTE(review): -100 presumably mirrors YARN's ContainerExitStatus.ABORTED - confirm.
            log.setExitstatus(-100);
            this.updateContainers.put(log.getContainerid(), log);
        }
        this.activeContainers.clear();
    }

    /**
     * Returns the multiplicator currently in force for a container.
     *
     * @param usesGpu whether the container has a non-zero GPU count
     * @return the GPU multiplicator when {@code usesGpu}, otherwise the GENERAL one
     */
    private float currentMultiplicatorFor(boolean usesGpu) {
        PriceMultiplicator.MultiplicatorType type = usesGpu ? PriceMultiplicator.MultiplicatorType.GPU : PriceMultiplicator.MultiplicatorType.GENERAL;
        return this.currentMultiplicators.get(type).floatValue();
    }

    /**
     * Applies a batch of status events to the in-memory logs. Events in state
     * NEW are ignored. An event for an unknown container creates a log starting
     * at the current tick; a COMPLETE event closes the log and drops it from the
     * active set. Every created or closed log is queued for persistence.
     *
     * @param latestEvents events drained from the queue for this tick
     */
    private synchronized void checkEventContainerStatuses(List<ContainerStatus> latestEvents) {
        for (ContainerStatus cs : latestEvents) {
            // The state is compared as a string (as for COMPLETE below); comparing
            // it against the enum constant itself could never match.
            if (cs.getState().equals(ContainerState.NEW.toString())) {
                continue;
            }
            boolean completed = cs.getState().equals(ContainerState.COMPLETE.toString());
            boolean updatable = false;
            ContainerLog cl = this.activeContainers.get(cs.getContainerid());
            if (cl == null) {
                RMContainer container = this.rMContext.getScheduler().getRMContainer(ConverterUtils.toContainerId((String)cs.getContainerid()));
                // Containers the scheduler no longer knows are logged with zero resources.
                Resource containerResources = container == null ? Resource.newInstance(0, 0) : container.getContainer().getResource();
                float currentMultiplicator = this.currentMultiplicatorFor(containerResources.getGPUs() != 0);
                // NOTE(review): -201 looks like a "still running" placeholder exit status - confirm.
                cl = new ContainerLog(cs.getContainerid(), this.tickCounter.getValue().longValue(), -201, currentMultiplicator, containerResources.getVirtualCores(), containerResources.getMemorySize(), containerResources.getGPUs());
                if (completed) {
                    // Overwritten with the reported exit status in the COMPLETE branch below.
                    cl.setExitstatus(-404);
                }
                this.activeContainers.put(cl.getContainerid(), cl);
                updatable = true;
            }
            if (completed) {
                cl.setStop(this.tickCounter.getValue().longValue());
                cl.setExitstatus(cs.getExitstatus());
                this.activeContainers.remove(cl.getContainerid());
                updatable = true;
            }
            if (updatable) {
                this.updateContainers.put(cl.getContainerid(), cl);
            }
        }
    }

    /**
     * Persists the pending logs (and optionally the tick counter) in one
     * transaction, hands the pending logs to the quota service if one is
     * registered, and clears the pending map. When the transaction fails with
     * an {@link IOException} the pending map is left untouched, so the same
     * entries are retried on the next call.
     *
     * @param updateTick whether the tick counter is written as well
     */
    private void updateContainersLogs(final boolean updateTick) {
        try {
            LightWeightRequestHandler containersLogsHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

                public Object performTask() throws StorageException {
                    connector.beginTransaction();
                    connector.writeLock();
                    if (ContainersLogsService.this.updateContainers.size() > 0) {
                        LOG.debug("CL :: Update containers logs size: " + ContainersLogsService.this.updateContainers.size());
                        try {
                            ContainersLogsService.this.containersLogsDA.addAll(ContainersLogsService.this.updateContainers.values());
                        }
                        catch (StorageException ex) {
                            // Best effort: a failed log write is only reported, the
                            // transaction still goes on to write the tick and commit.
                            LOG.warn("Unable to update containers logs table", ex);
                        }
                    }
                    if (updateTick) {
                        ContainersLogsService.this.variableDA.setVariable((Variable)ContainersLogsService.this.tickCounter);
                    }
                    connector.commit();
                    return null;
                }
            };
            containersLogsHandler.handle();
            QuotaService quotaService = this.rMContext.getQuotaService();
            if (quotaService != null) {
                quotaService.insertEvents(this.updateContainers.values());
            }
            this.updateContainers.clear();
        }
        catch (IOException ex) {
            LOG.warn("Unable to update containers logs and tick counter", ex);
        }
    }

    /**
     * Loads every stored container log.
     *
     * @return the stored logs keyed by container id; an empty map when storage
     *         cannot be read or returns nothing
     */
    private Map<String, ContainerLog> getContainersLogs() {
        Map<String, ContainerLog> allContainersLogs = new HashMap<String, ContainerLog>();
        try {
            LightWeightRequestHandler allContainersHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

                public Object performTask() throws StorageException {
                    connector.beginTransaction();
                    connector.readCommitted();
                    Map allContainersLogs = ContainersLogsService.this.containersLogsDA.getAll();
                    connector.commit();
                    return allContainersLogs;
                }
            };
            // handle() is untyped; the handler above only ever returns the result of getAll().
            @SuppressWarnings("unchecked")
            Map<String, ContainerLog> stored = (Map<String, ContainerLog>)allContainersHandler.handle();
            if (stored != null) {
                allContainersLogs = stored;
            }
        }
        catch (IOException ex) {
            LOG.warn("Unable to retrieve containers logs table data", ex);
        }
        return allContainersLogs;
    }

    /**
     * Loads the stored tick counter.
     *
     * @return the stored counter, or a fresh counter at 0 when none is stored,
     *         its value is null, or storage cannot be read
     */
    private LongVariable getTickCounter() {
        LongVariable tc = new LongVariable(Variable.Finder.QuotaTicksCounter, 0L);
        try {
            LightWeightRequestHandler tickCounterHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

                public Object performTask() throws StorageException {
                    connector.beginTransaction();
                    connector.readCommitted();
                    Variable tickCounterVariable = (Variable)ContainersLogsService.this.variableDA.getVariable((Object)Variable.Finder.QuotaTicksCounter);
                    connector.commit();
                    return tickCounterVariable;
                }
            };
            LongVariable found = (LongVariable)tickCounterHandler.handle();
            if (found != null && found.getValue() != null) {
                tc = found;
            }
        }
        catch (IOException ex) {
            LOG.warn("Unable to retrieve tick counter from YARN variables", ex);
        }
        return tc;
    }

    /**
     * Loads the stored price multiplicators.
     *
     * @return whatever the data access layer returns for {@code getAll()},
     *         which may be null
     * @throws IOException when the storage transaction fails
     */
    private Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator> getCurrentMultiplicator() throws IOException {
        LightWeightRequestHandler currentPriceHandler = new LightWeightRequestHandler((RequestHandler.OperationType)YARNOperationType.TEST){

            public Object performTask() throws StorageException {
                connector.beginTransaction();
                connector.readCommitted();
                PriceMultiplicatorDataAccess da = (PriceMultiplicatorDataAccess)RMStorageFactory.getDataAccess(PriceMultiplicatorDataAccess.class);
                Map currentPrices = da.getAll();
                connector.commit();
                return currentPrices;
            }
        };
        // handle() is untyped; the handler above only ever returns the result of getAll().
        @SuppressWarnings("unchecked")
        Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator> prices = (Map<PriceMultiplicator.MultiplicatorType, PriceMultiplicator>)currentPriceHandler.handle();
        return prices;
    }

    /**
     * Checkpoints every active container whose age (current tick minus start
     * tick) is a multiple of {@code checkpointInterval}: its stop tick is moved
     * to the current tick and the log is queued for persistence. When the number
     * of elapsed checkpoints is additionally a multiple of the multiplicator
     * period, the log's price is refreshed from the current multiplicators.
     */
    private synchronized void createCheckpoint() {
        long tick = this.tickCounter.getValue();
        for (ContainerLog log : this.activeContainers.values()) {
            long age = tick - log.getStart();
            if (age % (long)this.checkpointInterval != 0L) {
                continue;
            }
            log.setStop(this.tickCounter.getValue().longValue());
            if (age / (long)this.checkpointInterval % this.multiplicatorPeriod == 0L) {
                log.setPrice(this.currentMultiplicatorFor(log.getGpuUsed() != 0));
            }
            this.updateContainers.put(log.getContainerid(), log);
        }
    }

    /**
     * Executes one tick: consumes the queued events, creates checkpoints when
     * enabled, and persists the changed logs together with the tick counter.
     * The tick counter itself is advanced by the caller (the tick thread).
     */
    public void processTick() {
        List<ContainerStatus> latestEvents = this.getLatestEvents();
        LOG.debug("CL :: Event count: " + latestEvents.size());
        this.checkEventContainerStatuses(latestEvents);
        if (this.checkpointEnabled) {
            this.createCheckpoint();
        }
        LOG.debug("CL :: Update list size: " + this.updateContainers.size());
        LOG.debug("CL :: Active list size: " + this.activeContainers.size());
        this.updateContainersLogs(true);
    }

    /**
     * @return the value of the current tick counter
     */
    public long getCurrentTick() {
        return this.tickCounter.getValue();
    }

    /**
     * Loop body of the tick thread: processes one tick, advances the counter and
     * sleeps for whatever is left of the monitor interval.
     */
    private class TickThread
    implements Runnable {
        private TickThread() {
        }

        @Override
        public void run() {
            try {
                while (!ContainersLogsService.this.stopped && !Thread.currentThread().isInterrupted()) {
                    long startTime = System.currentTimeMillis();
                    if (ContainersLogsService.this.recovered) {
                        LOG.debug("CL :: Current tick: " + ContainersLogsService.this.tickCounter.getValue());
                        ContainersLogsService.this.processTick();
                        ContainersLogsService.this.tickCounter = new LongVariable(Variable.Finder.QuotaTicksCounter, ContainersLogsService.this.tickCounter.getValue() + 1L);
                    } else {
                        LOG.debug("CL :: Not yet recovered");
                    }
                    long executionTime = System.currentTimeMillis() - startTime;
                    if (ContainersLogsService.this.monitorInterval < executionTime) {
                        LOG.warn("Monitor interval threshold exceeded! Execution time: " + executionTime + "ms. Monitor interval: " + ContainersLogsService.this.monitorInterval + "ms. Consider increasing monitor interval!");
                    }
                    // When the tick overran the interval the remainder is negative
                    // and the thread continues without sleeping.
                    Thread.sleep(Math.max(0L, ContainersLogsService.this.monitorInterval - executionTime));
                }
            }
            catch (InterruptedException ex) {
                // Restore the flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                if (ContainersLogsService.this.stopped) {
                    // serviceStop() interrupts this thread on purpose; not an error.
                    LOG.info("ContainersLogs tick thread interrupted, shutting down");
                } else {
                    LOG.error(ex, ex);
                }
            }
        }
    }
}

