package org.apache.tez.dag.history.logging.ats;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.HistoryACLPolicyException;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.class */
public class ATSV15HistoryLoggingService extends HistoryLoggingService {

    /** Pending history events: filled by {@code handle}, drained by the event thread and by {@code serviceStop}. */
    @VisibleForTesting
    LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    /** Background consumer thread created in {@code serviceStart}; interrupted by {@code serviceStop}. */
    private Thread eventHandlingThread;
    /** Set to true by {@code serviceStop}; the event thread's loop exits once it observes this. */
    private AtomicBoolean stopped;
    /** Loop-iteration counter of the event thread; reset to 0 each time it reaches a non-zero multiple of 1000. */
    private int eventCounter;
    /** Number of events handled since the last queue-stats reset. */
    private int eventsProcessed;
    /** Monitor held while polling/handling events, both in the event thread and in {@code serviceStop}. */
    private final Object lock;
    /** Read from "tez.am.history.logging.enabled" in {@code serviceInit}; cleared if session ACL setup fails in {@code serviceStart}. */
    private boolean historyLoggingEnabled;

    /** Timeline client; left null when "yarn.timeline-service.enabled" is false. */
    @VisibleForTesting
    TimelineClient timelineClient;
    /** DAG ids whose events are dropped (see {@code isValidEvent} and {@code createDagDomain}). */
    private HashSet<TezDAGID> skippedDAGs;
    /** Cache of DAG id to domain id; an entry is removed when the DAG_FINISHED event is seen. */
    private Map<TezDAGID, String> dagDomainIdMap;
    /** Value of "tez.yarn.ats.event.flush.timeout.millis" (default -1); used as the flush budget in {@code serviceStop}. */
    private long maxTimeToWaitOnShutdown;
    /** Set to true in {@code serviceInit} when {@code maxTimeToWaitOnShutdown} is negative. */
    private boolean waitForeverOnShutdown;
    /** Value of "tez.yarn.ats.max.polling.time.per.event.millis" (default 10); timeout of each queue poll. */
    private long maxPollingTimeMillis;
    /** Domain id returned by {@code createSessionDomain}; may be null. */
    private String sessionDomainId;
    /** Class name of the ACL policy manager that {@code serviceInit} instantiates reflectively. */
    private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";

    /** ACL policy manager; null when instantiation failed and "tez.allow.disabled.timeline-domains" is true. */
    @VisibleForTesting
    HistoryACLPolicyManager historyACLPolicyManager;
    /** Value of "tez.history.logging.timeline.num-dags-per-group" (default 1); values above 1 make {@code getGroupId} use grouped DAG ids. */
    private int numDagsPerGroup;
    private static final Logger LOG = LoggerFactory.getLogger(ATSV15HistoryLoggingService.class);
    /** Fully qualified name of this class, compared against "tez.history.logging.service.class" in {@code serviceInit}. */
    private static final String atsHistoryLoggingServiceClassName = ATSV15HistoryLoggingService.class.getName();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService$2.class */
    /**
     * Lookup table from {@link HistoryEventType} ordinal to the integer case label used by the
     * {@code switch} in {@code getGroupId}.
     *
     * <p>Labels 1-18 are the event types that {@code getGroupId} maps to a DAG-based group id;
     * labels 19-23 are the ones it maps to an application-based group id. Any ordinal that is not
     * assigned here keeps the array default of 0 and therefore reaches the {@code default} branch
     * of that switch.
     *
     * <p>NOTE(review): the synthetic markers and the per-entry {@code NoSuchFieldError} guards
     * suggest this is the compiler-generated enum switch table recovered by the decompiler rather
     * than hand-written code - confirm before editing the numbering.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // Indexed by HistoryEventType.ordinal(); sized to the number of enum constants.
        static final /* synthetic */ int[] $SwitchMap$org$apache$tez$dag$history$HistoryEventType = new int[HistoryEventType.values().length];

        static {
            // Each assignment is isolated so that a NoSuchFieldError on one constant is ignored
            // and the remaining entries are still populated.
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_SUBMITTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_INITIALIZED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_STARTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_FINISHED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_KILL_REQUEST.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_INITIALIZED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_STARTED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_CONFIGURE_DONE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_FINISHED.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.TASK_STARTED.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.TASK_FINISHED.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.TASK_ATTEMPT_STARTED.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.TASK_ATTEMPT_FINISHED.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_COMMIT_STARTED.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_COMMIT_STARTED.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_GROUP_COMMIT_STARTED.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.DAG_RECOVERED.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            // Labels from here on are the application-scoped event types in getGroupId.
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.APP_LAUNCHED.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.AM_LAUNCHED.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.AM_STARTED.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.CONTAINER_LAUNCHED.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$tez$dag$history$HistoryEventType[HistoryEventType.CONTAINER_STOPPED.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
        }
    }

    /**
     * Creates the service with an empty event queue, zeroed counters, no skipped DAGs, and
     * history logging switched on until {@code serviceInit} reads the configuration.
     */
    public ATSV15HistoryLoggingService() {
        super(ATSV15HistoryLoggingService.class.getName());

        // Synchronisation primitives.
        this.lock = new Object();
        this.stopped = new AtomicBoolean(false);

        // Event bookkeeping.
        this.eventQueue = new LinkedBlockingQueue<>();
        this.eventCounter = 0;
        this.eventsProcessed = 0;

        // Per-DAG state.
        this.skippedDAGs = new HashSet<>();
        this.dagDomainIdMap = new HashMap<>();

        // Defaults that serviceInit may override.
        this.historyLoggingEnabled = true;
        this.waitForeverOnShutdown = false;
    }

    /**
     * Reads the service configuration and prepares the timeline client and ACL policy manager.
     *
     * <p>Works on a private copy of {@code configuration}. Unless
     * "tez.am.ats.v15.override.summary-types" is false, the copy's
     * "yarn.timeline-service.entity-group-fs-store.summary-entity-types" is overwritten with the
     * Tez application, application-attempt and DAG entity types. If
     * "tez.am.history.logging.enabled" is false the method returns right after that, leaving the
     * timeline client and ACL manager untouched.
     *
     * @param configuration the service configuration; it is copied, not modified
     * @throws Exception if the ACL policy manager cannot be instantiated and
     *     "tez.allow.disabled.timeline-domains" is not true
     */
    public void serviceInit(Configuration configuration) throws Exception {
        Configuration conf = new Configuration(configuration);

        if (conf.getBoolean("tez.am.ats.v15.override.summary-types", true)) {
            String summaryTypes = EntityTypes.TEZ_APPLICATION + "," + EntityTypes.TEZ_APPLICATION_ATTEMPT + "," + EntityTypes.TEZ_DAG_ID;
            conf.set("yarn.timeline-service.entity-group-fs-store.summary-entity-types", summaryTypes);
        }

        this.historyLoggingEnabled = conf.getBoolean("tez.am.history.logging.enabled", true);
        if (!this.historyLoggingEnabled) {
            LOG.info("ATSService: History Logging disabled. tez.am.history.logging.enabled set to false");
            return;
        }

        boolean timelineEnabled = conf.getBoolean("yarn.timeline-service.enabled", false);
        if (!timelineEnabled) {
            this.timelineClient = null;
            // Only warn when this service was explicitly selected as the history logger.
            String configuredService = conf.get("tez.history.logging.service.class", "");
            if (configuredService.equals(atsHistoryLoggingServiceClassName)) {
                LOG.warn(atsHistoryLoggingServiceClassName + " is disabled due to Timeline Service being disabled, yarn.timeline-service.enabled set to false");
            }
        } else {
            this.timelineClient = TimelineClient.createTimelineClient();
            this.timelineClient.init(conf);
        }

        this.maxPollingTimeMillis = conf.getInt("tez.yarn.ats.max.polling.time.per.event.millis", 10);
        this.maxTimeToWaitOnShutdown = conf.getLong("tez.yarn.ats.event.flush.timeout.millis", -1L);
        // A negative flush timeout means "keep flushing until the queue is empty".
        if (this.maxTimeToWaitOnShutdown < 0) {
            this.waitForeverOnShutdown = true;
        }

        LOG.info("Initializing " + ATSV15HistoryLoggingService.class.getSimpleName()
                + " with , maxPollingTime(ms)=" + this.maxPollingTimeMillis
                + ", waitTimeForShutdown(ms)=" + this.maxTimeToWaitOnShutdown
                + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);

        try {
            this.historyACLPolicyManager = (HistoryACLPolicyManager) ReflectionUtils.createClazzInstance(atsHistoryACLManagerClassName);
            this.historyACLPolicyManager.setConf(conf);
        } catch (TezReflectionException reflectionFailure) {
            LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
                    + ". ACLs cannot be enforced correctly for history data in Timeline", reflectionFailure);
            boolean allowDisabledDomains = conf.getBoolean("tez.allow.disabled.timeline-domains", false);
            if (!allowDisabledDomains) {
                throw reflectionFailure;
            }
            this.historyACLPolicyManager = null;
        }

        this.numDagsPerGroup = conf.getInt("tez.history.logging.timeline.num-dags-per-group", 1);
    }

    /**
     * Starts the timeline client, sets up the session ACL domain, and launches the
     * "HistoryEventHandlingThread" that drains {@code eventQueue}.
     *
     * <p>Does nothing when history logging is disabled or no timeline client was created. If the
     * session domain cannot be created, the failure is logged, history logging is disabled, and
     * the handling thread is not started.
     */
    public void serviceStart() {
        if (!this.historyLoggingEnabled || this.timelineClient == null) {
            return;
        }
        this.timelineClient.start();
        try {
            this.sessionDomainId = createSessionDomain();
            this.eventHandlingThread = new Thread(new Runnable() { // from class: org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService.1
                @Override // java.lang.Runnable
                public void run() {
                    boolean interrupted = false;
                    TezUtilsInternal.setHadoopCallerContext(ATSV15HistoryLoggingService.this.appContext.getHadoopShim(), ATSV15HistoryLoggingService.this.appContext.getApplicationID());
                    while (!ATSV15HistoryLoggingService.this.stopped.get() && !Thread.currentThread().isInterrupted() && !interrupted) {
                        // Every time the pass counter reaches a non-zero multiple of 1000, report
                        // queue stats (only if something was processed and a backlog exists) and
                        // reset both counters; otherwise just count this pass.
                        if (ATSV15HistoryLoggingService.this.eventCounter != 0 && ATSV15HistoryLoggingService.this.eventCounter % 1000 == 0) {
                            if (ATSV15HistoryLoggingService.this.eventsProcessed != 0 && !ATSV15HistoryLoggingService.this.eventQueue.isEmpty()) {
                                ATSV15HistoryLoggingService.LOG.info("Event queue stats, eventsProcessedSinceLastUpdate=" + ATSV15HistoryLoggingService.this.eventsProcessed + ", eventQueueSize=" + ATSV15HistoryLoggingService.this.eventQueue.size());
                            }
                            ATSV15HistoryLoggingService.this.eventCounter = 0;
                            ATSV15HistoryLoggingService.this.eventsProcessed = 0;
                        } else {
                            ATSV15HistoryLoggingService.this.eventCounter++;
                        }
                        // The lock is shared with serviceStop so that only one of the two
                        // consumers polls and handles events at a time.
                        synchronized (ATSV15HistoryLoggingService.this.lock) {
                            try {
                                // The polled event is scoped to the try block so it is never
                                // read on the interrupted path, where nothing was assigned.
                                DAGHistoryEvent event = ATSV15HistoryLoggingService.this.eventQueue.poll(ATSV15HistoryLoggingService.this.maxPollingTimeMillis, TimeUnit.MILLISECONDS);
                                if (event != null && ATSV15HistoryLoggingService.this.isValidEvent(event)) {
                                    try {
                                        ATSV15HistoryLoggingService.this.handleEvents(event);
                                        ATSV15HistoryLoggingService.this.eventsProcessed++;
                                    } catch (Exception handlingFailure) {
                                        // A failure on one event must not stop the consumer loop.
                                        ATSV15HistoryLoggingService.LOG.warn("Error handling events", handlingFailure);
                                    }
                                }
                            } catch (InterruptedException e) {
                                // Leave the loop; serviceStop flushes whatever is still queued.
                                interrupted = true;
                            }
                        }
                    }
                }
            }, "HistoryEventHandlingThread");
            this.eventHandlingThread.start();
        } catch (HistoryACLPolicyException | IOException e) {
            LOG.warn("Could not setup history acls, disabling history logging.", e);
            this.historyLoggingEnabled = false;
        }
    }

    /**
     * Stops the service: signals the event thread to stop, flushes the remaining queued events on
     * the calling thread, then stops the timeline client and closes the ACL policy manager.
     *
     * <p>The flush runs until the queue yields no event within one polling interval, or, unless
     * {@code waitForeverOnShutdown} is set, until {@code maxTimeToWaitOnShutdown} milliseconds
     * have elapsed on the application clock. The Hadoop caller context set for the flush is
     * always cleared before returning.
     */
    public void serviceStop() {
        LOG.info("Stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        try {
            TezUtilsInternal.setHadoopCallerContext(this.appContext.getHadoopShim(), this.appContext.getApplicationID());
            // Same monitor as the event thread: the flush starts only once that thread has
            // released it.
            synchronized (this.lock) {
                if (!this.eventQueue.isEmpty()) {
                    LOG.warn("ATSService being stopped, eventQueueBacklog=" + this.eventQueue.size() + ", maxTimeLeftToFlush=" + this.maxTimeToWaitOnShutdown + ", waitForever=" + this.waitForeverOnShutdown);
                    long deadline = this.appContext.getClock().getTime() + this.maxTimeToWaitOnShutdown;
                    while (this.waitForeverOnShutdown || deadline >= this.appContext.getClock().getTime()) {
                        try {
                            // The polled event is scoped to the try block so it is never read on
                            // the interrupted path, where nothing was assigned.
                            DAGHistoryEvent event = this.eventQueue.poll(this.maxPollingTimeMillis, TimeUnit.MILLISECONDS);
                            if (event == null) {
                                LOG.info("Event queue empty, stopping ATS Service");
                                break;
                            }
                            if (isValidEvent(event)) {
                                try {
                                    handleEvents(event);
                                } catch (Exception handlingFailure) {
                                    LOG.warn("Error handling event", handlingFailure);
                                }
                            }
                        } catch (InterruptedException e) {
                            // NOTE(review): the message says "Exiting", but the loop goes on to
                            // the next poll until the queue is empty or the deadline passes.
                            // Kept as-is; confirm whether an early exit is intended.
                            LOG.info("ATSService interrupted while shutting down. Exiting. EventQueueBacklog=" + this.eventQueue.size());
                        }
                    }
                }
            }
            if (!this.eventQueue.isEmpty()) {
                LOG.warn("Did not finish flushing eventQueue before stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
            }
            if (this.timelineClient != null) {
                this.timelineClient.stop();
            }
            if (this.historyACLPolicyManager != null) {
                this.historyACLPolicyManager.close();
            }
        } finally {
            this.appContext.getHadoopShim().clearHadoopCallerContext();
        }
    }

    /**
     * Chooses the timeline entity group that an event is written to.
     *
     * <p>DAG, vertex, task and task-attempt events are grouped by DAG: the group name is the
     * DAG id's string form, or {@code getGroupId(numDagsPerGroup)} of the DAG id when
     * {@code numDagsPerGroup} is greater than 1. Application, AM and container events are grouped
     * under the application id.
     *
     * @param dAGHistoryEvent the event to classify
     * @return the group id for the event, or {@code null} for an event type not listed here
     */
    @VisibleForTesting
    public TimelineEntityGroupId getGroupId(DAGHistoryEvent dAGHistoryEvent) {
        // Switch on the enum itself rather than on numeric labels from an ordinal lookup table,
        // so the handled event types are visible right here.
        switch (dAGHistoryEvent.getHistoryEvent().getEventType()) {
            case DAG_SUBMITTED:
            case DAG_INITIALIZED:
            case DAG_STARTED:
            case DAG_FINISHED:
            case DAG_KILL_REQUEST:
            case VERTEX_INITIALIZED:
            case VERTEX_STARTED:
            case VERTEX_CONFIGURE_DONE:
            case VERTEX_FINISHED:
            case TASK_STARTED:
            case TASK_FINISHED:
            case TASK_ATTEMPT_STARTED:
            case TASK_ATTEMPT_FINISHED:
            case DAG_COMMIT_STARTED:
            case VERTEX_COMMIT_STARTED:
            case VERTEX_GROUP_COMMIT_STARTED:
            case VERTEX_GROUP_COMMIT_FINISHED:
            case DAG_RECOVERED:
                return TimelineEntityGroupId.newInstance(
                        dAGHistoryEvent.getDagID().getApplicationId(),
                        this.numDagsPerGroup > 1
                                ? dAGHistoryEvent.getDagID().getGroupId(this.numDagsPerGroup)
                                : dAGHistoryEvent.getDagID().toString());
            case APP_LAUNCHED:
            case AM_LAUNCHED:
            case AM_STARTED:
            case CONTAINER_LAUNCHED:
            case CONTAINER_STOPPED:
                return TimelineEntityGroupId.newInstance(
                        this.appContext.getApplicationID(),
                        this.appContext.getApplicationID().toString());
            default:
                return null;
        }
    }

    /**
     * Queues a history event for asynchronous publishing.
     *
     * <p>The event is silently dropped when history logging is disabled or no timeline client
     * exists.
     *
     * @param dAGHistoryEvent the event to enqueue
     */
    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        boolean canPublish = this.historyLoggingEnabled && this.timelineClient != null;
        if (canPublish) {
            this.eventQueue.add(dAGHistoryEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decides whether an event should be published, updating {@code skippedDAGs} as a side effect.
     *
     * <ul>
     *   <li>A DAG_SUBMITTED event whose DAG name starts with "TezPreWarmDAG", or whose history
     *       logging flag is off, marks the DAG as skipped and is rejected.</li>
     *   <li>A DAG_RECOVERED event whose history logging flag is off marks the DAG as skipped and
     *       is rejected.</li>
     *   <li>A DAG_FINISHED event for a skipped DAG removes it from the skipped set and is
     *       rejected.</li>
     *   <li>Any other event is accepted unless its DAG id is in the skipped set.</li>
     * </ul>
     *
     * @param dAGHistoryEvent the event to check
     * @return {@code true} if the event should be handled, {@code false} if it must be dropped
     */
    public boolean isValidEvent(DAGHistoryEvent dAGHistoryEvent) {
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        TezDAGID dagID = dAGHistoryEvent.getDagID();

        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            // The event type was checked just above, so the narrowing cast is safe.
            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) dAGHistoryEvent.getHistoryEvent();
            String dagName = submittedEvent.getDAGName();
            boolean preWarmDag = dagName != null && dagName.startsWith("TezPreWarmDAG");
            if (preWarmDag || !submittedEvent.isHistoryLoggingEnabled()) {
                this.skippedDAGs.add(dagID);
                return false;
            }
        }

        if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
            DAGRecoveredEvent recoveredEvent = (DAGRecoveredEvent) dAGHistoryEvent.getHistoryEvent();
            if (!recoveredEvent.isHistoryLoggingEnabled()) {
                // The id comes from the recovered event itself, as in the original code.
                this.skippedDAGs.add(recoveredEvent.getDagID());
                return false;
            }
        }

        // The final event of a skipped DAG also clears its bookkeeping entry.
        if (eventType.equals(HistoryEventType.DAG_FINISHED) && this.skippedDAGs.remove(dagID)) {
            return false;
        }

        return dagID == null || !this.skippedDAGs.contains(dagID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts one history event into timeline entities and publishes each of them.
     *
     * <p>The domain is resolved first, because resolving it can add the DAG to
     * {@code skippedDAGs}; an event whose DAG is skipped is then dropped.
     *
     * @param dAGHistoryEvent the event to publish
     */
    public void handleEvents(DAGHistoryEvent dAGHistoryEvent) {
        String domainId = getDomainForEvent(dAGHistoryEvent);

        boolean dagSkipped = dAGHistoryEvent.getDagID() != null
                && this.skippedDAGs.contains(dAGHistoryEvent.getDagID());
        if (dagSkipped) {
            return;
        }

        TimelineEntityGroupId groupId = getGroupId(dAGHistoryEvent);
        for (TimelineEntity entity : HistoryEventTimelineConversion.convertToTimelineEntities(dAGHistoryEvent.getHistoryEvent())) {
            logEntity(groupId, entity, domainId);
        }
    }

    /**
     * Publishes one timeline entity and logs a warning for every put error with a non-zero code.
     *
     * <p>If an ACL policy manager exists and {@code str} is a non-empty domain id, the entity's
     * domain is updated before it is sent. Any exception raised while sending is logged and
     * swallowed.
     *
     * @param timelineEntityGroupId the group the entity is written to
     * @param timelineEntity the entity to publish
     * @param str the domain id to apply; may be null or empty
     */
    private void logEntity(TimelineEntityGroupId timelineEntityGroupId, TimelineEntity timelineEntity, String str) {
        boolean hasDomain = str != null && !str.isEmpty();
        if (this.historyACLPolicyManager != null && hasDomain) {
            this.historyACLPolicyManager.updateTimelineEntityDomain(timelineEntity, str);
        }

        try {
            TimelinePutResponse response = this.timelineClient.putEntities(
                    this.appContext.getApplicationAttemptId(),
                    timelineEntityGroupId,
                    new TimelineEntity[]{timelineEntity});
            if (response == null || response.getErrors().isEmpty()) {
                return;
            }
            for (TimelinePutResponse.TimelinePutError putError : response.getErrors()) {
                if (putError.getErrorCode() == 0) {
                    continue;
                }
                LOG.warn("Could not post history event to ATS, atsPutError=" + putError.getErrorCode() + ", entityId=" + putError.getEntityId());
            }
        } catch (Exception publishFailure) {
            LOG.warn("Could not handle history events", publishFailure);
        }
    }

    /**
     * Resolves the timeline domain id under which an event is published.
     *
     * <p>Returns the session domain when there is no ACL policy manager, when the event has no
     * DAG id, or when the event type is not DAG specific. Otherwise returns the domain cached for
     * the DAG, dropping the cache entry on DAG_FINISHED. For the first DAG_SUBMITTED or
     * DAG_RECOVERED event of a DAG, a DAG domain is created and cached.
     *
     * @param dAGHistoryEvent the event being published
     * @return the domain id to use; {@code null} when creating the DAG domain caused the DAG to
     *     be skipped, and possibly {@code null} when no domain id is available
     */
    private String getDomainForEvent(DAGHistoryEvent dAGHistoryEvent) {
        String domainId = this.sessionDomainId;
        if (this.historyACLPolicyManager == null) {
            return domainId;
        }

        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (dagID == null) {
            return domainId;
        }
        // Only the event type is needed for routing; the event is narrowed to DAGSubmittedEvent
        // further down, and only once its type is known to be DAG_SUBMITTED.
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        if (!HistoryEventType.isDAGSpecificEvent(eventType)) {
            return domainId;
        }

        // containsKey rather than a null check on get(): a cached value may itself be null.
        if (this.dagDomainIdMap.containsKey(dagID)) {
            domainId = this.dagDomainIdMap.get(dagID);
            if (eventType == HistoryEventType.DAG_FINISHED) {
                // Last event for this DAG: the cached domain is no longer needed.
                this.dagDomainIdMap.remove(dagID);
            }
            return domainId;
        }

        boolean submitted = HistoryEventType.DAG_SUBMITTED == eventType;
        if (!submitted && HistoryEventType.DAG_RECOVERED != eventType) {
            return domainId;
        }

        Configuration conf;
        DAGProtos.DAGPlan jobPlan;
        if (submitted) {
            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) dAGHistoryEvent.getHistoryEvent();
            conf = submittedEvent.getConf();
            jobPlan = submittedEvent.getDAGPlan();
        } else {
            // A recovered DAG takes its configuration and plan from the currently running DAG.
            conf = this.appContext.getCurrentDAG().getConf();
            jobPlan = this.appContext.getCurrentDAG().getJobPlan();
        }

        domainId = createDagDomain(conf, jobPlan, dagID);
        // createDagDomain adds the DAG to skippedDAGs when ACL setup fails.
        if (this.skippedDAGs.contains(dagID)) {
            return null;
        }
        this.dagDomainIdMap.put(dagID, domainId);
        return domainId;
    }

    /**
     * Sets up the session-level ACLs and returns the resulting domain id.
     *
     * @return the value stored under "tez.yarn.ats.acl.session.domain.id" in the map returned by
     *     the ACL policy manager, or {@code null} when there is no manager or it returns no map
     * @throws IOException if the ACL policy manager fails with an I/O error
     * @throws HistoryACLPolicyException if the ACL policy manager rejects the setup
     */
    private String createSessionDomain() throws IOException, HistoryACLPolicyException {
        if (this.historyACLPolicyManager == null) {
            return null;
        }
        Map<?, ?> sessionAcls = this.historyACLPolicyManager.setupSessionACLs(getConfig(), this.appContext.getApplicationID());
        if (sessionAcls == null) {
            return null;
        }
        return (String) sessionAcls.get("tez.yarn.ats.acl.session.domain.id");
    }

    /**
     * Sets up DAG-level ACLs and returns the resulting domain id.
     *
     * <p>Outside session mode the session domain is reused. If ACL setup fails, the failure is
     * logged and the DAG is added to {@code skippedDAGs}.
     *
     * @param configuration the DAG configuration passed to the ACL policy manager
     * @param dAGPlan the DAG plan; its ACL info, if present, is converted and passed along
     * @param tezDAGID the DAG the domain is created for
     * @return the value stored under "tez.yarn.ats.acl.dag.domain.id", the session domain id when
     *     not in session mode or when the manager returns no map, or {@code null} on failure
     */
    private String createDagDomain(Configuration configuration, DAGProtos.DAGPlan dAGPlan, TezDAGID tezDAGID) {
        if (this.appContext.isSession()) {
            try {
                Map<?, ?> dagAcls = this.historyACLPolicyManager.setupSessionDAGACLs(
                        configuration,
                        this.appContext.getApplicationID(),
                        Integer.toString(tezDAGID.getId()),
                        dAGPlan.hasAclInfo() ? DagTypeConverters.convertDAGAccessControlsFromProto(dAGPlan.getAclInfo()) : null);
                if (dagAcls == null) {
                    return this.sessionDomainId;
                }
                return (String) dagAcls.get("tez.yarn.ats.acl.dag.domain.id");
            } catch (IOException | HistoryACLPolicyException aclFailure) {
                LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", aclFailure);
                this.skippedDAGs.add(tezDAGID);
                return null;
            }
        }
        return this.sessionDomainId;
    }

    /**
     * Accessor that pre-increments the given service's {@code eventCounter}.
     *
     * @param aTSV15HistoryLoggingService the service whose counter is incremented
     * @return the counter value after the increment
     */
    static /* synthetic */ int access$304(ATSV15HistoryLoggingService aTSV15HistoryLoggingService) {
        return ++aTSV15HistoryLoggingService.eventCounter;
    }
}
