/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.ats.acls;

import com.google.common.collect.Sets;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.tests.MiniTezClusterWithTimeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestATSHistoryWithACLs {
    private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryWithACLs.class);
    protected static MiniTezClusterWithTimeline mrrTezCluster = null;
    protected static MiniDFSCluster dfsCluster = null;
    private static String timelineAddress;
    private Random random = new Random();
    private static Configuration conf;
    private static FileSystem remoteFs;
    private static String TEST_ROOT_DIR;
    private static String user;
    private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";

    @BeforeClass
    public static void setup() throws IOException {
        try {
            conf.setBoolean("dfs.namenode.quota.enabled", false);
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        if (mrrTezCluster == null) {
            try {
                mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithACLs.class.getName(), 1, 1, 1, true);
                Configuration conf = new Configuration();
                conf.setBoolean("yarn.timeline-service.enabled", true);
                conf.set("fs.defaultFS", remoteFs.getUri().toString());
                conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
                mrrTezCluster.init(conf);
                mrrTezCluster.start();
            }
            catch (Throwable e) {
                LOG.info("Failed to start Mini Tez Cluster", e);
            }
        }
        user = UserGroupInformation.getCurrentUser().getShortUserName();
        timelineAddress = mrrTezCluster.getConfig().get("yarn.timeline-service.webapp.address");
        if (timelineAddress != null) {
            timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
        }
    }

    @AfterClass
    public static void tearDown() throws InterruptedException {
        LOG.info("Shutdown invoked");
        Thread.sleep(10000L);
        if (mrrTezCluster != null) {
            mrrTezCluster.stop();
            mrrTezCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    private <K> K getTimelineData(String url, Class<K> clazz) {
        Client client = new Client();
        WebResource resource = client.resource(url);
        ClientResponse response = (ClientResponse)resource.accept(new String[]{"application/json"}).get(ClientResponse.class);
        Assert.assertEquals((long)200L, (long)response.getStatus());
        Assert.assertEquals((Object)MediaType.APPLICATION_JSON_TYPE, (Object)response.getType());
        Object entity = response.getEntity(clazz);
        Assert.assertNotNull((Object)entity);
        return (K)entity;
    }

    private TimelineDomain getDomain(String domainId) {
        Assert.assertNotNull((Object)timelineAddress);
        String url = "http://" + timelineAddress + "/ws/v1/timeline/domain/" + domainId;
        LOG.info("Getting timeline domain: " + url);
        TimelineDomain domain = this.getTimelineData(url, TimelineDomain.class);
        Assert.assertNotNull((Object)domain);
        Assert.assertNotNull((Object)domain.getOwner());
        Assert.assertNotNull((Object)domain.getReaders());
        Assert.assertNotNull((Object)domain.getWriters());
        LOG.info("TimelineDomain for id " + domainId + ", owner=" + domain.getOwner() + ", readers=" + domain.getReaders() + ", writers=" + domain.getWriters());
        return domain;
    }

    private void verifyDomainACLs(TimelineDomain timelineDomain, Collection<String> users, Collection<String> groups) {
        String readers = timelineDomain.getReaders();
        int pos = readers.indexOf(" ");
        String readerUsers = readers.substring(0, pos);
        String readerGroups = readers.substring(pos + 1);
        Assert.assertTrue((boolean)readerUsers.contains(user));
        for (String s : users) {
            Assert.assertTrue((boolean)readerUsers.contains(s));
        }
        for (String s : groups) {
            Assert.assertTrue((boolean)readerGroups.contains(s));
        }
        if (!user.equals("nobody1") && !users.contains("nobody1")) {
            Assert.assertFalse((boolean)readerUsers.contains("nobody1"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=50000L)
    public void testSimpleAMACls() throws Exception {
        ApplicationId applicationId;
        TezClient tezSession = null;
        String viewAcls = "nobody nobody_group";
        try {
            SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
            DAG dag = DAG.create((String)"TezSleepProcessor");
            Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)1, (Resource)Resource.newInstance((int)256, (int)1));
            dag.addVertex(vertex);
            TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
            tezConf.set("tez.am.view-acls", viewAcls);
            tezConf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
            Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
            remoteFs.mkdirs(remoteStagingDir);
            tezConf.set("tez.staging-dir", remoteStagingDir.toString());
            tezSession = TezClient.create((String)"TezSleepProcessor", (TezConfiguration)tezConf, (boolean)true);
            tezSession.start();
            applicationId = tezSession.getAppMasterApplicationId();
            DAGClient dagClient = tezSession.submitDAG(dag);
            DAGStatus dagStatus = dagClient.getDAGStatus(null);
            while (!dagStatus.isCompleted()) {
                LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState());
                Thread.sleep(500L);
                dagStatus = dagClient.getDAGStatus(null);
            }
            Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
        }
        finally {
            if (tezSession != null) {
                tezSession.stop();
            }
        }
        TimelineDomain timelineDomain = this.getDomain("Tez_ATS_" + applicationId.toString());
        this.verifyDomainACLs(timelineDomain, Collections.singleton("nobody"), Collections.singleton("nobody_group"));
        this.verifyEntityDomains(applicationId, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=50000L)
    public void testDAGACls() throws Exception {
        ApplicationId applicationId;
        TezClient tezSession = null;
        String viewAcls = "nobody nobody_group";
        try {
            SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
            DAG dag = DAG.create((String)"TezSleepProcessor");
            Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)1, (Resource)Resource.newInstance((int)256, (int)1));
            dag.addVertex(vertex);
            DAGAccessControls accessControls = new DAGAccessControls();
            accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
            accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
            dag.setAccessControls(accessControls);
            TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
            tezConf.set("tez.am.view-acls", viewAcls);
            tezConf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
            Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
            remoteFs.mkdirs(remoteStagingDir);
            tezConf.set("tez.staging-dir", remoteStagingDir.toString());
            tezSession = TezClient.create((String)"TezSleepProcessor", (TezConfiguration)tezConf, (boolean)true);
            tezSession.start();
            applicationId = tezSession.getAppMasterApplicationId();
            DAGClient dagClient = tezSession.submitDAG(dag);
            DAGStatus dagStatus = dagClient.getDAGStatus(null);
            while (!dagStatus.isCompleted()) {
                LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState());
                Thread.sleep(500L);
                dagStatus = dagClient.getDAGStatus(null);
            }
            Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
        }
        finally {
            if (tezSession != null) {
                tezSession.stop();
            }
        }
        TimelineDomain timelineDomain = this.getDomain("Tez_ATS_" + applicationId.toString());
        this.verifyDomainACLs(timelineDomain, Collections.singleton("nobody"), Collections.singleton("nobody_group"));
        timelineDomain = this.getDomain("Tez_ATS_" + applicationId.toString() + "_1");
        this.verifyDomainACLs(timelineDomain, Sets.newHashSet((Object[])new String[]{"nobody", "nobody2"}), Sets.newHashSet((Object[])new String[]{"nobody_group", "nobody_group2"}));
        this.verifyEntityDomains(applicationId, false);
    }

    @Test(timeout=50000L)
    public void testDisableSessionLogging() throws Exception {
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
    }

    @Test(timeout=50000L)
    public void testDagLoggingDisabled() throws Exception {
        ATSHistoryLoggingService historyLoggingService = (ATSHistoryLoggingService)ReflectionUtils.createClazzInstance((String)ATSHistoryLoggingService.class.getName());
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        Mockito.when((Object)appContext.getApplicationID()).thenReturn((Object)ApplicationId.newInstance((long)0L, (int)1));
        historyLoggingService.setAppContext(appContext);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        String viewAcls = "nobody nobody_group";
        tezConf.set("tez.am.view-acls", viewAcls);
        tezConf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        historyLoggingService.init((Configuration)tezConf);
        historyLoggingService.start();
        ApplicationId appId = ApplicationId.newInstance((long)100L, (int)1);
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)appId, (int)100);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)1);
        DAGProtos.DAGPlan dagPlan = DAGProtos.DAGPlan.newBuilder().setName("DAGPlanMock").build();
        DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1L, dagPlan, appAttemptId, null, "usr", (Configuration)tezConf, null, null);
        submittedEvent.setHistoryLoggingEnabled(false);
        DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, (HistoryEvent)submittedEvent);
        historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, (HistoryEvent)submittedEvent));
        Thread.sleep(1000L);
        String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" + event.getDagID();
        Client client = new Client();
        WebResource resource = client.resource(url);
        ClientResponse response = (ClientResponse)resource.accept(new String[]{"application/json"}).get(ClientResponse.class);
        Assert.assertEquals((long)404L, (long)response.getStatus());
    }

    @Test(timeout=50000L)
    public void testDagLoggingEnabled() throws Exception {
        ATSHistoryLoggingService historyLoggingService = (ATSHistoryLoggingService)ReflectionUtils.createClazzInstance((String)ATSHistoryLoggingService.class.getName());
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        Mockito.when((Object)appContext.getApplicationID()).thenReturn((Object)ApplicationId.newInstance((long)0L, (int)1));
        historyLoggingService.setAppContext(appContext);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        String viewAcls = "nobody nobody_group";
        tezConf.set("tez.am.view-acls", viewAcls);
        tezConf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        historyLoggingService.init((Configuration)tezConf);
        historyLoggingService.start();
        ApplicationId appId = ApplicationId.newInstance((long)100L, (int)1);
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)appId, (int)11);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)1);
        DAGProtos.DAGPlan dagPlan = DAGProtos.DAGPlan.newBuilder().setName("DAGPlanMock").build();
        DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1L, dagPlan, appAttemptId, null, "usr", (Configuration)tezConf, null, null);
        submittedEvent.setHistoryLoggingEnabled(true);
        DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, (HistoryEvent)submittedEvent);
        historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, (HistoryEvent)submittedEvent));
        Thread.sleep(1000L);
        String url = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" + event.getDagID();
        Client client = new Client();
        WebResource resource = client.resource(url);
        ClientResponse response = (ClientResponse)resource.accept(new String[]{"application/json"}).get(ClientResponse.class);
        Assert.assertEquals((long)200L, (long)response.getStatus());
        Assert.assertEquals((Object)MediaType.APPLICATION_JSON_TYPE, (Object)response.getType());
        TimelineEntity entity = (TimelineEntity)response.getEntity(TimelineEntity.class);
        Assert.assertEquals((Object)entity.getEntityType(), (Object)"TEZ_DAG_ID");
        Assert.assertEquals((Object)((TimelineEvent)entity.getEvents().get(0)).getEventType(), (Object)HistoryEventType.DAG_SUBMITTED.toString());
    }

    @Test(timeout=50000L)
    public void testTimelineServiceDisabled() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        tezConf.setBoolean("yarn.timeline-service.enabled", false);
        ATSHistoryACLPolicyManager historyACLPolicyManager = (ATSHistoryACLPolicyManager)ReflectionUtils.createClazzInstance((String)atsHistoryACLManagerClassName);
        historyACLPolicyManager.setConf((Configuration)tezConf);
        Assert.assertNull((Object)historyACLPolicyManager.timelineClient);
    }

    private void verifyEntityDomains(ApplicationId applicationId, boolean sameDomain) {
        Assert.assertNotNull((Object)timelineAddress);
        String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/tez_" + applicationId.toString();
        LOG.info("Getting timeline entity for tez application: " + appUrl);
        TimelineEntity appEntity = this.getTimelineData(appUrl, TimelineEntity.class);
        TezDAGID tezDAGID = TezDAGID.getInstance((ApplicationId)applicationId, (int)1);
        String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" + tezDAGID.toString();
        LOG.info("Getting timeline entity for tez dag: " + dagUrl);
        TimelineEntity dagEntity = this.getTimelineData(dagUrl, TimelineEntity.class);
        Assert.assertEquals((Object)("Tez_ATS_" + applicationId.toString()), (Object)appEntity.getDomainId());
        if (!sameDomain) {
            Assert.assertEquals((Object)("Tez_ATS_" + applicationId.toString() + "_1"), (Object)dagEntity.getDomainId());
        } else {
            Assert.assertEquals((Object)appEntity.getDomainId(), (Object)dagEntity.getDomainId());
        }
    }

    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestATSHistoryWithACLs.class.getName() + "-tmpDir";
    }
}

