/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.OpportunisticContainerAllocatorAMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestOpportunisticContainerAllocatorAMService {
    // Presumably the number of megabytes per gigabyte; not referenced in the visible part of this class.
    private static final int GB = 1024;
    // ResourceManager under test; assigned in startRM and stopped in stopRM.
    private MockRM rm;
    // Dispatcher returned from the MockRM's createDispatcher override in startRM.
    private DrainDispatcher dispatcher;
    // Status object passed along with the node heartbeats in the tests; produced by
    // getOpportunisticStatus(), which is declared outside the visible part of this file.
    private OpportunisticContainersStatus oppContainersStatus = this.getOpportunisticStatus();

    /**
     * Runs before every test: builds a configuration on top of a fresh
     * {@link CapacitySchedulerConfiguration} that selects {@link CapacityScheduler}, enables
     * opportunistic container allocation, sets the node-sorting interval to 100, enables
     * (work-preserving) recovery with {@link MemoryRMStateStore}, and starts the RM with it.
     */
    @Before
    public void createAndStartRM() {
        YarnConfiguration rmConf = new YarnConfiguration(new CapacitySchedulerConfiguration());

        // Scheduler selection and opportunistic allocation.
        rmConf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        rmConf.setBoolean("yarn.resourcemanager.opportunistic-container-allocation.enabled", true);
        rmConf.setInt("yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms", 100);

        // Recovery settings, backed by the in-memory state store.
        rmConf.setBoolean("yarn.resourcemanager.recovery.enabled", true);
        rmConf.setBoolean("yarn.resourcemanager.work-preserving-recovery.enabled", true);
        rmConf.set("yarn.resourcemanager.store.class", MemoryRMStateStore.class.getName());

        startRM(rmConf);
    }

    /**
     * Starts an RM whose configuration additionally sets
     * {@code yarn.resourcemanager.auto-update.containers} to {@code true}. Unlike
     * {@link #createAndStartRM()}, no recovery-related keys are set here.
     */
    public void createAndStartRMWithAutoUpdateContainer() {
        YarnConfiguration rmConf = new YarnConfiguration(new CapacitySchedulerConfiguration());

        rmConf.setBoolean("yarn.resourcemanager.auto-update.containers", true);
        rmConf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        rmConf.setBoolean("yarn.resourcemanager.opportunistic-container-allocation.enabled", true);
        rmConf.setInt("yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms", 100);

        startRM(rmConf);
    }

    /**
     * Creates a new {@link DrainDispatcher}, builds a {@link MockRM} from {@code conf} whose
     * {@code createDispatcher()} returns that dispatcher, stores the RM in {@code rm} and starts it.
     *
     * @param conf configuration handed to the {@link MockRM} constructor
     */
    private void startRM(YarnConfiguration conf) {
        dispatcher = new DrainDispatcher();
        MockRM resourceManager = new MockRM((Configuration) conf) {
            @Override
            protected Dispatcher createDispatcher() {
                // Return the test's own dispatcher field rather than letting the RM build one.
                return TestOpportunisticContainerAllocatorAMService.this.dispatcher;
            }
        };
        rm = resourceManager;
        resourceManager.start();
    }

    /**
     * Runs after every test: stops the RM if one was created and resets the
     * opportunistic scheduler metrics.
     */
    @After
    public void stopRM() {
        try {
            if (this.rm != null) {
                this.rm.stop();
            }
        } finally {
            // Reset even when stop() throws, so metrics state never carries over into the next test.
            OpportunisticSchedulerMetrics.resetMetrics();
        }
    }

    /**
     * Promotes an OPPORTUNISTIC container to GUARANTEED and demotes it back again without the
     * container ever being reported as running by its node.
     *
     * <p>Four nodes are registered (two ports on each of hosts h1 and h2). The test checks that a
     * heartbeat from the other node on the same host does not complete the promotion, that a
     * repeated or wrongly-versioned promotion request is rejected with an update error, that a
     * heartbeat from the allocated node completes the promotion, and that the queue metrics match
     * the expected values at each step.
     */
    @Test(timeout=600000L)
    public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception {
        HashMap<NodeId, MockNM> nodes = new HashMap<>();
        MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm1.getNodeId(), nm1);
        MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService());
        nodes.put(nm2.getNodeId(), nm2);
        MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm3.getNodeId(), nm3);
        MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService());
        nodes.put(nm4.getNodeId(), nm4);

        // Fixed order nm1..nm4 for every "all nodes" step below.
        List<MockNM> allNodes = Arrays.asList(nm1, nm2, nm3, nm4);
        for (MockNM nm : allNodes) {
            nm.registerNode();
        }
        for (MockNM nm : allNodes) {
            nm.nodeHeartbeat(oppContainersStatus, true);
        }

        OpportunisticContainerAllocatorAMService amservice =
                (OpportunisticContainerAllocatorAMService) rm.getApplicationMasterService();
        MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(1024L, rm)
                .withAppName("app")
                .withUser("user")
                .withAcls(null)
                .withQueue("default")
                .withUnmanagedAM(false)
                .build();
        RMApp app1 = MockRMAppSubmitter.submit(rm, data);
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
        ResourceScheduler scheduler = rm.getResourceScheduler();

        for (MockNM nm : allNodes) {
            nm.nodeHeartbeat(oppContainersStatus, true);
        }
        GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 4, 10L, 1000L);

        // Expected baseline: 4 x 4096 MB registered, 1024 MB allocated (the AM container).
        QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue().getMetrics();
        verifyMetrics(metrics, 15360L, 15, 1024L, 1, 1);

        // Ask for two OPPORTUNISTIC 1 GB containers.
        AllocateResponse allocateResponse = am1.allocate(
                Arrays.asList(ResourceRequest.newInstance(
                        Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null,
                        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))),
                null);
        List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
        Assert.assertEquals(2, allocatedContainers.size());
        Container container = allocatedContainers.get(0);
        MockNM allocNode = nodes.get(container.getNodeId());
        Assert.assertNotNull("container was allocated on a node this test did not register", allocNode);

        // Find the other node that shares the allocated node's host but uses a different port.
        MockNM sameHostDiffNode = null;
        for (NodeId candidate : nodes.keySet()) {
            boolean sameHost = candidate.getHost().equals(allocNode.getNodeId().getHost());
            boolean differentPort = candidate.getPort() != allocNode.getNodeId().getPort();
            if (sameHost && differentPort) {
                sameHostDiffNode = nodes.get(candidate);
            }
        }
        Assert.assertNotNull("no second node found on the allocated node's host", sameHostDiffNode);

        // The assertions expect the opportunistic allocation to leave the queue metrics unchanged.
        verifyMetrics(metrics, 15360L, 15, 1024L, 1, 1);

        // Request promotion, then heartbeat from the *other* node on the same host.
        am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true);
        rm.drainEvents();
        allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
        dispatcher.waitForEventThreadToWait();
        rm.drainEvents();
        verifyMetrics(metrics, 15360L, 15, 1024L, 1, 1);

        // Same promotion again while the first one is still pending: expect an update error.
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
        Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
        UpdateContainerError outstandingError = allocateResponse.getUpdateErrors().get(0);
        Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", outstandingError.getReason());
        Assert.assertEquals(container.getId(), outstandingError.getUpdateContainerRequest().getContainerId());

        // Promotion carrying version 1 while the container is still at version 0.
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                1, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
        Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
        UpdateContainerError versionError = allocateResponse.getUpdateErrors().get(0);
        Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR", versionError.getReason());
        Assert.assertEquals(0, versionError.getCurrentContainerVersion());
        Assert.assertEquals(container.getId(), versionError.getUpdateContainerRequest().getContainerId());

        // Heartbeat from the node that actually hosts the container: promotion is expected now.
        allocNode.nodeHeartbeat(oppContainersStatus, true);
        rm.drainEvents();
        allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        Container uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
        Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
        Assert.assertEquals(uc.getId(), container.getId());
        Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
        // The promoted container is now expected to show up in the queue metrics.
        verifyMetrics(metrics, 14336L, 14, 2048L, 2, 2);

        for (MockNM nm : allNodes) {
            nm.nodeHeartbeat(oppContainersStatus, true);
        }
        rm.drainEvents();
        RMContainer rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(uc.getId().getApplicationAttemptId())
                .getRMContainer(uc.getId());
        Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());

        // Demote back to OPPORTUNISTIC; the response to this very request must carry the update.
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                uc.getVersion(), uc.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null, ExecutionType.OPPORTUNISTIC)));
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType());
        Assert.assertEquals(uc.getId(), container.getId());
        Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
        dispatcher.waitForEventThreadToWait();
        rm.drainEvents();
        // Metrics are expected to be back at the baseline after the demotion.
        verifyMetrics(metrics, 15360L, 15, 1024L, 1, 1);
    }

    /**
     * Promotes an OPPORTUNISTIC container to GUARANTEED after its node has already reported it
     * as RUNNING, and checks that the container stays RUNNING and the queue metrics pick up the
     * promoted container.
     */
    @Test(timeout=60000L)
    public void testContainerPromoteAfterContainerStart() throws Exception {
        HashMap<NodeId, MockNM> nodes = new HashMap<>();
        MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm1.getNodeId(), nm1);
        MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm2.getNodeId(), nm2);
        nm1.registerNode();
        nm2.registerNode();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        nm2.nodeHeartbeat(oppContainersStatus, true);

        OpportunisticContainerAllocatorAMService amService =
                (OpportunisticContainerAllocatorAMService) rm.getApplicationMasterService();
        MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder.createWithMemory(1024L, rm)
                .withAppName("app")
                .withUser("user")
                .withAcls(null)
                .withQueue("default")
                .withUnmanagedAM(false)
                .build();
        RMApp app = MockRMAppSubmitter.submit(rm, submission);
        MockAM am = MockRM.launchAndRegisterAM(app, rm, nm2);
        ResourceScheduler scheduler = rm.getResourceScheduler();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        nm2.nodeHeartbeat(oppContainersStatus, true);
        GenericTestUtils.waitFor(() -> amService.getLeastLoadedNodes().size() == 2, 10L, 1000L);

        // Expected baseline: 2 x 4096 MB registered, 1024 MB allocated (the AM container).
        QueueMetrics queueMetrics = ((CapacityScheduler) scheduler).getRootQueue().getMetrics();
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);

        // Ask for two OPPORTUNISTIC 1 GB containers and work with the first one.
        AllocateResponse response = am.allocate(
                Arrays.asList(ResourceRequest.newInstance(
                        Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null,
                        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))),
                null);
        List<Container> allocated = response.getAllocatedContainers();
        Assert.assertEquals(2, allocated.size());
        Container container = allocated.get(0);
        MockNM allocNode = nodes.get(container.getNodeId());

        // Report the container as RUNNING from its node.
        allocNode.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        RMContainer rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(container.getId().getApplicationAttemptId())
                .getRMContainer(container.getId());
        Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);

        // First promotion request; metrics are expected to be unchanged right after it.
        am.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);

        // Identical request while the first is pending: expect an update error.
        response = am.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0, response.getUpdatedContainers().size());
        Assert.assertEquals(1, response.getUpdateErrors().size());
        Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", response.getUpdateErrors().get(0).getReason());
        Assert.assertEquals(container.getId(),
                response.getUpdateErrors().get(0).getUpdateContainerRequest().getContainerId());

        // Another heartbeat from the hosting node, after which the promotion is expected.
        allocNode.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        response = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        Assert.assertEquals(1, response.getUpdatedContainers().size());
        Container promoted = response.getUpdatedContainers().get(0).getContainer();
        Assert.assertEquals(ExecutionType.GUARANTEED, promoted.getExecutionType());
        Assert.assertEquals(promoted.getId(), container.getId());
        Assert.assertEquals(promoted.getVersion(), container.getVersion() + 1);

        rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(promoted.getId().getApplicationAttemptId())
                .getRMContainer(promoted.getId());
        Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
        // The promoted container is now expected to show up in the queue metrics.
        verifyMetrics(queueMetrics, 6144L, 6, 2048L, 2, 2);
    }

    /**
     * Requests promotion of an OPPORTUNISTIC container after its node has reported it COMPLETE
     * and checks that the request is answered with an {@code INVALID_CONTAINER_ID} update error
     * while the queue metrics stay at the expected baseline.
     */
    @Test(timeout=600000L)
    public void testContainerPromoteAfterContainerComplete() throws Exception {
        HashMap<NodeId, MockNM> nodes = new HashMap<>();
        MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm1.getNodeId(), nm1);
        MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
        nodes.put(nm2.getNodeId(), nm2);
        nm1.registerNode();
        nm2.registerNode();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        nm2.nodeHeartbeat(oppContainersStatus, true);

        OpportunisticContainerAllocatorAMService amService =
                (OpportunisticContainerAllocatorAMService) rm.getApplicationMasterService();
        MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder.createWithMemory(1024L, rm)
                .withAppName("app")
                .withUser("user")
                .withAcls(null)
                .withQueue("default")
                .withUnmanagedAM(false)
                .build();
        RMApp app = MockRMAppSubmitter.submit(rm, submission);
        MockAM am = MockRM.launchAndRegisterAM(app, rm, nm2);
        ResourceScheduler scheduler = rm.getResourceScheduler();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        nm2.nodeHeartbeat(oppContainersStatus, true);
        GenericTestUtils.waitFor(() -> amService.getLeastLoadedNodes().size() == 2, 10L, 1000L);

        // Expected baseline: 2 x 4096 MB registered, 1024 MB allocated (the AM container).
        QueueMetrics queueMetrics = ((CapacityScheduler) scheduler).getRootQueue().getMetrics();
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);

        // Ask for two OPPORTUNISTIC 1 GB containers and work with the first one.
        AllocateResponse response = am.allocate(
                Arrays.asList(ResourceRequest.newInstance(
                        Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null,
                        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))),
                null);
        List<Container> allocated = response.getAllocatedContainers();
        Assert.assertEquals(2, allocated.size());
        Container container = allocated.get(0);
        MockNM allocNode = nodes.get(container.getNodeId());

        // The hosting node first reports the container RUNNING ...
        allocNode.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        RMContainer rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(container.getId().getApplicationAttemptId())
                .getRMContainer(container.getId());
        Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());

        // ... and then COMPLETE, after which the scheduler must no longer know the container.
        allocNode.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
                true);
        rm.drainEvents();
        rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(container.getId().getApplicationAttemptId())
                .getRMContainer(container.getId());
        Assert.assertNull(rmContainer);
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);

        // Promotion of the completed container: completion is reported and the update is rejected.
        response = am.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
        Assert.assertEquals(container.getId(),
                response.getCompletedContainersStatuses().get(0).getContainerId());
        Assert.assertEquals(0, response.getUpdatedContainers().size());
        Assert.assertEquals(1, response.getUpdateErrors().size());
        Assert.assertEquals("INVALID_CONTAINER_ID", response.getUpdateErrors().get(0).getReason());
        Assert.assertEquals(container.getId(),
                response.getUpdateErrors().get(0).getUpdateContainerRequest().getContainerId());
        verifyMetrics(queueMetrics, 7168L, 7, 1024L, 1, 1);
    }

    /**
     * Runs a container through promote, increase-resource, decrease-resource and demote updates
     * against an RM started via {@link #createAndStartRMWithAutoUpdateContainer()}, checking after
     * each step that the update is visible both in the AM's allocate response and in the
     * {@code getContainersToUpdate()} list of the node heartbeat response.
     */
    @Test(timeout=600000L)
    public void testContainerAutoUpdateContainer() throws Exception {
        // Replace the RM started by the @Before hook with one that has auto-update enabled.
        rm.stop();
        createAndStartRMWithAutoUpdateContainer();

        MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
        nm1.registerNode();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        OpportunisticContainerAllocatorAMService amservice =
                (OpportunisticContainerAllocatorAMService) rm.getApplicationMasterService();
        MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(1024L, rm)
                .withAppName("app")
                .withUser("user")
                .withAcls(null)
                .withQueue("default")
                .withUnmanagedAM(false)
                .build();
        RMApp app1 = MockRMAppSubmitter.submit(rm, data);
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
        ResourceScheduler scheduler = rm.getResourceScheduler();
        nm1.nodeHeartbeat(oppContainersStatus, true);
        GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 1, 10L, 1000L);

        // Ask for two OPPORTUNISTIC 1 GB containers; collect them over two allocate calls.
        AllocateResponse allocateResponse = am1.allocate(
                Arrays.asList(ResourceRequest.newInstance(
                        Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null,
                        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))),
                null);
        List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
        allocatedContainers.addAll(am1.allocate(null, null).getAllocatedContainers());
        Assert.assertEquals(2, allocatedContainers.size());
        Container container = allocatedContainers.get(0);

        // Report the container as RUNNING.
        nm1.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        RMContainer rmContainer = ((CapacityScheduler) scheduler)
                .getApplicationAttempt(container.getId().getApplicationAttemptId())
                .getRMContainer(container.getId());
        Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());

        // --- Promote to GUARANTEED (container version 0). ---
        // The response to the request itself is not inspected; the update is read from the
        // allocate call that follows the next node heartbeat.
        am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED)));
        nm1.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0);
        Assert.assertEquals(container.getId(), uc.getContainer().getId());
        Assert.assertEquals(ExecutionType.GUARANTEED, uc.getContainer().getExecutionType());

        // The node must be told about the promotion in its heartbeat response.
        NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
        Assert.assertEquals(1, response.getContainersToUpdate().size());
        Container containersFromNM = response.getContainersToUpdate().get(0);
        Assert.assertEquals(container.getId(), containersFromNM.getId());
        Assert.assertEquals(ExecutionType.GUARANTEED, containersFromNM.getExecutionType());

        // --- Increase resource to 2048 MB / 1 vcore (container version 1). ---
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                1, container.getId(), ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(2048, 1), null)));
        response = nm1.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        // The update may already be in the response above; otherwise fetch it with another allocate.
        if (allocateResponse.getUpdatedContainers().isEmpty()) {
            allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        }
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        uc = allocateResponse.getUpdatedContainers().get(0);
        Assert.assertEquals(container.getId(), uc.getContainer().getId());
        Assert.assertEquals(Resource.newInstance(2048, 1), uc.getContainer().getResource());
        rm.drainEvents();
        // Likewise for the node side: heartbeat once more if the earlier response had no update.
        if (response.getContainersToUpdate().isEmpty()) {
            response = nm1.nodeHeartbeat(true);
        }
        Assert.assertEquals(1, response.getContainersToUpdate().size());
        Assert.assertEquals(Resource.newInstance(2048, 1), response.getContainersToUpdate().get(0).getResource());

        // --- Decrease resource back to 1024 MB / 1 vcore (container version 2). ---
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                2, container.getId(), ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(1024, 1), null)));
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        rm.drainEvents();
        response = nm1.nodeHeartbeat(true);
        Assert.assertEquals(1, response.getContainersToUpdate().size());
        Assert.assertEquals(Resource.newInstance(1024, 1), response.getContainersToUpdate().get(0).getResource());
        nm1.nodeHeartbeat(oppContainersStatus, true);

        // --- Demote to OPPORTUNISTIC (container version 3). ---
        allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(
                3, container.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null, ExecutionType.OPPORTUNISTIC)));
        response = nm1.nodeHeartbeat(
                Arrays.asList(ContainerStatus.newInstance(
                        container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)),
                true);
        rm.drainEvents();
        if (allocateResponse.getUpdatedContainers().isEmpty()) {
            allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
        }
        Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
        uc = allocateResponse.getUpdatedContainers().get(0);
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getContainer().getExecutionType());
        if (response.getContainersToUpdate().isEmpty()) {
            response = nm1.nodeHeartbeat(oppContainersStatus, true);
        }
        Assert.assertEquals(1, response.getContainersToUpdate().size());
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, response.getContainersToUpdate().get(0).getExecutionType());
    }

    /**
     * Asserts that a queue's metrics report the expected available and allocated
     * figures. Checks run in the order: available MB, available vcores,
     * allocated MB, allocated vcores, allocated containers.
     *
     * @param metrics queue metrics under inspection
     * @param availableMB expected value of {@code getAvailableMB()}
     * @param availableVirtualCores expected value of {@code getAvailableVirtualCores()}
     * @param allocatedMB expected value of {@code getAllocatedMB()}
     * @param allocatedVirtualCores expected value of {@code getAllocatedVirtualCores()}
     * @param allocatedContainers expected value of {@code getAllocatedContainers()}
     */
    private void verifyMetrics(QueueMetrics metrics, long availableMB, int availableVirtualCores, long allocatedMB, int allocatedVirtualCores, int allocatedContainers) {
        // Each actual value is widened to long first so every comparison uses
        // the assertEquals(long, long) overload.
        long actualAvailableMb = metrics.getAvailableMB();
        Assert.assertEquals(availableMB, actualAvailableMb);

        long actualAvailableVcores = metrics.getAvailableVirtualCores();
        Assert.assertEquals((long) availableVirtualCores, actualAvailableVcores);

        long actualAllocatedMb = metrics.getAllocatedMB();
        Assert.assertEquals(allocatedMB, actualAllocatedMb);

        long actualAllocatedVcores = metrics.getAllocatedVirtualCores();
        Assert.assertEquals((long) allocatedVirtualCores, actualAllocatedVcores);

        long actualAllocatedContainers = metrics.getAllocatedContainers();
        Assert.assertEquals((long) allocatedContainers, actualAllocatedContainers);
    }

    /**
     * Checks that {@link OpportunisticSchedulerMetrics} follows the life cycle of
     * opportunistic containers: two containers are requested and allocated, one
     * of them is reported RUNNING and then COMPLETE, and the allocated,
     * aggregated-allocated, aggregated-off-switch and aggregated-released
     * counters are compared against the values captured before the request.
     *
     * @throws Exception if any RM, NM or AM interaction fails
     */
    @Test(timeout=60000L)
    public void testOpportunisticSchedulerMetrics() throws Exception {
        // Two 4096 MB nodes, indexed by node id so the node hosting an allocated
        // container can be looked up later.
        MockNM nm1 = new MockNM("h1:1234", 4096, this.rm.getResourceTrackerService());
        MockNM nm2 = new MockNM("h2:1234", 4096, this.rm.getResourceTrackerService());
        HashMap<NodeId, MockNM> nodes = new HashMap<NodeId, MockNM>();
        nodes.put(nm1.getNodeId(), nm1);
        nodes.put(nm2.getNodeId(), nm2);
        nm1.registerNode();
        nm2.registerNode();
        nm1.nodeHeartbeat(this.oppContainersStatus, true);
        nm2.nodeHeartbeat(this.oppContainersStatus, true);

        // Baseline counters; every later expectation is relative to these.
        OpportunisticSchedulerMetrics metrics = OpportunisticSchedulerMetrics.getMetrics();
        int allocContainers = metrics.getAllocatedContainers();
        long aggrAllocatedContainers = metrics.getAggregatedAllocatedContainers();
        long aggrOffSwitchContainers = metrics.getAggregatedOffSwitchContainers();
        long aggrReleasedContainers = metrics.getAggregatedReleasedContainers();

        OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder.createWithMemory(1024L, this.rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .build();
        RMApp app1 = MockRMAppSubmitter.submit(this.rm, submission);
        ApplicationAttemptId attemptId = app1.getCurrentAppAttempt().getAppAttemptId();
        MockAM am1 = MockRM.launchAndRegisterAM(app1, this.rm, nm2);
        ResourceScheduler scheduler = this.rm.getResourceScheduler();
        nm1.nodeHeartbeat(this.oppContainersStatus, true);
        nm2.nodeHeartbeat(this.oppContainersStatus, true);

        // Wait until the AM service lists both nodes as least-loaded nodes.
        GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10L, 1000L);

        // Ask for two 1024 MB opportunistic containers on any host.
        ExecutionTypeRequest opportunistic = ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
        ResourceRequest ask = ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null, opportunistic);
        AllocateResponse allocateResponse = am1.allocate(Arrays.asList(ask), null);
        List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
        Assert.assertEquals(2L, (long) allocatedContainers.size());

        long expectedAllocated = allocContainers + 2;
        Assert.assertEquals(expectedAllocated, (long) metrics.getAllocatedContainers());
        Assert.assertEquals(aggrAllocatedContainers + 2L, metrics.getAggregatedAllocatedContainers());
        Assert.assertEquals(aggrOffSwitchContainers + 2L, metrics.getAggregatedOffSwitchContainers());

        // Report the first container as RUNNING from the node it was placed on.
        Container container = allocatedContainers.get(0);
        MockNM allocNode = nodes.get(container.getNodeId());
        ContainerStatus running = ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0);
        allocNode.nodeHeartbeat(Arrays.asList(running), true);
        this.rm.drainEvents();
        RMContainer rmContainer = ((CapacityScheduler) scheduler)
            .getApplicationAttempt(container.getId().getApplicationAttemptId())
            .getRMContainer(container.getId());
        Assert.assertEquals((Object) RMContainerState.RUNNING, (Object) rmContainer.getState());

        // Report it COMPLETE: the scheduler no longer returns it and the
        // counters move by one.
        ContainerStatus complete = ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0);
        allocNode.nodeHeartbeat(Arrays.asList(complete), true);
        this.rm.drainEvents();
        rmContainer = ((CapacityScheduler) scheduler)
            .getApplicationAttempt(container.getId().getApplicationAttemptId())
            .getRMContainer(container.getId());
        Assert.assertNull((Object) rmContainer);

        long expectedRemaining = allocContainers + 1;
        Assert.assertEquals(expectedRemaining, (long) metrics.getAllocatedContainers());
        Assert.assertEquals(aggrReleasedContainers + 1L, metrics.getAggregatedReleasedContainers());
    }

    /**
     * Verifies how {@link OpportunisticSchedulerMetrics#getAllocatedContainers()}
     * changes when nodes register with container reports to recover:
     * <ol>
     *   <li>one RUNNING opportunistic container raises the count from 0 to 1;</li>
     *   <li>two more RUNNING opportunistic containers, reported together, raise it to 3;</li>
     *   <li>a RUNNING guaranteed container leaves it at 3;</li>
     *   <li>a COMPLETE opportunistic container leaves it at 3.</li>
     * </ol>
     *
     * @throws Exception if any RM or NM interaction fails
     */
    @Test
    public void testMetricsRetainsAllocatedOpportunisticAfterRMRestart() throws Exception {
        MockRMAppSubmissionData appSubmissionData = MockRMAppSubmissionData.Builder.createWithMemory(1024L, this.rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .build();
        MockNM nm1 = new MockNM("h:1234", 4096, this.rm.getResourceTrackerService());
        nm1.registerNode();
        RMApp app = MockRMAppSubmitter.submit(this.rm, appSubmissionData);
        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        MockRM.launchAndRegisterAM(app, this.rm, nm1);
        OpportunisticSchedulerMetrics metrics = OpportunisticSchedulerMetrics.getMetrics();

        // Values shared by every fabricated container report.
        Resource fakeResource = Resource.newInstance(1024, 1);
        String fakeDiagnostics = "recover container";
        Priority fakePriority = Priority.newInstance(0);

        // Ids start at 2 — presumably id 1 is taken by the AM container
        // launched above (NOTE(review): confirm).
        ContainerId recoverOContainerId2 = ContainerId.newContainerId(appAttemptId, 2L);
        NMContainerStatus recoverOContainerReport1 = NMContainerStatus.newInstance(recoverOContainerId2, 0, ContainerState.RUNNING, fakeResource, fakeDiagnostics, 0, fakePriority, 0L, null, ExecutionType.OPPORTUNISTIC, -1L);

        // Nothing is counted before any container has been recovered.
        Assert.assertEquals(0L, (long) metrics.getAllocatedContainers());

        // Step 1: a single running opportunistic container.
        this.rm.registerNode("h2:1234", 4096, 1, Collections.singletonList(appAttemptId.getApplicationId()), Collections.singletonList(recoverOContainerReport1));
        Assert.assertEquals(1L, (long) metrics.getAllocatedContainers());

        // Step 2: two running opportunistic containers reported at once. Each
        // gets its own id (3 and 4); previously the first report reused id 2,
        // which had already been recovered in step 1, and id 4 was never used.
        ContainerId recoverOContainerId3 = ContainerId.newContainerId(appAttemptId, 3L);
        ContainerId recoverOContainerId4 = ContainerId.newContainerId(appAttemptId, 4L);
        NMContainerStatus recoverOContainerReport2 = NMContainerStatus.newInstance(recoverOContainerId3, 0, ContainerState.RUNNING, fakeResource, fakeDiagnostics, 0, fakePriority, 0L, null, ExecutionType.OPPORTUNISTIC, -1L);
        NMContainerStatus recoverOContainerReport3 = NMContainerStatus.newInstance(recoverOContainerId4, 0, ContainerState.RUNNING, fakeResource, fakeDiagnostics, 0, fakePriority, 0L, null, ExecutionType.OPPORTUNISTIC, -1L);
        this.rm.registerNode("h3:1234", 4096, 10, Collections.singletonList(appAttemptId.getApplicationId()), Arrays.asList(recoverOContainerReport2, recoverOContainerReport3));
        Assert.assertEquals(3L, (long) metrics.getAllocatedContainers());

        // Step 3: a running GUARANTEED container must not be counted.
        ContainerId recoverGContainerId = ContainerId.newContainerId(appAttemptId, 5L);
        NMContainerStatus recoverGContainerReport = NMContainerStatus.newInstance(recoverGContainerId, 0, ContainerState.RUNNING, fakeResource, fakeDiagnostics, 0, fakePriority, 0L, null, ExecutionType.GUARANTEED, -1L);
        this.rm.registerNode("h4:1234", 4096, 10, Collections.singletonList(appAttemptId.getApplicationId()), Collections.singletonList(recoverGContainerReport));
        Assert.assertEquals(3L, (long) metrics.getAllocatedContainers());

        // Step 4: an already COMPLETE opportunistic container must not be counted.
        ContainerId completedOContainerId = ContainerId.newContainerId(appAttemptId, 6L);
        NMContainerStatus completedOContainerReport = NMContainerStatus.newInstance(completedOContainerId, 0, ContainerState.COMPLETE, fakeResource, fakeDiagnostics, 0, fakePriority, 0L, null, ExecutionType.OPPORTUNISTIC, -1L);
        this.rm.registerNode("h5:1234", 4096, 10, Collections.singletonList(appAttemptId.getApplicationId()), Collections.singletonList(completedOContainerReport));
        Assert.assertEquals(3L, (long) metrics.getAllocatedContainers());
    }

    /**
     * Issues an allocate call after the scheduler application's current attempt
     * has been replaced with a different, freshly constructed attempt.
     * There are no explicit assertions: the test succeeds when the allocate
     * call returns without throwing inside the timeout.
     *
     * @throws Exception if any RM, NM or AM interaction fails
     */
    @Test(timeout=60000L)
    public void testAMCrashDuringAllocate() throws Exception {
        MockNM nm = new MockNM("h:1234", 4096, this.rm.getResourceTrackerService());
        nm.registerNode();

        MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder.createWithMemory(1024L, this.rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .build();
        RMApp app = MockRMAppSubmitter.submit(this.rm, submission);
        ApplicationAttemptId attemptId0 = app.getCurrentAppAttempt().getAppAttemptId();
        MockAM am = MockRM.launchAndRegisterAM(app, this.rm, nm);

        // Swap the scheduler's notion of the current attempt for a mock
        // attempt id that differs from the one the AM registered with.
        CapacityScheduler scheduler = (CapacityScheduler) this.rm.getRMContext().getScheduler();
        SchedulerApplication schApp = (SchedulerApplication) scheduler.getSchedulerApplications().get(attemptId0.getApplicationId());
        ApplicationAttemptId appAttemptId1 = TestUtils.getMockApplicationAttemptId(1, 1);
        FiCaSchedulerApp replacementAttempt = new FiCaSchedulerApp(appAttemptId1, null, (Queue) scheduler.getQueue("default"), null, this.rm.getRMContext());
        schApp.setCurrentAppAttempt((SchedulerApplicationAttempt) replacementAttempt);

        // The original AM now allocates against the replaced attempt.
        ResourceRequest ask = ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2);
        am.allocate(Arrays.asList(ask), null);
    }

    /**
     * Verifies that the opportunistic container context of an application
     * attempt follows node removal: after both nodes have heartbeated, repeated
     * allocate calls must bring the context's node map to two entries; after a
     * {@link NodeRemovedSchedulerEvent} for the first node, repeated allocate
     * calls must succeed and bring the node map down to one entry.
     *
     * Each phase polls up to 10 times with a 50 ms pause between attempts.
     *
     * @throws Exception if any RM, NM or AM interaction fails
     */
    @Test(timeout=60000L)
    public void testNodeRemovalDuringAllocate() throws Exception {
        MockNM nm1 = new MockNM("h1:1234", 4096, this.rm.getResourceTrackerService());
        MockNM nm2 = new MockNM("h2:1234", 4096, this.rm.getResourceTrackerService());
        nm1.registerNode();
        nm2.registerNode();
        nm1.nodeHeartbeat(this.oppContainersStatus, true);
        nm2.nodeHeartbeat(this.oppContainersStatus, true);

        MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(1024L, this.rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .withUnmanagedAM(false)
            .build();
        RMApp app1 = MockRMAppSubmitter.submit(this.rm, data);
        ApplicationAttemptId attemptId = app1.getCurrentAppAttempt().getAppAttemptId();
        MockAM am1 = MockRM.launchAndRegisterAM(app1, this.rm, nm2);
        ResourceScheduler scheduler = this.rm.getResourceScheduler();
        RMNode rmNode1 = (RMNode) this.rm.getRMContext().getRMNodes().get(nm1.getNodeId());
        OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId).getOpportunisticContainerContext();
        nm1.nodeHeartbeat(this.oppContainersStatus, true);
        nm2.nodeHeartbeat(this.oppContainersStatus, true);

        // Phase 1: allocate until the context has picked up both nodes.
        for (int i = 0; i < 10; ++i) {
            am1.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2)), null);
            if (ctxt.getNodeMap().size() == 2) {
                break;
            }
            Thread.sleep(50L);
        }
        Assert.assertEquals(2L, (long) ctxt.getNodeMap().size());

        // Phase 2: remove the first node, then allocate until the context
        // reflects the removal. Allocation itself must not throw.
        scheduler.handle((Event) new NodeRemovedSchedulerEvent(rmNode1));
        for (int i = 0; i < 10; ++i) {
            try {
                am1.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2)), null);
            }
            catch (Exception e) {
                // Fail with the original exception attached so the report shows
                // why allocation broke, not only that it did.
                throw new AssertionError("Allocate request should be handled on node removal", e);
            }
            if (ctxt.getNodeMap().size() == 1) {
                break;
            }
            Thread.sleep(50L);
        }
        Assert.assertEquals(1L, (long) ctxt.getNodeMap().size());
    }

    /**
     * Removes an application attempt that still holds an opportunistic
     * container whose node has already been removed from the scheduler.
     * There are no explicit assertions: the test succeeds when the
     * {@link AppAttemptRemovedSchedulerEvent} is handled without throwing
     * inside the timeout.
     *
     * @throws Exception if any RM, NM or AM interaction fails
     */
    @Test(timeout=60000L)
    public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
        MockNM nm = new MockNM("h:1234", 4096, this.rm.getResourceTrackerService());
        nm.registerNode();
        nm.nodeHeartbeat(this.oppContainersStatus, true);

        MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder.createWithMemory(1024L, this.rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .build();
        RMApp app = MockRMAppSubmitter.submit(this.rm, submission);
        ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        MockAM am = MockRM.launchAndRegisterAM(app, this.rm, nm);
        ResourceScheduler scheduler = this.rm.getResourceScheduler();
        FiCaSchedulerApp schedulerAttempt = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId);
        RMNode rmNode1 = (RMNode) this.rm.getRMContext().getRMNodes().get(nm.getNodeId());
        nm.nodeHeartbeat(this.oppContainersStatus, true);
        GenericTestUtils.waitFor(() -> scheduler.getNumClusterNodes() == 1, 10L, 20000L);

        // Obtain one opportunistic container while the node is still present.
        ExecutionTypeRequest opportunistic = ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
        ResourceRequest ask = ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, null, opportunistic);
        AllocateResponse allocateResponse = am.allocate(Arrays.asList(ask), null);
        List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
        Container container = allocatedContainers.get(0);

        // Drop the node and wait for the scheduler to report an empty cluster.
        scheduler.handle((Event) new NodeRemovedSchedulerEvent(rmNode1));
        GenericTestUtils.waitFor(() -> scheduler.getNumClusterNodes() == 0, 10L, 20000L);

        // Re-attach an RMContainer for the allocated container to the attempt;
        // build one directly when the utility returns null.
        RMContainer rmContainer = SchedulerUtils.createOpportunisticRmContainer((RMContext) this.rm.getRMContext(), container, true);
        if (rmContainer == null) {
            rmContainer = new RMContainerImpl(
                container,
                SchedulerRequestKey.extractFrom(container),
                schedulerAttempt.getApplicationAttemptId(),
                container.getNodeId(),
                schedulerAttempt.getUser(),
                this.rm.getRMContext(),
                true);
        }
        schedulerAttempt.addRMContainer(container.getId(), rmContainer);

        // Removing the attempt must cope with the container's node being gone.
        scheduler.handle((Event) new AppAttemptRemovedSchedulerEvent(attemptId, RMAppAttemptState.FAILED, false));
    }

    /**
     * Builds the default opportunistic-containers status used by these tests:
     * wait time -1, queue length 100, queue capacity 1000.
     *
     * @return a new status populated with the default values
     */
    private OpportunisticContainersStatus getOpportunisticStatus() {
        int defaultWaitTime = -1;
        int defaultQueueLength = 100;
        int defaultQueueCapacity = 1000;
        return this.getOppurtunisticStatus(defaultWaitTime, defaultQueueLength, defaultQueueCapacity);
    }

    /**
     * Creates an {@link OpportunisticContainersStatus} carrying the given queue
     * figures.
     *
     * @param waitTime value passed to {@code setEstimatedQueueWaitTime}
     * @param queueLength value passed to {@code setWaitQueueLength}
     * @param queueCapacity value passed to {@code setOpportQueueCapacity}
     * @return the newly created, populated status
     */
    private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, int queueLength, int queueCapacity) {
        OpportunisticContainersStatus result = OpportunisticContainersStatus.newInstance();
        result.setEstimatedQueueWaitTime(waitTime);
        result.setWaitQueueLength(queueLength);
        result.setOpportQueueCapacity(queueCapacity);
        return result;
    }

    /**
     * Starts a stubbed {@link OpportunisticContainerAllocatorAMService} behind a
     * real RPC server and checks that canned responses survive the protobuf
     * round trip, first through an {@link ApplicationMasterProtocolPB} proxy
     * and then through a {@link DistributedSchedulingAMProtocolPB} proxy.
     *
     * The server and both proxies are released in a {@code finally} block so
     * the listener and client connections do not outlive the test, whether it
     * passes or fails.
     *
     * @throws Exception if the server cannot be started or an RPC call fails
     */
    @Test
    public void testRPCWrapping() throws Exception {
        final Configuration conf = new Configuration();
        conf.set("yarn.ipc.rpc.class", HadoopYarnProtoRPC.class.getName());
        YarnRPC rpc = YarnRPC.create(conf);
        // Port 0 is requested; clients connect to whatever address the server reports.
        String bindAddr = "localhost:0";
        InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
        conf.setSocketAddr("yarn.resourcemanager.scheduler.address", addr);
        RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
        RMContextImpl rmContext = new RMContextImpl(){

            public AMLivelinessMonitor getAMLivelinessMonitor() {
                return null;
            }

            public Configuration getYarnConfiguration() {
                return new YarnConfiguration();
            }

            public RMContainerTokenSecretManager getContainerTokenSecretManager() {
                return new RMContainerTokenSecretManager(conf);
            }

            public ResourceScheduler getScheduler() {
                return new FifoScheduler();
            }
        };

        // Container the stubbed service returns from allocate, and that the
        // distributed-scheduling request reports as already allocated.
        Container c = (Container) factory.newRecordInstance(Container.class);
        c.setExecutionType(ExecutionType.OPPORTUNISTIC);
        c.setId(ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(12345L, 1), 2), 3L));
        AllocateRequestPBImpl allReq = (AllocateRequestPBImpl) factory.newRecordInstance(AllocateRequest.class);
        allReq.setAskList(Arrays.asList(ResourceRequest.newInstance(Priority.UNDEFINED, "a", Resource.newInstance(1, 2), 1, true, "exp", ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))));

        OpportunisticContainerAllocatorAMService service = this.createService(factory, rmContext, c);
        conf.setBoolean("yarn.nodemanager.distributed-scheduling.enabled", true);
        Server server = service.getServer(rpc, conf, addr, null);
        server.start();

        ApplicationMasterProtocolPB ampProxy = null;
        DistributedSchedulingAMProtocolPB dsProxy = null;
        try {
            // --- Plain ApplicationMasterProtocol client ---
            RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine2.class);
            ampProxy = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, 1L, NetUtils.getConnectAddress(server), conf);

            RegisterApplicationMasterResponsePBImpl regResp = new RegisterApplicationMasterResponsePBImpl(ampProxy.registerApplicationMaster(null, ((RegisterApplicationMasterRequestPBImpl) factory.newRecordInstance(RegisterApplicationMasterRequest.class)).getProto()));
            Assert.assertEquals((Object) "dummyQueue", (Object) regResp.getQueue());

            FinishApplicationMasterResponsePBImpl finishResp = new FinishApplicationMasterResponsePBImpl(ampProxy.finishApplicationMaster(null, ((FinishApplicationMasterRequestPBImpl) factory.newRecordInstance(FinishApplicationMasterRequest.class)).getProto()));
            Assert.assertEquals((Object) false, (Object) finishResp.getIsUnregistered());

            AllocateResponsePBImpl allocResp = new AllocateResponsePBImpl(ampProxy.allocate(null, ((AllocateRequestPBImpl) factory.newRecordInstance(AllocateRequest.class)).getProto()));
            List<Container> allocatedContainers = allocResp.getAllocatedContainers();
            Assert.assertEquals(1L, (long) allocatedContainers.size());
            Assert.assertEquals((Object) ExecutionType.OPPORTUNISTIC, (Object) allocatedContainers.get(0).getExecutionType());
            Assert.assertEquals(12345L, (long) allocResp.getNumClusterNodes());

            // --- DistributedSchedulingAMProtocol client against the same server ---
            RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class, ProtobufRpcEngine2.class);
            dsProxy = (DistributedSchedulingAMProtocolPB) RPC.getProxy(DistributedSchedulingAMProtocolPB.class, 1L, NetUtils.getConnectAddress(server), conf);

            RegisterDistributedSchedulingAMResponsePBImpl dsRegResp = new RegisterDistributedSchedulingAMResponsePBImpl(dsProxy.registerApplicationMasterForDistributedScheduling(null, ((RegisterApplicationMasterRequestPBImpl) factory.newRecordInstance(RegisterApplicationMasterRequest.class)).getProto()));
            Assert.assertEquals(54321L, (long) dsRegResp.getContainerIdStart());
            Assert.assertEquals(4L, (long) dsRegResp.getMaxContainerResource().getVirtualCores());
            Assert.assertEquals(1024L, (long) dsRegResp.getMinContainerResource().getMemorySize());
            Assert.assertEquals(2L, (long) dsRegResp.getIncrContainerResource().getVirtualCores());

            DistributedSchedulingAllocateRequestPBImpl distAllReq = (DistributedSchedulingAllocateRequestPBImpl) factory.newRecordInstance(DistributedSchedulingAllocateRequest.class);
            distAllReq.setAllocateRequest((AllocateRequest) allReq);
            distAllReq.setAllocatedContainers(Arrays.asList(c));
            DistributedSchedulingAllocateResponsePBImpl dsAllocResp = new DistributedSchedulingAllocateResponsePBImpl(dsProxy.allocateForDistributedScheduling(null, distAllReq.getProto()));
            Assert.assertEquals((Object) "h1", (Object) ((RemoteNode) dsAllocResp.getNodesForScheduling().get(0)).getNodeId().getHost());
            Assert.assertEquals((Object) "l1", (Object) ((RemoteNode) dsAllocResp.getNodesForScheduling().get(1)).getNodePartition());

            FinishApplicationMasterResponsePBImpl dsfinishResp = new FinishApplicationMasterResponsePBImpl(dsProxy.finishApplicationMaster(null, ((FinishApplicationMasterRequestPBImpl) factory.newRecordInstance(FinishApplicationMasterRequest.class)).getProto()));
            Assert.assertEquals((Object) false, (Object) dsfinishResp.getIsUnregistered());
        } finally {
            // Close client proxies first; the nested finally guarantees the
            // server is stopped even if closing a proxy throws.
            try {
                if (dsProxy != null) {
                    RPC.stopProxy(dsProxy);
                }
                if (ampProxy != null) {
                    RPC.stopProxy(ampProxy);
                }
            } finally {
                server.stop();
            }
        }
    }

    /**
     * Builds an {@link OpportunisticContainerAllocatorAMService} whose protocol
     * methods are replaced by stubs returning fixed responses:
     * <ul>
     *   <li>register: queue {@code "dummyQueue"};</li>
     *   <li>finish: {@code isUnregistered == false};</li>
     *   <li>allocate: 12345 cluster nodes and {@code c} as the only allocated container;</li>
     *   <li>distributed-scheduling register: container id start 54321 and
     *       max/min/increment resources of 4096/4, 1024/1 and 2048/2;</li>
     *   <li>distributed-scheduling allocate: asserts on the incoming request,
     *       then returns two remote nodes, the second in partition {@code "l1"}.</li>
     * </ul>
     *
     * @param factory record factory used to create every response record
     * @param rmContext RM context passed to the service constructor
     * @param c container returned by the stubbed {@code allocate}
     * @return the stubbed service
     */
    private OpportunisticContainerAllocatorAMService createService(final RecordFactory factory, RMContext rmContext, final Container c) {
        return new OpportunisticContainerAllocatorAMService(rmContext, null){

            public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException {
                RegisterApplicationMasterResponse registered = (RegisterApplicationMasterResponse) factory.newRecordInstance(RegisterApplicationMasterResponse.class);
                registered.setQueue("dummyQueue");
                return registered;
            }

            public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnException, IOException {
                FinishApplicationMasterResponse finished = (FinishApplicationMasterResponse) factory.newRecordInstance(FinishApplicationMasterResponse.class);
                finished.setIsUnregistered(false);
                return finished;
            }

            public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException {
                AllocateResponse allocated = (AllocateResponse) factory.newRecordInstance(AllocateResponse.class);
                allocated.setNumClusterNodes(12345);
                allocated.setAllocatedContainers(Arrays.asList(c));
                return allocated;
            }

            public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling(RegisterApplicationMasterRequest request) throws YarnException, IOException {
                RegisterDistributedSchedulingAMResponse registered = (RegisterDistributedSchedulingAMResponse) factory.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
                registered.setContainerIdStart(54321L);
                registered.setMaxContainerResource(Resource.newInstance(4096, 4));
                registered.setMinContainerResource(Resource.newInstance(1024, 1));
                registered.setIncrContainerResource(Resource.newInstance(2048, 2));
                return registered;
            }

            public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(DistributedSchedulingAllocateRequest request) throws YarnException, IOException {
                // Check that the wrapped request arrived intact before answering.
                List<ResourceRequest> askList = request.getAllocateRequest().getAskList();
                List<Container> allocatedContainers = request.getAllocatedContainers();
                Assert.assertEquals(1L, (long) allocatedContainers.size());
                Assert.assertEquals((Object) ExecutionType.OPPORTUNISTIC, (Object) allocatedContainers.get(0).getExecutionType());
                Assert.assertEquals(1L, (long) askList.size());
                Assert.assertTrue(askList.get(0).getExecutionTypeRequest().getEnforceExecutionType());

                // Two candidate nodes; only the second carries a partition.
                RemoteNode firstNode = RemoteNode.newInstance(NodeId.newInstance("h1", 1234), "http://h1:4321");
                RemoteNode secondNode = RemoteNode.newInstance(NodeId.newInstance("h2", 1234), "http://h2:4321");
                secondNode.setNodePartition("l1");

                DistributedSchedulingAllocateResponse answer = (DistributedSchedulingAllocateResponse) factory.newRecordInstance(DistributedSchedulingAllocateResponse.class);
                answer.setNodesForScheduling(Arrays.asList(firstNode, secondNode));
                return answer;
            }
        };
    }
}

