/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import io.hops.util.DBUtility;
import io.hops.util.RMStorageFactory;
import io.hops.util.YarnAPIStorageFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestIncreaseAllocationExpirer {
    private final int GB = 1024;
    private YarnConfiguration conf;
    RMNodeLabelsManager mgr;

    @Before
    public void setUp() throws Exception {
        this.conf = new YarnConfiguration();
        this.conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        RMStorageFactory.setConfiguration((Configuration)this.conf);
        YarnAPIStorageFactory.setConfiguration((Configuration)this.conf);
        DBUtility.InitializeDB();
        this.mgr = new NullRMNodeLabelsManager();
        this.mgr.init((Configuration)this.conf);
    }

    @Test
    public void testContainerIsRemovedFromAllocationExpirer() throws Exception {
        this.conf.setLong("yarn.resourcemanager.rm.container-allocation.expiry-interval-ms", 5000L);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20480);
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "default");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 1L, ContainerState.RUNNING);
        am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
        ContainerId containerId2 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)2L);
        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
        List containers = am1.allocate(null, null).getAllocatedContainers();
        Assert.assertEquals((Object)containerId2, (Object)((Container)containers.get(0)).getId());
        Assert.assertNotNull((Object)((Container)containers.get(0)).getContainerToken());
        this.checkUsedResource(rm1, "default", 2048, null);
        FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(rm1, app1.getApplicationId());
        Assert.assertEquals((long)2048L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18432);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 2L, ContainerState.RUNNING);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        am1.sendContainerResizingRequest(Collections.singletonList(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)3072))));
        nm1.nodeHeartbeat(true);
        Thread.sleep(1000L);
        am1.allocate(null, null);
        Resource resource = Resources.clone((Resource)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource());
        nm1.containerIncreaseStatus(this.getContainer(rm1, containerId2, resource));
        Thread.sleep(10000L);
        Assert.assertEquals((Object)RMContainerState.RUNNING, (Object)rm1.getResourceScheduler().getRMContainer(containerId2).getState());
        Assert.assertEquals((long)3072L, (long)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource().getMemorySize());
        this.checkUsedResource(rm1, "default", 4096, null);
        Assert.assertEquals((long)4096L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16384);
        rm1.stop();
    }

    @Test
    public void testContainerIncreaseAllocationExpiration() throws Exception {
        this.conf.setLong("yarn.resourcemanager.rm.container-allocation.expiry-interval-ms", 5000L);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20480);
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "default");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 1L, ContainerState.RUNNING);
        am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
        ContainerId containerId2 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)2L);
        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
        List containers = am1.allocate(null, null).getAllocatedContainers();
        Assert.assertEquals((Object)containerId2, (Object)((Container)containers.get(0)).getId());
        Assert.assertNotNull((Object)((Container)containers.get(0)).getContainerToken());
        this.checkUsedResource(rm1, "default", 2048, null);
        FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(rm1, app1.getApplicationId());
        Assert.assertEquals((long)2048L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18432);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 2L, ContainerState.RUNNING);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        am1.sendContainerResizingRequest(Collections.singletonList(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)3072))));
        nm1.nodeHeartbeat(true);
        Thread.sleep(1000L);
        am1.allocate(null, null);
        this.checkUsedResource(rm1, "default", 4096, null);
        Assert.assertEquals((long)4096L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16384);
        Thread.sleep(10000L);
        Assert.assertEquals((long)1024L, (long)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource().getMemorySize());
        this.checkUsedResource(rm1, "default", 2048, null);
        Assert.assertEquals((long)2048L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18432);
        rm1.stop();
    }

    @Test
    public void testConsecutiveContainerIncreaseAllocationExpiration() throws Exception {
        this.conf.setLong("yarn.resourcemanager.rm.container-allocation.expiry-interval-ms", 5000L);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20480);
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "default");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 1L, ContainerState.RUNNING);
        am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
        ContainerId containerId2 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)2L);
        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
        am1.allocate(null, null).getAllocatedContainers();
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 2L, ContainerState.RUNNING);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        am1.sendContainerResizingRequest(Collections.singletonList(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)3072))));
        nm1.nodeHeartbeat(true);
        Thread.sleep(1000L);
        am1.allocate(null, null);
        Resource resource1 = Resources.clone((Resource)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource());
        AllocateResponse response = am1.sendContainerResizingRequest(Collections.singletonList(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)5120))));
        List updateErrors = response.getUpdateErrors();
        Assert.assertEquals((long)1L, (long)updateErrors.size());
        Assert.assertEquals((Object)"INCORRECT_CONTAINER_VERSION_ERROR|0|1", (Object)((UpdateContainerError)updateErrors.get(0)).getReason());
        am1.sendContainerResizingRequest(Collections.singletonList(UpdateContainerRequest.newInstance((int)1, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)5120))));
        nm1.nodeHeartbeat(true);
        Thread.sleep(1000L);
        am1.allocate(null, null);
        this.checkUsedResource(rm1, "default", 6144, null);
        FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(rm1, app1.getApplicationId());
        Assert.assertEquals((long)6144L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 14336);
        nm1.containerIncreaseStatus(this.getContainer(rm1, containerId2, resource1));
        Thread.sleep(10000L);
        Assert.assertEquals((long)3072L, (long)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource().getMemorySize());
        this.checkUsedResource(rm1, "default", 4096, null);
        Assert.assertEquals((long)4096L, (long)app.getAppAttemptResourceUsage().getUsed().getMemorySize());
        this.verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16384);
        List containersToDecrease = nm1.nodeHeartbeat(true).getContainersToDecrease();
        Assert.assertEquals((long)1L, (long)containersToDecrease.size());
        Assert.assertEquals((long)3072L, (long)((Container)containersToDecrease.get(0)).getResource().getMemorySize());
        rm1.stop();
    }

    @Test
    public void testDecreaseAfterIncreaseWithAllocationExpiration() throws Exception {
        this.conf.setLong("yarn.resourcemanager.rm.container-allocation.expiry-interval-ms", 5000L);
        MockRM rm1 = new MockRM((Configuration)this.conf);
        rm1.start();
        MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20480);
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "default");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 1L, ContainerState.RUNNING);
        am1.allocate("127.0.0.1", 3072, 3, new ArrayList<ContainerId>());
        ContainerId containerId2 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)2L);
        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
        ContainerId containerId3 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)3L);
        rm1.waitForState(nm1, containerId3, RMContainerState.ALLOCATED);
        ContainerId containerId4 = ContainerId.newContainerId((ApplicationAttemptId)am1.getApplicationAttemptId(), (long)4L);
        rm1.waitForState(nm1, containerId4, RMContainerState.ALLOCATED);
        List containers = am1.allocate(null, null).getAllocatedContainers();
        Assert.assertEquals((long)3L, (long)containers.size());
        Assert.assertNotNull((Object)((Container)containers.get(0)).getContainerToken());
        Assert.assertNotNull((Object)((Container)containers.get(1)).getContainerToken());
        Assert.assertNotNull((Object)((Container)containers.get(2)).getContainerToken());
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 2L, ContainerState.RUNNING);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 3L, ContainerState.RUNNING);
        nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(), 4L, ContainerState.RUNNING);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
        rm1.waitForState(nm1, containerId4, RMContainerState.RUNNING);
        ArrayList<UpdateContainerRequest> increaseRequests = new ArrayList<UpdateContainerRequest>();
        increaseRequests.add(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)6144)));
        increaseRequests.add(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId3, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)6144)));
        increaseRequests.add(UpdateContainerRequest.newInstance((int)0, (ContainerId)containerId4, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)6144)));
        am1.sendContainerResizingRequest(increaseRequests);
        nm1.nodeHeartbeat(true);
        Thread.sleep(1000L);
        am1.allocate(null, null);
        ArrayList<UpdateContainerRequest> decreaseRequests = new ArrayList<UpdateContainerRequest>();
        decreaseRequests.add(UpdateContainerRequest.newInstance((int)1, (ContainerId)containerId2, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)2048)));
        decreaseRequests.add(UpdateContainerRequest.newInstance((int)1, (ContainerId)containerId3, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)4096)));
        decreaseRequests.add(UpdateContainerRequest.newInstance((int)1, (ContainerId)containerId4, (ContainerUpdateType)ContainerUpdateType.INCREASE_RESOURCE, (Resource)Resources.createResource((int)4096)));
        AllocateResponse response = am1.sendContainerResizingRequest(decreaseRequests);
        Assert.assertEquals((long)3L, (long)response.getUpdatedContainers().size());
        nm1.containerIncreaseStatus(this.getContainer(rm1, containerId4, Resources.createResource((int)6144)));
        Thread.sleep(10000L);
        Assert.assertEquals((long)2048L, (long)rm1.getResourceScheduler().getRMContainer(containerId2).getAllocatedResource().getMemorySize());
        Assert.assertEquals((long)3072L, (long)rm1.getResourceScheduler().getRMContainer(containerId3).getAllocatedResource().getMemorySize());
        Assert.assertEquals((long)4096L, (long)rm1.getResourceScheduler().getRMContainer(containerId4).getAllocatedResource().getMemorySize());
        List containersToDecrease = nm1.nodeHeartbeat(true).getContainersToDecrease();
        Assert.assertEquals((long)2L, (long)containersToDecrease.size());
        Collections.sort(containersToDecrease);
        Assert.assertEquals((long)3072L, (long)((Container)containersToDecrease.get(0)).getResource().getMemorySize());
        Assert.assertEquals((long)4096L, (long)((Container)containersToDecrease.get(1)).getResource().getMemorySize());
        rm1.stop();
    }

    private void checkUsedResource(MockRM rm, String queueName, int memory, String label) {
        CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
        CSQueue queue = cs.getQueue(queueName);
        Assert.assertEquals((long)memory, (long)queue.getQueueResourceUsage().getUsed(label == null ? "" : label).getMemorySize());
    }

    private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId, int expectedMemory) {
        CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
        FiCaSchedulerNode node = cs.getNode(nodeId);
        Assert.assertEquals((long)expectedMemory, (long)node.getAvailableResource().getMemorySize());
    }

    private Container getContainer(MockRM rm, ContainerId containerId, Resource resource) {
        RMContainer rmContainer = rm.getResourceScheduler().getRMContainer(containerId);
        return Container.newInstance((ContainerId)containerId, (NodeId)rmContainer.getAllocatedNode(), null, (Resource)resource, null, null);
    }
}

