/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.timelineservice.collector;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class TimelineCollector
extends CompositeService {
    private static final Logger LOG = LoggerFactory.getLogger(TimelineCollector.class);
    /** Separator placed between a metric id and an aggregation group id when an aggregated metric id is built. */
    public static final String SEPARATOR = "_";
    /** Writer used by the put* methods; installed through {@link #setWriter(TimelineWriter)}. */
    private TimelineWriter writer;
    /** Aggregation state keyed by entity type. The reference is never reassigned, hence final. */
    private final ConcurrentMap<String, AggregationStatusTable> aggregationGroups = new ConcurrentHashMap<String, AggregationStatusTable>();
    /**
     * Entity types that are excluded from aggregation. The field is static, so a single set is
     * shared by every collector instance, and it is a plain {@link HashSet} with no
     * synchronization of its own. The reference is never reassigned, hence final.
     */
    private static final Set<String> entityTypesSkipAggregation = new HashSet<String>();
    /** Set once through {@link #setReadyToAggregate()}; volatile so other threads observe the change. */
    private volatile boolean readyToAggregate = false;
    /** Set to true when {@link #serviceStop()} runs; volatile so other threads observe the change. */
    private volatile boolean isStopped = false;

    /**
     * Creates a collector service with the given name.
     *
     * @param name service name, passed unchanged to the {@link CompositeService} constructor
     */
    public TimelineCollector(String name) {
        super(name);
    }

    /**
     * Initialization hook. Adds no collector-specific work; it only delegates to the superclass.
     *
     * @param conf configuration forwarded to {@code super.serviceInit}
     * @throws Exception whatever the superclass implementation throws
     */
    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
    }

    /**
     * Start hook. Adds no collector-specific work; it only delegates to the superclass.
     *
     * @throws Exception whatever the superclass implementation throws
     */
    protected void serviceStart() throws Exception {
        super.serviceStart();
    }

    /**
     * Stop hook. Marks this collector as stopped and then lets the superclass stop.
     *
     * @throws Exception whatever the superclass implementation throws
     */
    protected void serviceStop() throws Exception {
        // Raise the flag before delegating, so it is already visible while the superclass stops.
        isStopped = true;
        super.serviceStop();
    }

    /**
     * @return {@code true} once {@link #serviceStop()} has set the stopped flag
     */
    boolean isStopped() {
        return isStopped;
    }

    /**
     * Installs the writer used by all subsequent put* calls.
     *
     * @param w writer to use; the put* methods dereference it without a null check
     */
    protected void setWriter(TimelineWriter w) {
        writer = w;
    }

    /**
     * Exposes the live aggregation tables, keyed by entity type.
     *
     * @return the internal map itself (not a copy), so changes made through it affect this collector
     */
    protected Map<String, AggregationStatusTable> getAggregationGroups() {
        return aggregationGroups;
    }

    /**
     * Flags this collector as ready to aggregate. Nothing in this class resets the flag.
     */
    protected void setReadyToAggregate() {
        readyToAggregate = true;
    }

    /**
     * @return {@code true} once {@link #setReadyToAggregate()} has been called
     */
    protected boolean isReadyToAggregate() {
        return readyToAggregate;
    }

    /**
     * Returns the entity types whose metrics are excluded from aggregation by
     * {@code writeTimelineEntities}.
     *
     * @return the class-wide (static) set itself, not a copy
     */
    protected Set<String> getEntityTypesSkipAggregation() {
        return TimelineCollector.entityTypesSkipAggregation;
    }

    /**
     * Supplies the context that this collector passes to the writer on every entity or domain write.
     *
     * @return the collector context, provided by the concrete subclass
     */
    public abstract TimelineCollectorContext getTimelineEntityContext();

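    /*
     * Decompiler note (CFR): the compiler-generated exception handler that
     * releases the monitor of the synchronized block below was folded back
     * into the synchronized statement. Behaviour should match the original.
     */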
    /**
     * Writes the given entities and flushes the writer before returning.
     *
     * <p>The write and the flush run while holding the writer's monitor, so other callers that
     * synchronize on the same writer cannot interleave between the two steps.
     *
     * @param entities entities to write; their metrics are also fed into the aggregation tables
     * @param callerUgi caller identity, forwarded to the writer
     * @return the response produced by the writer for this write
     * @throws IOException if the write or the flush fails
     */
    public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")");
        }
        synchronized (writer) {
            final TimelineWriteResponse writeResult = writeTimelineEntities(entities, callerUgi);
            flushBufferedTimelineEntities();
            return writeResult;
        }
    }

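    /*
     * Decompiler note (CFR): the compiler-generated exception handler that
     * releases the monitor of the synchronized block below was folded back
     * into the synchronized statement. Behaviour should match the original.
     */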
    /**
     * Writes the given domain and flushes the writer before returning.
     *
     * <p>The write and the flush run while holding the writer's monitor.
     *
     * @param domain domain to write
     * @param callerUgi caller identity; used only in the debug log line here
     * @return the response produced by the writer for this write
     * @throws IOException if the write or the flush fails
     */
    public TimelineWriteResponse putDomain(TimelineDomain domain, UserGroupInformation callerUgi) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putDomain(domain=" + domain + ", callerUgi=" + callerUgi + ")");
        }
        synchronized (writer) {
            final TimelineCollectorContext ctx = getTimelineEntityContext();
            final TimelineWriteResponse writeResult = writer.write(ctx, domain);
            flushBufferedTimelineEntities();
            return writeResult;
        }
    }

    /**
     * Feeds the entities' metrics into the aggregation tables, then hands the entities to the
     * writer. Does not flush.
     *
     * @param entities entities to record and write
     * @param callerUgi caller identity, forwarded to the writer
     * @return the writer's response
     * @throws IOException if the writer fails
     */
    private TimelineWriteResponse writeTimelineEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException {
        // Update in-memory aggregation state first; the write to storage follows.
        updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation());
        final TimelineCollectorContext ctx = getTimelineEntityContext();
        return writer.write(ctx, entities, callerUgi);
    }

    /**
     * Flushes the writer.
     *
     * @throws IOException if the flush fails
     */
    private void flushBufferedTimelineEntities() throws IOException {
        writer.flush();
    }

    /**
     * Writes the given entities without flushing the writer and without taking the writer's
     * monitor. The writer's response is discarded.
     *
     * @param entities entities to write; their metrics are also fed into the aggregation tables
     * @param callerUgi caller identity, forwarded to the writer
     * @throws IOException if the writer fails
     */
    public void putEntitiesAsync(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")");
        }
        writeTimelineEntities(entities, callerUgi);
    }

    /**
     * Aggregates the metrics of the given entities into one new entity, using a private,
     * throw-away set of aggregation tables (no collector state is touched).
     *
     * @param entities entities whose metrics are aggregated; no entity type is skipped
     * @param resultEntityId id assigned to the returned entity
     * @param resultEntityType type assigned to the returned entity
     * @param needsGroupIdInResult if {@code true}, each aggregated metric id is suffixed with its
     *        entity type; otherwise the metric ids are left unsuffixed
     * @return the newly built entity that carries the aggregated metrics
     */
    public static TimelineEntity aggregateEntities(TimelineEntities entities, String resultEntityId, String resultEntityType, boolean needsGroupIdInResult) {
        ConcurrentMap<String, AggregationStatusTable> scratchGroups = new ConcurrentHashMap<String, AggregationStatusTable>();
        updateAggregateStatus(entities, scratchGroups, null);
        return needsGroupIdInResult
            ? aggregate(scratchGroups, resultEntityId, resultEntityType)
            : aggregateWithoutGroupId(scratchGroups, resultEntityId, resultEntityType);
    }

    /**
     * Records the metrics of each entity in the aggregation table for that entity's type,
     * creating the table on first use.
     *
     * <p>An entity is ignored when its type is listed in {@code typesToSkip} or when it has no
     * metrics.
     *
     * @param entities entities to record
     * @param aggregationGroups tables keyed by entity type; updated in place
     * @param typesToSkip entity types to ignore, or {@code null} to ignore none
     */
    static void updateAggregateStatus(TimelineEntities entities, ConcurrentMap<String, AggregationStatusTable> aggregationGroups, Set<String> typesToSkip) {
        for (TimelineEntity entity : entities.getEntities()) {
            boolean skippedType = typesToSkip != null && typesToSkip.contains(entity.getType());
            if (skippedType || entity.getMetrics().isEmpty()) {
                continue;
            }
            String type = entity.getType();
            AggregationStatusTable table = aggregationGroups.get(type);
            if (table == null) {
                // putIfAbsent returns the table another thread registered first, or null if ours won.
                AggregationStatusTable fresh = new AggregationStatusTable();
                AggregationStatusTable existing = aggregationGroups.putIfAbsent(type, fresh);
                table = existing != null ? existing : fresh;
            }
            table.update(entity);
        }
    }

    /**
     * Builds a new entity and aggregates every table into it, passing each table's map key as
     * the aggregation group id.
     *
     * @param aggregationGroups tables keyed by aggregation group id
     * @param resultEntityId id assigned to the returned entity
     * @param resultEntityType type assigned to the returned entity
     * @return the newly built entity
     */
    static TimelineEntity aggregate(Map<String, AggregationStatusTable> aggregationGroups, String resultEntityId, String resultEntityType) {
        final TimelineEntity aggregated = new TimelineEntity();
        aggregated.setId(resultEntityId);
        aggregated.setType(resultEntityType);
        for (Map.Entry<String, AggregationStatusTable> group : aggregationGroups.entrySet()) {
            final String groupId = group.getKey();
            final AggregationStatusTable table = group.getValue();
            table.aggregateAllTo(aggregated, groupId);
        }
        return aggregated;
    }

    /**
     * Builds a new entity and aggregates every table into it with an empty aggregation group id.
     *
     * @param aggregationGroups tables to aggregate; the map keys are not used
     * @param resultEntityId id assigned to the returned entity
     * @param resultEntityType type assigned to the returned entity
     * @return the newly built entity
     */
    static TimelineEntity aggregateWithoutGroupId(Map<String, AggregationStatusTable> aggregationGroups, String resultEntityId, String resultEntityType) {
        final TimelineEntity aggregated = new TimelineEntity();
        aggregated.setId(resultEntityId);
        aggregated.setType(resultEntityType);
        for (AggregationStatusTable table : aggregationGroups.values()) {
            table.aggregateAllTo(aggregated, "");
        }
        return aggregated;
    }

    /**
     * In-memory table of the latest metric values reported per entity, used to compute
     * aggregated metrics.
     *
     * <p>The outer map is keyed by metric; each row maps an entity id to the most recent
     * {@link TimelineMetric} that entity reported. Rows are plain {@link HashMap}s, and every
     * access to a row in this class is guarded by synchronizing on the row itself.
     */
    protected static class AggregationStatusTable {
        private final ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>> aggregateTable = new ConcurrentHashMap<TimelineMetric, Map<String, TimelineMetric>>();

        /**
         * Stores each aggregatable metric of the incoming entity in the row for that metric,
         * replacing any value previously stored for the same entity id.
         *
         * <p>Metrics whose real-time aggregation operation is {@code NOP} are ignored.
         *
         * @param incoming entity whose metrics are recorded
         */
        public void update(TimelineEntity incoming) {
            String entityId = incoming.getId();
            for (TimelineMetric m : incoming.getMetrics()) {
                if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
                    continue;
                }
                Map<String, TimelineMetric> aggrRow = this.aggregateTable.get(m);
                if (aggrRow == null) {
                    Map<String, TimelineMetric> tempRow = new HashMap<String, TimelineMetric>();
                    // putIfAbsent returns the row another thread registered first, or null if ours won.
                    aggrRow = this.aggregateTable.putIfAbsent(m, tempRow);
                    if (aggrRow == null) {
                        aggrRow = tempRow;
                    }
                }
                synchronized (aggrRow) {
                    aggrRow.put(entityId, m);
                }
            }
        }

        /**
         * Aggregates all values stored for the given metric into one metric and places it on the
         * given entity, then empties the stored row.
         *
         * <p>Nothing happens when the metric's aggregation operation is {@code NOP} or when no
         * row exists for it. The aggregated metric's id is the metric id, followed by
         * {@link TimelineCollector#SEPARATOR} and {@code aggregationGroupId} when the latter is
         * non-empty.
         *
         * @param metric metric whose row is aggregated
         * @param e entity that receives the aggregated metric; modified in place
         * @param aggregationGroupId suffix for the aggregated metric id; may be empty, must not be null
         * @return {@code e}
         */
        public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e, String aggregationGroupId) {
            if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
                return e;
            }
            Map<String, TimelineMetric> aggrRow = this.aggregateTable.get(metric);
            if (aggrRow != null) {
                TimelineMetric aggrMetric = new TimelineMetric();
                if (!aggregationGroupId.isEmpty()) {
                    aggrMetric.setId(metric.getId() + TimelineCollector.SEPARATOR + aggregationGroupId);
                } else {
                    aggrMetric.setId(metric.getId());
                }
                aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
                // Scratch state shared across the per-value aggregateTo calls below.
                Map<Object, Object> status = new HashMap<Object, Object>();
                synchronized (aggrRow) {
                    for (TimelineMetric m : aggrRow.values()) {
                        TimelineMetric.aggregateTo(m, aggrMetric, status);
                        // The operation is an enum, so identity comparison is sufficient.
                        if (m.getRealtimeAggregationOp() != aggrMetric.getRealtimeAggregationOp()) {
                            aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
                        }
                    }
                    // Aggregation consumes the row: stored values are dropped once folded in.
                    aggrRow.clear();
                }
                Set<TimelineMetric> metrics = e.getMetrics();
                // Set.add keeps an existing equal element, so remove first to make sure the
                // freshly aggregated metric is the one that ends up in the set.
                metrics.remove(aggrMetric);
                metrics.add(aggrMetric);
            }
            return e;
        }

        /**
         * Runs {@link #aggregateTo(TimelineMetric, TimelineEntity, String)} for every metric
         * currently present in this table.
         *
         * @param e entity that receives the aggregated metrics; modified in place
         * @param aggregationGroupId suffix for the aggregated metric ids; may be empty
         * @return {@code e}
         */
        public TimelineEntity aggregateAllTo(TimelineEntity e, String aggregationGroupId) {
            for (TimelineMetric m : this.aggregateTable.keySet()) {
                this.aggregateTo(m, e, aggregationGroupId);
            }
            return e;
        }
    }
}

