/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.ahs;

import io.hops.util.DBUtility;
import io.hops.util.RMStorageFactory;
import io.hops.util.YarnAPIStorageFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingApplicationAttemptFinishEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingApplicationAttemptStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingApplicationFinishEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingApplicationHistoryEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingApplicationStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingContainerFinishEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingContainerStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.WritingHistoryEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TestRMApplicationHistoryWriter {
    private static int MAX_RETRIES = 10;
    private RMApplicationHistoryWriter writer;
    private ApplicationHistoryStore store;
    private List<CounterDispatcher> dispatchers = new ArrayList<CounterDispatcher>();

    @Before
    public void setup() throws IOException {
        this.store = new MemoryApplicationHistoryStore();
        Configuration conf = new Configuration();
        conf.setBoolean("yarn.timeline-service.generic-application-history.enabled", true);
        conf.setClass("yarn.timeline-service.generic-application-history.store-class", MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
        RMStorageFactory.setConfiguration((Configuration)conf);
        YarnAPIStorageFactory.setConfiguration((Configuration)conf);
        DBUtility.InitializeDB();
        this.writer = new RMApplicationHistoryWriter(){

            protected ApplicationHistoryStore createApplicationHistoryStore(Configuration conf) {
                return TestRMApplicationHistoryWriter.this.store;
            }

            protected Dispatcher createDispatcher(Configuration conf) {
                MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt("yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size", 10));
                dispatcher.setDrainEventsOnStop();
                return dispatcher;
            }

            class MultiThreadedDispatcher
            extends RMApplicationHistoryWriter.MultiThreadedDispatcher {
                public MultiThreadedDispatcher(int num) {
                    super(num);
                }

                protected AsyncDispatcher createDispatcher() {
                    CounterDispatcher dispatcher = new CounterDispatcher();
                    TestRMApplicationHistoryWriter.this.dispatchers.add(dispatcher);
                    return dispatcher;
                }
            }
        };
        this.writer.init(conf);
        this.writer.start();
    }

    @After
    public void tearDown() {
        this.writer.stop();
    }

    private static RMApp createRMApp(ApplicationId appId) {
        RMApp app = (RMApp)Mockito.mock(RMApp.class);
        Mockito.when((Object)app.getApplicationId()).thenReturn((Object)appId);
        Mockito.when((Object)app.getName()).thenReturn((Object)"test app");
        Mockito.when((Object)app.getApplicationType()).thenReturn((Object)"test app type");
        Mockito.when((Object)app.getUser()).thenReturn((Object)"test user");
        Mockito.when((Object)app.getQueue()).thenReturn((Object)"test queue");
        Mockito.when((Object)app.getSubmitTime()).thenReturn((Object)0L);
        Mockito.when((Object)app.getStartTime()).thenReturn((Object)1L);
        Mockito.when((Object)app.getFinishTime()).thenReturn((Object)2L);
        Mockito.when((Object)app.getDiagnostics()).thenReturn((Object)new StringBuilder("test diagnostics info"));
        Mockito.when((Object)app.getFinalApplicationStatus()).thenReturn((Object)FinalApplicationStatus.UNDEFINED);
        return app;
    }

    private static RMAppAttempt createRMAppAttempt(ApplicationAttemptId appAttemptId) {
        RMAppAttempt appAttempt = (RMAppAttempt)Mockito.mock(RMAppAttempt.class);
        Mockito.when((Object)appAttempt.getAppAttemptId()).thenReturn((Object)appAttemptId);
        Mockito.when((Object)appAttempt.getHost()).thenReturn((Object)"test host");
        Mockito.when((Object)appAttempt.getRpcPort()).thenReturn((Object)-100);
        Container container = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)container.getId()).thenReturn((Object)ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)1L));
        Mockito.when((Object)appAttempt.getMasterContainer()).thenReturn((Object)container);
        Mockito.when((Object)appAttempt.getDiagnostics()).thenReturn((Object)"test diagnostics info");
        Mockito.when((Object)appAttempt.getTrackingUrl()).thenReturn((Object)"test url");
        Mockito.when((Object)appAttempt.getFinalApplicationStatus()).thenReturn((Object)FinalApplicationStatus.UNDEFINED);
        return appAttempt;
    }

    private static RMContainer createRMContainer(ContainerId containerId) {
        RMContainer container = (RMContainer)Mockito.mock(RMContainer.class);
        Mockito.when((Object)container.getContainerId()).thenReturn((Object)containerId);
        Mockito.when((Object)container.getAllocatedNode()).thenReturn((Object)NodeId.newInstance((String)"test host", (int)-100));
        Mockito.when((Object)container.getAllocatedResource()).thenReturn((Object)Resource.newInstance((int)-1, (int)-1, (int)-1));
        Mockito.when((Object)container.getAllocatedPriority()).thenReturn((Object)Priority.UNDEFINED);
        Mockito.when((Object)container.getCreationTime()).thenReturn((Object)0L);
        Mockito.when((Object)container.getFinishTime()).thenReturn((Object)1L);
        Mockito.when((Object)container.getDiagnosticsInfo()).thenReturn((Object)"test diagnostics info");
        Mockito.when((Object)container.getLogURL()).thenReturn((Object)"test log url");
        Mockito.when((Object)container.getContainerExitStatus()).thenReturn((Object)-1);
        Mockito.when((Object)container.getContainerState()).thenReturn((Object)ContainerState.COMPLETE);
        return container;
    }

    @Test
    public void testDefaultStoreSetup() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        conf.setBoolean("yarn.timeline-service.generic-application-history.enabled", true);
        RMApplicationHistoryWriter writer = new RMApplicationHistoryWriter();
        writer.init((Configuration)conf);
        writer.start();
        try {
            Assert.assertFalse((boolean)writer.historyServiceEnabled);
            Assert.assertNull((Object)writer.writer);
        }
        finally {
            writer.stop();
            writer.close();
        }
    }

    @Test
    public void testWriteApplication() throws Exception {
        int i;
        RMApp app = TestRMApplicationHistoryWriter.createRMApp(ApplicationId.newInstance((long)0L, (int)1));
        this.writer.applicationStarted(app);
        ApplicationHistoryData appHD = null;
        for (i = 0; i < MAX_RETRIES && (appHD = this.store.getApplication(ApplicationId.newInstance((long)0L, (int)1))) == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertNotNull(appHD);
        Assert.assertEquals((Object)"test app", (Object)appHD.getApplicationName());
        Assert.assertEquals((Object)"test app type", (Object)appHD.getApplicationType());
        Assert.assertEquals((Object)"test user", (Object)appHD.getUser());
        Assert.assertEquals((Object)"test queue", (Object)appHD.getQueue());
        Assert.assertEquals((long)0L, (long)appHD.getSubmitTime());
        Assert.assertEquals((long)1L, (long)appHD.getStartTime());
        this.writer.applicationFinished(app, RMAppState.FINISHED);
        for (i = 0; i < MAX_RETRIES && (appHD = this.store.getApplication(ApplicationId.newInstance((long)0L, (int)1))).getYarnApplicationState() == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((long)2L, (long)appHD.getFinishTime());
        Assert.assertEquals((Object)"test diagnostics info", (Object)appHD.getDiagnosticsInfo());
        Assert.assertEquals((Object)FinalApplicationStatus.UNDEFINED, (Object)appHD.getFinalApplicationStatus());
        Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appHD.getYarnApplicationState());
    }

    @Test
    public void testWriteApplicationAttempt() throws Exception {
        int i;
        RMAppAttempt appAttempt = TestRMApplicationHistoryWriter.createRMAppAttempt(ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1));
        this.writer.applicationAttemptStarted(appAttempt);
        ApplicationAttemptHistoryData appAttemptHD = null;
        for (i = 0; i < MAX_RETRIES && (appAttemptHD = this.store.getApplicationAttempt(ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1))) == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertNotNull(appAttemptHD);
        Assert.assertEquals((Object)"test host", (Object)appAttemptHD.getHost());
        Assert.assertEquals((long)-100L, (long)appAttemptHD.getRPCPort());
        Assert.assertEquals((Object)ContainerId.newContainerId((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1), (long)1L), (Object)appAttemptHD.getMasterContainerId());
        this.writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED);
        for (i = 0; i < MAX_RETRIES && (appAttemptHD = this.store.getApplicationAttempt(ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1))).getYarnApplicationAttemptState() == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)"test diagnostics info", (Object)appAttemptHD.getDiagnosticsInfo());
        Assert.assertEquals((Object)"test url", (Object)appAttemptHD.getTrackingURL());
        Assert.assertEquals((Object)FinalApplicationStatus.UNDEFINED, (Object)appAttemptHD.getFinalApplicationStatus());
        Assert.assertEquals((Object)YarnApplicationAttemptState.FINISHED, (Object)appAttemptHD.getYarnApplicationAttemptState());
    }

    @Test
    public void testWriteContainer() throws Exception {
        int i;
        RMContainer container = TestRMApplicationHistoryWriter.createRMContainer(ContainerId.newContainerId((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1), (long)1L));
        this.writer.containerStarted(container);
        ContainerHistoryData containerHD = null;
        for (i = 0; i < MAX_RETRIES && (containerHD = this.store.getContainer(ContainerId.newContainerId((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1), (long)1L))) == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertNotNull(containerHD);
        Assert.assertEquals((Object)NodeId.newInstance((String)"test host", (int)-100), (Object)containerHD.getAssignedNode());
        Assert.assertEquals((Object)Resource.newInstance((int)-1, (int)-1, (int)-1), (Object)containerHD.getAllocatedResource());
        Assert.assertEquals((Object)Priority.UNDEFINED, (Object)containerHD.getPriority());
        Assert.assertEquals((long)0L, (long)container.getCreationTime());
        this.writer.containerFinished(container);
        for (i = 0; i < MAX_RETRIES && (containerHD = this.store.getContainer(ContainerId.newContainerId((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1), (long)1L))).getContainerState() == null; ++i) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)"test diagnostics info", (Object)containerHD.getDiagnosticsInfo());
        Assert.assertEquals((long)-1L, (long)containerHD.getContainerExitStatus());
        Assert.assertEquals((Object)ContainerState.COMPLETE, (Object)containerHD.getContainerState());
    }

    @Test
    public void testParallelWrite() throws Exception {
        int i;
        ArrayList<ApplicationId> appIds = new ArrayList<ApplicationId>();
        for (i = 0; i < 10; ++i) {
            Random rand = new Random(i);
            ApplicationId appId = ApplicationId.newInstance((long)0L, (int)rand.nextInt());
            appIds.add(appId);
            RMApp app = TestRMApplicationHistoryWriter.createRMApp(appId);
            this.writer.applicationStarted(app);
            for (int j = 1; j <= 10; ++j) {
                ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)j);
                RMAppAttempt appAttempt = TestRMApplicationHistoryWriter.createRMAppAttempt(appAttemptId);
                this.writer.applicationAttemptStarted(appAttempt);
                for (int k = 1; k <= 10; ++k) {
                    ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)k);
                    RMContainer container = TestRMApplicationHistoryWriter.createRMContainer(containerId);
                    this.writer.containerStarted(container);
                    this.writer.containerFinished(container);
                }
                this.writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED);
            }
            this.writer.applicationFinished(app, RMAppState.FINISHED);
        }
        for (i = 0; i < MAX_RETRIES && !this.allEventsHandled(2220); ++i) {
            Thread.sleep(500L);
        }
        Assert.assertTrue((boolean)this.allEventsHandled(2220));
        for (ApplicationId appId : appIds) {
            Assert.assertTrue((boolean)this.handledByOne(appId));
        }
    }

    private boolean allEventsHandled(int expected) {
        int actual = 0;
        for (CounterDispatcher dispatcher : this.dispatchers) {
            for (Integer count : dispatcher.counts.values()) {
                actual += count.intValue();
            }
        }
        return actual == expected;
    }

    @Test
    public void testRMWritingMassiveHistoryForFairSche() throws Exception {
        this.testRMWritingMassiveHistory(true);
    }

    @Test
    public void testRMWritingMassiveHistoryForCapacitySche() throws Exception {
        this.testRMWritingMassiveHistory(false);
    }

    private void testRMWritingMassiveHistory(boolean isFS) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        if (isFS) {
            conf.setBoolean("yarn.scheduler.fair.assignmultiple", true);
            conf.set("yarn.resourcemanager.scheduler.class", FairScheduler.class.getName());
        } else {
            conf.set("yarn.resourcemanager.scheduler.class", CapacityScheduler.class.getName());
        }
        MockRM rm = new MockRM((Configuration)conf){

            protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
                return new RMApplicationHistoryWriter(){

                    public void applicationStarted(RMApp app) {
                    }

                    public void applicationFinished(RMApp app, RMAppState finalState) {
                    }

                    public void applicationAttemptStarted(RMAppAttempt appAttempt) {
                    }

                    public void applicationAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState finalState) {
                    }

                    public void containerStarted(RMContainer container) {
                    }

                    public void containerFinished(RMContainer container) {
                    }
                };
            }
        };
        long startTime1 = System.currentTimeMillis();
        this.testRMWritingMassiveHistory(rm);
        long finishTime1 = System.currentTimeMillis();
        long elapsedTime1 = finishTime1 - startTime1;
        rm = new MockRM((Configuration)conf);
        long startTime2 = System.currentTimeMillis();
        this.testRMWritingMassiveHistory(rm);
        long finishTime2 = System.currentTimeMillis();
        long elapsedTime2 = finishTime2 - startTime2;
        Assert.assertTrue((elapsedTime2 - elapsedTime1 < elapsedTime1 / 10L ? 1 : 0) != 0);
    }

    private void testRMWritingMassiveHistory(MockRM rm) throws Exception {
        int cleanedSize;
        int allocatedSize;
        rm.start();
        MockNM nm = rm.registerNode("127.0.0.1:1234", 0x9DD000);
        RMApp app = rm.submitApp(1024);
        nm.nodeHeartbeat(true);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();
        int request = 10000;
        am.allocate("127.0.0.1", 1024, request, new ArrayList<ContainerId>());
        nm.nodeHeartbeat(true);
        List allocated = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
        int waitCount = 0;
        for (allocatedSize = allocated.size(); allocatedSize < request && waitCount++ < 200; allocatedSize += allocated.size()) {
            Thread.sleep(300L);
            allocated = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
            nm.nodeHeartbeat(true);
        }
        Assert.assertEquals((long)request, (long)allocatedSize);
        am.unregisterAppAttempt();
        am.waitForState(RMAppAttemptState.FINISHING);
        nm.nodeHeartbeat(am.getApplicationAttemptId(), 1L, ContainerState.COMPLETE);
        am.waitForState(RMAppAttemptState.FINISHED);
        NodeHeartbeatResponse resp = nm.nodeHeartbeat(true);
        List cleaned = resp.getContainersToCleanup();
        waitCount = 0;
        for (cleanedSize = cleaned.size(); cleanedSize < allocatedSize && waitCount++ < 200; cleanedSize += cleaned.size()) {
            Thread.sleep(300L);
            resp = nm.nodeHeartbeat(true);
            cleaned = resp.getContainersToCleanup();
        }
        Assert.assertEquals((long)allocatedSize, (long)cleanedSize);
        rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
        rm.stop();
    }

    private boolean handledByOne(ApplicationId appId) {
        int count = 0;
        for (CounterDispatcher dispatcher : this.dispatchers) {
            if (!dispatcher.counts.containsKey(appId)) continue;
            ++count;
        }
        return count == 1;
    }

    private static class CounterDispatcher
    extends AsyncDispatcher {
        private Map<ApplicationId, Integer> counts = new HashMap<ApplicationId, Integer>();

        private CounterDispatcher() {
        }

        protected void dispatch(Event event) {
            if (event instanceof WritingApplicationHistoryEvent) {
                WritingApplicationHistoryEvent ashEvent = (WritingApplicationHistoryEvent)event;
                switch ((WritingHistoryEventType)ashEvent.getType()) {
                    case APP_START: {
                        this.incrementCounts(((WritingApplicationStartEvent)event).getApplicationId());
                        break;
                    }
                    case APP_FINISH: {
                        this.incrementCounts(((WritingApplicationFinishEvent)event).getApplicationId());
                        break;
                    }
                    case APP_ATTEMPT_START: {
                        this.incrementCounts(((WritingApplicationAttemptStartEvent)event).getApplicationAttemptId().getApplicationId());
                        break;
                    }
                    case APP_ATTEMPT_FINISH: {
                        this.incrementCounts(((WritingApplicationAttemptFinishEvent)event).getApplicationAttemptId().getApplicationId());
                        break;
                    }
                    case CONTAINER_START: {
                        this.incrementCounts(((WritingContainerStartEvent)event).getContainerId().getApplicationAttemptId().getApplicationId());
                        break;
                    }
                    case CONTAINER_FINISH: {
                        this.incrementCounts(((WritingContainerFinishEvent)event).getContainerId().getApplicationAttemptId().getApplicationId());
                    }
                }
            }
            super.dispatch(event);
        }

        private void incrementCounts(ApplicationId appId) {
            Integer val = this.counts.get(appId);
            if (val == null) {
                this.counts.put(appId, 1);
            } else {
                this.counts.put(appId, val + 1);
            }
        }
    }
}

