/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdatedCryptoForApp;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestApplicationCleanup {
    private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class);
    private YarnConfiguration conf;

    @Before
    public void setup() throws UnknownHostException {
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        this.conf = new YarnConfiguration();
        UserGroupInformation.setConfiguration((Configuration)this.conf);
        this.conf.set("yarn.resourcemanager.recovery.enabled", "true");
        this.conf.set("yarn.resourcemanager.store.class", MemoryRMStateStore.class.getName());
        Assert.assertTrue((boolean)true);
    }

    @Test
    public void testAppCleanup() throws Exception {
        int contReceived;
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        MockRM rm = new MockRM();
        rm.start();
        MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
        RMApp app = rm.submitApp(2000);
        nm1.nodeHeartbeat(true);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();
        int request = 2;
        am.allocate("127.0.0.1", 1000, request, new ArrayList<ContainerId>());
        nm1.nodeHeartbeat(true);
        List conts = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
        int waitCount = 0;
        for (contReceived = conts.size(); contReceived < request && waitCount++ < 200; contReceived += conts.size()) {
            LOG.info((Object)("Got " + contReceived + " containers. Waiting to get " + request));
            Thread.sleep(100L);
            conts = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
            nm1.nodeHeartbeat(true);
        }
        Assert.assertEquals((long)request, (long)contReceived);
        am.unregisterAppAttempt();
        NodeHeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1L, ContainerState.COMPLETE);
        rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
        resp = nm1.nodeHeartbeat(true);
        List containersToCleanup = resp.getContainersToCleanup();
        List appsToCleanup = resp.getApplicationsToCleanup();
        int numCleanedContainers = containersToCleanup.size();
        int numCleanedApps = appsToCleanup.size();
        waitCount = 0;
        while ((numCleanedContainers < 2 || numCleanedApps < 1) && waitCount++ < 200) {
            LOG.info((Object)("Waiting to get cleanup events.. cleanedConts: " + numCleanedContainers + " cleanedApps: " + numCleanedApps));
            Thread.sleep(100L);
            resp = nm1.nodeHeartbeat(true);
            List deltaContainersToCleanup = resp.getContainersToCleanup();
            List deltaAppsToCleanup = resp.getApplicationsToCleanup();
            containersToCleanup.addAll(deltaContainersToCleanup);
            appsToCleanup.addAll(deltaAppsToCleanup);
            numCleanedContainers = containersToCleanup.size();
            numCleanedApps = appsToCleanup.size();
        }
        Assert.assertEquals((long)1L, (long)appsToCleanup.size());
        Assert.assertEquals((Object)app.getApplicationId(), appsToCleanup.get(0));
        Assert.assertEquals((long)1L, (long)numCleanedApps);
        Assert.assertEquals((long)2L, (long)numCleanedContainers);
        rm.stop();
    }

    @Test
    public void testContainerCleanup() throws Exception {
        int contReceived;
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        MockRM rm = new MockRM();
        rm.start();
        MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
        RMApp app = rm.submitApp(2000);
        nm1.nodeHeartbeat(true);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();
        int request = 2;
        am.allocate("127.0.0.1", 1000, request, new ArrayList<ContainerId>());
        rm.drainEvents();
        nm1.nodeHeartbeat(true);
        List conts = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
        int waitCount = 0;
        for (contReceived = conts.size(); contReceived < request && waitCount++ < 200; contReceived += conts.size()) {
            LOG.info((Object)("Got " + contReceived + " containers. Waiting to get " + request));
            Thread.sleep(100L);
            conts = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
            rm.drainEvents();
            nm1.nodeHeartbeat(true);
        }
        Assert.assertEquals((long)request, (long)contReceived);
        ArrayList<ContainerId> release = new ArrayList<ContainerId>();
        release.add(((Container)conts.get(0)).getId());
        am.allocate(new ArrayList<ResourceRequest>(), release);
        rm.drainEvents();
        HashMap<ApplicationId, List<ContainerStatus>> containerStatuses = new HashMap<ApplicationId, List<ContainerStatus>>();
        ArrayList<ContainerStatus> containerStatusList = new ArrayList<ContainerStatus>();
        containerStatusList.add(BuilderUtils.newContainerStatus((ContainerId)((Container)conts.get(0)).getId(), (ContainerState)ContainerState.RUNNING, (String)"nothing", (int)0, (Resource)((Container)conts.get(0)).getResource()));
        containerStatuses.put(app.getApplicationId(), containerStatusList);
        NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
        this.waitForContainerCleanup(rm, nm1, resp);
        LOG.info((Object)"Testing container launch much after release and NM getting cleanup");
        containerStatuses.clear();
        containerStatusList.clear();
        containerStatusList.add(BuilderUtils.newContainerStatus((ContainerId)((Container)conts.get(0)).getId(), (ContainerState)ContainerState.RUNNING, (String)"nothing", (int)0, (Resource)((Container)conts.get(0)).getResource()));
        containerStatuses.put(app.getApplicationId(), containerStatusList);
        resp = nm1.nodeHeartbeat(containerStatuses, true);
        this.waitForContainerCleanup(rm, nm1, resp);
        rm.stop();
    }

    protected void waitForContainerCleanup(MockRM rm, MockNM nm, NodeHeartbeatResponse resp) throws Exception {
        List contsToClean;
        int waitCount = 0;
        int cleanedConts = 0;
        do {
            rm.drainEvents();
            contsToClean = resp.getContainersToCleanup();
            if ((cleanedConts += contsToClean.size()) >= 1) break;
            Thread.sleep(100L);
            resp = nm.nodeHeartbeat(true);
        } while (waitCount++ < 200);
        if (contsToClean.isEmpty()) {
            LOG.error((Object)"Failed to get any containers to cleanup");
        } else {
            LOG.info((Object)("Got cleanup for " + contsToClean.get(0)));
        }
        Assert.assertEquals((long)1L, (long)cleanedConts);
    }

    private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception {
        NodeHeartbeatResponse response;
        while ((response = nm.nodeHeartbeat(true)).getApplicationsToCleanup() == null || response.getApplicationsToCleanup().size() != 1 || !appId.equals(response.getApplicationsToCleanup().get(0))) {
            LOG.info((Object)("Haven't got application=" + appId.toString() + " in cleanup list from node heartbeat response, sleep for a while before next heartbeat"));
            Thread.sleep(1000L);
        }
        return;
    }

    private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception {
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        nm.nodeHeartbeat(true);
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();
        rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
        return am;
    }

    @Test(timeout=60000L)
    public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
        this.conf.setInt("yarn.resourcemanager.am.max-attempts", 1);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockMemoryRMStateStore memStore = (MockMemoryRMStateStore)rm1.getRMStateStore();
        MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
        nm1.registerNode();
        RMApp app0 = rm1.submitApp(200);
        MockAM am0 = this.launchAM(app0, rm1, nm1);
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1L, ContainerState.COMPLETE);
        rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
        MockRM rm2 = new MockRM((Configuration)this.conf, (RMStateStore)memStore);
        rm2.start();
        nm1.setResourceTrackerService(rm2.getResourceTrackerService());
        nm1.registerNode(this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
        this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
        rm1.stop();
        rm2.stop();
    }

    @Test(timeout=60000L)
    public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
        this.conf.setInt("yarn.resourcemanager.am.max-attempts", 1);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
        nm1.registerNode();
        MockNM nm2 = new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService());
        nm2.registerNode();
        RMApp app0 = rm1.submitApp(200);
        MockAM am0 = this.launchAM(app0, rm1, nm1);
        AllocateResponse allocResponse = am0.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resource.newInstance((int)1024, (int)0), (int)1)), null);
        while (null == allocResponse.getAllocatedContainers() || allocResponse.getAllocatedContainers().isEmpty()) {
            nm2.nodeHeartbeat(true);
            allocResponse = am0.allocate(null, null);
            Thread.sleep(1000L);
        }
        MockRM rm2 = new MockRM((Configuration)this.conf, rm1.getRMStateStore());
        rm2.start();
        nm1.setResourceTrackerService(rm2.getResourceTrackerService());
        nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance((ContainerId)ContainerId.newContainerId((ApplicationAttemptId)am0.getApplicationAttemptId(), (long)1L), (int)0, (ContainerState)ContainerState.COMPLETE, (Resource)Resource.newInstance((int)1024, (int)1), (String)"", (int)0, (Priority)Priority.newInstance((int)0), (long)1234L)), this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        nm2.setResourceTrackerService(rm2.getResourceTrackerService());
        nm2.registerNode(this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
        this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
        this.waitForAppCleanupMessageRecved(nm2, app0.getApplicationId());
        rm1.stop();
        rm2.stop();
    }

    @Test(timeout=60000L)
    public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws Exception {
        this.conf.setInt("yarn.resourcemanager.am.max-attempts", 1);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
        nm1.registerNode();
        RMApp app0 = rm1.submitApp(200);
        MockAM am0 = this.launchAM(app0, rm1, nm1);
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1L, ContainerState.RUNNING);
        rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
        MockRM rm2 = new MockRM((Configuration)this.conf, rm1.getRMStateStore());
        rm2.start();
        nm1.setResourceTrackerService(rm2.getResourceTrackerService());
        nm1.registerNode(this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
        NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 2L, ContainerState.RUNNING);
        this.waitForContainerCleanup(rm2, nm1, response);
        rm1.stop();
        rm2.stop();
    }

    @Test(timeout=60000L)
    public void testAppCleanupWhenNMReconnects() throws Exception {
        this.conf.setInt("yarn.resourcemanager.am.max-attempts", 1);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
        nm1.registerNode();
        RMApp app0 = rm1.submitApp(200);
        MockAM am0 = this.launchAM(app0, rm1, nm1);
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1L, ContainerState.COMPLETE);
        rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
        this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
        nm1.registerNode(this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
        rm1.stop();
    }

    @Test(timeout=60000L)
    public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
        this.conf.setInt("yarn.resourcemanager.am.max-attempts", 1);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        int nmMemory = 8192;
        int amMemory = 1024;
        int containerMemory = 2048;
        MockNM nm1 = new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService());
        nm1.registerNode();
        RMApp app0 = rm1.submitApp(amMemory);
        MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
        int noOfContainers = 1;
        List<Container> allocateContainers = am0.allocateAndWaitForContainers(noOfContainers, containerMemory, nm1);
        Assert.assertEquals((long)noOfContainers, (long)allocateContainers.size());
        Container container = allocateContainers.get(0);
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1L, ContainerState.RUNNING);
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId().getContainerId(), ContainerState.RUNNING);
        rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
        ResourceScheduler rs = rm1.getRMContext().getScheduler();
        long allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
        Assert.assertEquals((long)(amMemory + containerMemory), (long)allocatedMB);
        List<NMContainerStatus> nMContainerStatusForApp = TestApplicationCleanup.createNMContainerStatusForApp(am0);
        nm1.registerNode(nMContainerStatusForApp, this.createRunningAppsForRequest(0, 0L, app0.getApplicationId()));
        this.waitForClusterMemory(nm1, rs, amMemory);
        Assert.assertEquals((long)amMemory, (long)rs.getRootQueueMetrics().getAllocatedMB());
        AllocateRequest req = AllocateRequest.newInstance((int)0, (float)0.0f, new ArrayList(), new ArrayList(), null);
        AllocateResponse allocate = am0.allocate(req);
        List completedContainersStatuses = allocate.getCompletedContainersStatuses();
        Assert.assertEquals((long)noOfContainers, (long)completedContainersStatuses.size());
        nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1L, ContainerState.COMPLETE);
        this.waitForClusterMemory(nm1, rs, 0);
        rm1.stop();
    }

    private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs, int clusterMemory) throws Exception, InterruptedException {
        int counter = 0;
        while (rs.getRootQueueMetrics().getAllocatedMB() != (long)clusterMemory) {
            nm1.nodeHeartbeat(true);
            Thread.sleep(100L);
            if (counter++ != 50) continue;
            Assert.fail((String)("Wait for cluster memory is timed out.Expected=" + clusterMemory + " Actual=" + rs.getRootQueueMetrics().getAllocatedMB()));
        }
    }

    public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
        ArrayList<NMContainerStatus> list = new ArrayList<NMContainerStatus>();
        NMContainerStatus amContainer = TestApplicationCleanup.createNMContainerStatus(am.getApplicationAttemptId(), 1, ContainerState.RUNNING, 1024);
        NMContainerStatus completedContainer = TestApplicationCleanup.createNMContainerStatus(am.getApplicationAttemptId(), 2, ContainerState.COMPLETE, 2048);
        list.add(amContainer);
        list.add(completedContainer);
        return list;
    }

    public static NMContainerStatus createNMContainerStatus(ApplicationAttemptId appAttemptId, int id, ContainerState containerState, int memory) {
        ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)id);
        NMContainerStatus containerReport = NMContainerStatus.newInstance((ContainerId)containerId, (int)0, (ContainerState)containerState, (Resource)Resource.newInstance((int)memory, (int)1), (String)"recover container", (int)0, (Priority)Priority.newInstance((int)0), (long)0L);
        return containerReport;
    }

    private Map<ApplicationId, UpdatedCryptoForApp> createRunningAppsForRequest(Integer cryptoVersion, long jwtExpiration, ApplicationId ... appIds) {
        HashMap<ApplicationId, UpdatedCryptoForApp> runningApps = new HashMap<ApplicationId, UpdatedCryptoForApp>();
        for (ApplicationId appId : appIds) {
            runningApps.put(appId, UpdatedCryptoForApp.newInstance((int)cryptoVersion, (long)jwtExpiration));
        }
        return runningApps;
    }

    public static void main(String[] args) throws Exception {
        TestApplicationCleanup t = new TestApplicationCleanup();
        t.testAppCleanup();
    }
}

