/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.logging.ats;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestATSHistoryLoggingService {
    private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryLoggingService.class);
    private ATSHistoryLoggingService atsHistoryLoggingService;
    private AppContext appContext;
    private Configuration conf;
    private int atsInvokeCounter;
    private int atsEntitiesCounter;
    private HistoryACLPolicyManager historyACLPolicyManager;
    private SystemClock clock = new SystemClock();
    private static ApplicationId appId = ApplicationId.newInstance((long)1000L, (int)1);
    private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)1);

    @Before
    public void setup() throws Exception {
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        this.historyACLPolicyManager = (HistoryACLPolicyManager)Mockito.mock(HistoryACLPolicyManager.class);
        this.atsHistoryLoggingService = new ATSHistoryLoggingService();
        this.atsHistoryLoggingService.setAppContext(this.appContext);
        this.conf = new Configuration(false);
        this.conf.setLong("tez.yarn.ats.event.flush.timeout.millis", 1000L);
        this.conf.setInt("tez.yarn.ats.max.events.per.batch", 2);
        this.conf.setBoolean("tez.allow.disabled.timeline-domains", true);
        this.conf.set("tez.yarn.ats.acl.session.domain.id", "test-domain");
        this.atsInvokeCounter = 0;
        this.atsEntitiesCounter = 0;
        this.atsHistoryLoggingService.init(this.conf);
        this.atsHistoryLoggingService.historyACLPolicyManager = this.historyACLPolicyManager;
        this.atsHistoryLoggingService.timelineClient = (TimelineClient)Mockito.mock(TimelineClient.class);
        Mockito.when((Object)this.appContext.getClock()).thenReturn((Object)this.clock);
        Mockito.when((Object)this.appContext.getCurrentDAGID()).thenReturn(null);
        Mockito.when((Object)this.appContext.getApplicationID()).thenReturn((Object)appId);
        Mockito.when((Object)this.atsHistoryLoggingService.timelineClient.putEntities((TimelineEntity[])Matchers.anyVararg())).thenAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                ++TestATSHistoryLoggingService.this.atsInvokeCounter;
                TestATSHistoryLoggingService.this.atsEntitiesCounter = TestATSHistoryLoggingService.this.atsEntitiesCounter + invocation.getArguments().length;
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return null;
            }
        });
    }

    @After
    public void teardown() {
        this.atsHistoryLoggingService.stop();
        this.atsHistoryLoggingService = null;
    }

    @Test(timeout=20000L)
    public void testATSHistoryLoggingServiceShutdown() {
        this.atsHistoryLoggingService.start();
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            this.atsHistoryLoggingService.handle(historyEvent);
        }
        try {
            Thread.sleep(2500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.atsHistoryLoggingService.stop();
        LOG.info("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter);
        Assert.assertTrue((this.atsEntitiesCounter >= 4 ? 1 : 0) != 0);
        Assert.assertTrue((this.atsEntitiesCounter < 20 ? 1 : 0) != 0);
    }

    @Test(timeout=20000L)
    public void testATSEventBatching() {
        this.atsHistoryLoggingService.start();
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            this.atsHistoryLoggingService.handle(historyEvent);
        }
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOG.info("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter);
        Assert.assertTrue((this.atsEntitiesCounter > this.atsInvokeCounter ? 1 : 0) != 0);
        Assert.assertEquals((long)(this.atsEntitiesCounter / 2), (long)this.atsInvokeCounter);
    }

    @Test(timeout=20000L)
    public void testTimelineServiceDisable() throws Exception {
        this.atsHistoryLoggingService.start();
        ATSHistoryLoggingService atsHistoryLoggingService1 = new ATSHistoryLoggingService();
        atsHistoryLoggingService1.setAppContext(this.appContext);
        atsHistoryLoggingService1.timelineClient = (TimelineClient)Mockito.mock(TimelineClient.class);
        Mockito.when((Object)atsHistoryLoggingService1.timelineClient.putEntities((TimelineEntity[])Matchers.anyVararg())).thenAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                ++TestATSHistoryLoggingService.this.atsInvokeCounter;
                TestATSHistoryLoggingService.this.atsEntitiesCounter = TestATSHistoryLoggingService.this.atsEntitiesCounter + invocation.getArguments().length;
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return null;
            }
        });
        this.conf.setBoolean("yarn.timeline-service.enabled", false);
        this.conf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        atsHistoryLoggingService1.init(this.conf);
        atsHistoryLoggingService1.start();
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, (HistoryEvent)new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
        for (int i = 0; i < 100; ++i) {
            atsHistoryLoggingService1.handle(historyEvent);
        }
        try {
            Thread.sleep(20L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOG.info("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter);
        Assert.assertEquals((long)this.atsInvokeCounter, (long)0L);
        Assert.assertEquals((long)this.atsEntitiesCounter, (long)0L);
        Assert.assertNull((Object)atsHistoryLoggingService1.timelineClient);
        atsHistoryLoggingService1.close();
    }

    @Test(timeout=10000L)
    public void testNonSessionDomains() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenReturn(Collections.singletonMap("tez.yarn.ats.acl.session.domain.id", "session-id"));
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        Thread.sleep(2500L);
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(100L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)6))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"session-id"));
    }

    @Test(timeout=10000L)
    public void testNonSessionDomainsFailed() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenThrow(new Throwable[]{new IOException()});
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(1000L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"session-id"));
        Assert.assertEquals((long)0L, (long)this.atsEntitiesCounter);
    }

    @Test(timeout=10000L)
    public void testNonSessionDomainsAclNull() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenReturn(null);
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        Thread.sleep(2500L);
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(100L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"session-id"));
        Assert.assertEquals((long)6L, (long)this.atsEntitiesCounter);
    }

    @Test(timeout=10000L)
    public void testSessionDomains() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenReturn(Collections.singletonMap("tez.yarn.ats.acl.session.domain.id", "test-domain"));
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any(), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any())).thenReturn(Collections.singletonMap("tez.yarn.ats.acl.dag.domain.id", "dag-domain"));
        Mockito.when((Object)this.appContext.isSession()).thenReturn((Object)true);
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        Thread.sleep(2500L);
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(100L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"test-domain"));
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)5))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"dag-domain"));
    }

    @Test(timeout=10000L)
    public void testSessionDomainsFailed() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenThrow(new Throwable[]{new IOException()});
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any(), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any())).thenReturn(Collections.singletonMap("tez.yarn.ats.acl.dag.domain.id", "dag-domain"));
        Mockito.when((Object)this.appContext.isSession()).thenReturn((Object)true);
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(1000L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).updateTimelineEntityDomain(Mockito.any(), (String)Mockito.any());
        Assert.assertEquals((long)0L, (long)this.atsEntitiesCounter);
    }

    @Test(timeout=10000L)
    public void testSessionDomainsDagFailed() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenReturn(Collections.singletonMap("tez.yarn.ats.acl.session.domain.id", "session-domain"));
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any(), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any())).thenThrow(new Throwable[]{new IOException()});
        Mockito.when((Object)this.appContext.isSession()).thenReturn((Object)true);
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        Thread.sleep(2500L);
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(100L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).updateTimelineEntityDomain(Mockito.any(), (String)Matchers.eq((Object)"session-domain"));
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).updateTimelineEntityDomain(Mockito.any(), (String)Mockito.any());
        Assert.assertEquals((long)1L, (long)this.atsEntitiesCounter);
    }

    @Test(timeout=10000L)
    public void testSessionDomainsAclNull() throws Exception {
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any())).thenReturn(null);
        Mockito.when((Object)this.historyACLPolicyManager.setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any(), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any())).thenReturn(null);
        Mockito.when((Object)this.appContext.isSession()).thenReturn((Object)true);
        this.atsHistoryLoggingService.start();
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionACLs((Configuration)Mockito.any(), (ApplicationId)Mockito.any());
        TezDAGID dagId1 = TezDAGID.getInstance((ApplicationId)appId, (int)0);
        for (DAGHistoryEvent event : this.makeHistoryEvents(dagId1, this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
        Thread.sleep(2500L);
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(100L);
        }
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)1))).setupSessionDAGACLs((Configuration)Mockito.any(), (ApplicationId)Matchers.eq((Object)appId), (String)Matchers.eq((Object)"0"), (DAGAccessControls)Mockito.any());
        ((HistoryACLPolicyManager)Mockito.verify((Object)this.historyACLPolicyManager, (VerificationMode)Mockito.times((int)0))).updateTimelineEntityDomain(Mockito.any(), (String)Mockito.any());
        Assert.assertEquals((long)6L, (long)this.atsEntitiesCounter);
    }

    private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, ATSHistoryLoggingService service) {
        ArrayList<DAGHistoryEvent> historyEvents = new ArrayList<DAGHistoryEvent>();
        long time = System.currentTimeMillis();
        Configuration conf = new Configuration(service.getConfig());
        historyEvents.add(new DAGHistoryEvent(null, (HistoryEvent)new AMStartedEvent(attemptId, time, "user")));
        historyEvents.add(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGSubmittedEvent(dagId, time, DAGProtos.DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default")));
        TezVertexID vertexID = TezVertexID.getInstance((TezDAGID)dagId, (int)1);
        historyEvents.add(new DAGHistoryEvent(dagId, (HistoryEvent)new VertexStartedEvent(vertexID, time, time)));
        TezTaskID tezTaskID = TezTaskID.getInstance((TezVertexID)vertexID, (int)1);
        historyEvents.add(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskStartedEvent(tezTaskID, "test", time, time)));
        historyEvents.add(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance((TezTaskID)tezTaskID, (int)1), "test", time, ContainerId.newContainerId((ApplicationAttemptId)attemptId, (long)1L), NodeId.newInstance((String)"localhost", (int)8765), null, null, null)));
        return historyEvents;
    }
}

