/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerPreemptionTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption.select_based_on_reserved_containers", true);
    }

    @Test(timeout=120000L)
    public void testSimpleSurgicalPreemption() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 20480);
        MockNM nm2 = rm1.registerNode("h2:1234", 20480);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 32, new ArrayList<ContainerId>());
        for (int i = 0; i < 32; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)33L, (long)schedulerApp1.getLiveContainers().size());
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 17);
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
        Assert.assertEquals((long)2048L, (long)cs.getNode(nm1.getNodeId()).getAvailableResource().getMemorySize());
        Assert.assertEquals((long)4096L, (long)cs.getNode(nm2.getNodeId()).getAvailableResource().getMemorySize());
        am2.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resources.createResource((int)6144), (int)1)), null);
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
        Assert.assertNotNull((Object)cs.getNode(nm1.getNodeId()).getReservedContainer());
        SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        this.waitNumberOfLiveContainersFromApp(schedulerApp1, 29);
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 13);
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);
        rm1.close();
    }

    @Test(timeout=60000L)
    public void testSurgicalPreemptionWithAvailableResource() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 20480);
        MockNM nm2 = rm1.registerNode("h2:1234", 20480);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 38, new ArrayList<ContainerId>());
        for (int i = 0; i < 38; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)39L, (long)schedulerApp1.getLiveContainers().size());
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 20);
        this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 19);
        RMApp app2 = rm1.submitApp(4096, "app", "user", null, "c");
        FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance((ApplicationId)app2.getApplicationId(), (int)1));
        ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy)this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        Assert.assertEquals((long)3L, (long)editPolicy.getToPreemptContainers().size());
        editPolicy.editSchedule();
        this.waitNumberOfLiveContainersFromApp(schedulerApp1, 36);
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        this.waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        for (int tick = 0; schedulerApp2.getLiveContainers().size() != 1 && tick < 10; ++tick) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
            Thread.sleep(100L);
        }
        this.waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
        rm1.close();
    }
}

