/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.Collections;
import java.util.HashMap;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.event.Level;

public class TestCapacitySchedulerPerf {
    private final int GB = 1024;

    private String getResourceName(int idx) {
        return "resource-" + idx;
    }

    private void testUserLimitThroughputWithNumberOfResourceTypes(int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount) throws Exception {
        int i;
        Assume.assumeTrue((boolean)Boolean.valueOf(System.getProperty("RunCapacitySchedulerPerfTests")));
        int numThreads = Integer.valueOf(System.getProperty("CapacitySchedulerPerfTestsNumThreads", "0"));
        if (numOfResourceTypes > 2) {
            HashMap<String, ResourceInformation> riMap = new HashMap<String, ResourceInformation>();
            riMap.put("memory-mb", ResourceInformation.MEMORY_MB);
            riMap.put("vcores", ResourceInformation.VCORES);
            for (int i2 = 2; i2 < numOfResourceTypes; ++i2) {
                String resourceName = this.getResourceName(i2);
                riMap.put(resourceName, ResourceInformation.newInstance((String)resourceName, (String)"", (long)0L, (ResourceTypes)ResourceTypes.COUNTABLE, (long)0L, (long)Integer.MAX_VALUE));
            }
            ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
        }
        int activeQueues = (int)((float)numQueues * ((float)pctActiveQueues / 100.0f));
        int totalApps = appCount + activeQueues;
        CapacitySchedulerConfiguration csconf = this.createCSConfWithManyQueues(numQueues);
        if (numThreads > 0) {
            csconf.setScheduleAynschronously(true);
            csconf.setInt("yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", numThreads);
            csconf.setLong("yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms", 0L);
        }
        YarnConfiguration conf = new YarnConfiguration((Configuration)csconf);
        conf.setBoolean("yarn.test.reset-resource-types", false);
        if (numThreads > 0) {
            conf.setClass("yarn.resourcemanager.scheduler.class", CapacitySchedulerPerf.class, ResourceScheduler.class);
            conf.setLong("yarn.resourcemanager.nodemanagers.heartbeat-interval-ms", 600000L);
        } else {
            conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        }
        MockRM rm = new MockRM((Configuration)conf);
        rm.start();
        CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
        LeafQueue[] lqs = new LeafQueue[numQueues];
        for (int i3 = 0; i3 < numQueues; ++i3) {
            String queueName = String.format("%03d", i3);
            LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
            qb.setUserLimitFactor(100.0f);
            qb.setupConfigurableCapacities();
            lqs[i3] = qb;
        }
        Container container = (Container)Mockito.mock(Container.class);
        ApplicationSubmissionContext submissionContext = (ApplicationSubmissionContext)Mockito.mock(ApplicationSubmissionContext.class);
        ApplicationId[] appids = new ApplicationId[totalApps];
        RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
        ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
        RMAppImpl[] apps = new RMAppImpl[totalApps];
        RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
        for (int i4 = 0; i4 < totalApps; ++i4) {
            appids[i4] = BuilderUtils.newApplicationId((long)100L, (int)i4);
            appAttemptIds[i4] = BuilderUtils.newApplicationAttemptId((ApplicationId)appids[i4], (int)1);
            attemptMetrics[i4] = new RMAppAttemptMetrics(appAttemptIds[i4], rm.getRMContext());
            apps[i4] = (RMAppImpl)Mockito.mock(RMAppImpl.class);
            Mockito.when((Object)apps[i4].getApplicationId()).thenReturn((Object)appids[i4]);
            attempts[i4] = (RMAppAttemptImpl)Mockito.mock(RMAppAttemptImpl.class);
            Mockito.when((Object)attempts[i4].getMasterContainer()).thenReturn((Object)container);
            Mockito.when((Object)attempts[i4].getSubmissionContext()).thenReturn((Object)submissionContext);
            Mockito.when((Object)attempts[i4].getAppAttemptId()).thenReturn((Object)appAttemptIds[i4]);
            Mockito.when((Object)attempts[i4].getRMAppAttemptMetrics()).thenReturn((Object)attemptMetrics[i4]);
            Mockito.when((Object)apps[i4].getCurrentAppAttempt()).thenReturn((Object)attempts[i4]);
            rm.getRMContext().getRMApps().put(appids[i4], apps[i4]);
            String queueName = lqs[i4 % activeQueues].getQueuePath();
            AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appids[i4], queueName, "user1");
            cs.handle((SchedulerEvent)addAppEvent);
            AppAttemptAddedSchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(appAttemptIds[i4], false);
            cs.handle((SchedulerEvent)addAttemptEvent);
        }
        Resource newResource = Resource.newInstance((int)(totalApps * 1024), (int)totalApps);
        if (numOfResourceTypes > 2) {
            for (int i5 = 2; i5 < numOfResourceTypes; ++i5) {
                newResource.setResourceValue(this.getResourceName(i5), (long)totalApps);
            }
        }
        RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
        cs.handle((SchedulerEvent)new NodeAddedSchedulerEvent(node));
        RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
        cs.handle((SchedulerEvent)new NodeAddedSchedulerEvent(node2));
        Priority u0Priority = TestUtils.createMockPriority(1);
        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
        if (numThreads > 0) {
            for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
                t.suspendSchedule();
            }
        }
        FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
        for (i = 0; i < totalApps; ++i) {
            fiCaApps[i] = (FiCaSchedulerApp)((SchedulerApplication)cs.getSchedulerApplications().get(apps[i].getApplicationId())).getCurrentAppAttempt();
            ResourceRequest resourceRequest = TestUtils.createResourceRequest("*", 1024, 1, true, u0Priority, recordFactory);
            if (numOfResourceTypes > 2) {
                for (int j = 2; j < numOfResourceTypes; ++j) {
                    resourceRequest.getCapability().setResourceValue(this.getResourceName(j), 10L);
                }
            }
            fiCaApps[i].updateResourceRequests(Collections.singletonList(resourceRequest));
        }
        for (i = 0; i < numQueues; ++i) {
            lqs[i].setUserLimitFactor(0.0f);
        }
        if (numThreads > 0) {
            for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
                t.beginSchedule();
            }
            while (CapacitySchedulerMetrics.getMetrics().commitSuccess.lastStat().numSamples() < (long)activeQueues) {
                Thread.sleep(1000L);
            }
            int numNotPending = 0;
            for (int i6 = 0; i6 < totalApps; ++i6) {
                boolean pending = fiCaApps[i6].getAppSchedulingInfo().isPending();
                if (!pending) {
                    ++numNotPending;
                    Assert.assertEquals((long)0L, (long)fiCaApps[i6].getTotalPendingRequestsPerPartition().size());
                    continue;
                }
                Assert.assertEquals((long)1024L, (long)((Resource)fiCaApps[i6].getTotalPendingRequestsPerPartition().get("")).getMemorySize());
            }
            Assert.assertEquals((long)activeQueues, (long)numNotPending);
        } else {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(node));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(node2));
            for (i = 0; i < totalApps; ++i) {
                boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
                if (i < activeQueues) {
                    Assert.assertFalse((boolean)pending);
                    Assert.assertEquals((long)0L, (long)fiCaApps[i].getTotalPendingRequestsPerPartition().size());
                    continue;
                }
                Assert.assertTrue((boolean)pending);
                Assert.assertEquals((long)1024L, (long)((Resource)fiCaApps[i].getTotalPendingRequestsPerPartition().get("")).getMemorySize());
            }
        }
        GenericTestUtils.setRootLogLevel((Level)Level.WARN);
        if (numThreads > 0) {
            System.out.println("Starting now");
            ((CapacitySchedulerPerf)cs).enable = true;
            long start = Time.monotonicNow();
            Thread.sleep(60000L);
            long end = Time.monotonicNow();
            ((CapacitySchedulerPerf)cs).enable = false;
            long numOps = ((CapacitySchedulerPerf)cs).count.get();
            System.out.println("Number of operations: " + numOps);
            System.out.println("Time taken: " + (end - start) + " ms");
            System.out.println(numOps * 1000L / (end - start) + " ops / second");
        } else {
            int topn = 20;
            int iterations = 2000000;
            int printInterval = 20000;
            float numerator = 2.0E7f;
            PriorityQueue queue = new PriorityQueue(20, Collections.reverseOrder());
            long n = Time.monotonicNow();
            long timespent = 0L;
            for (int i7 = 0; i7 < 2000000; i7 += 2) {
                if (i7 > 0 && i7 % 20000 == 0) {
                    long ts = Time.monotonicNow() - n;
                    if (queue.size() < 20) {
                        queue.offer(ts);
                    } else {
                        Long last = (Long)queue.peek();
                        if (last > ts) {
                            queue.poll();
                            queue.offer(ts);
                        }
                    }
                    System.out.println(i7 + " " + 2.0E7f / (float)ts);
                    n = Time.monotonicNow();
                }
                cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(node));
                cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(node2));
            }
            timespent = 0L;
            int entries = queue.size();
            while (queue.size() > 0) {
                long l = (Long)queue.poll();
                timespent += l;
            }
            System.out.println("#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries + ": " + 2.0E7f / (float)(timespent / (long)entries) + " ops/sec of " + appCount + " apps on " + pctActiveQueues + "% of " + numQueues + " queues.");
        }
        if (numThreads > 0) {
            int numNotPending = 0;
            for (int i8 = 0; i8 < totalApps; ++i8) {
                boolean pending = fiCaApps[i8].getAppSchedulingInfo().isPending();
                if (!pending) {
                    ++numNotPending;
                    Assert.assertEquals((long)0L, (long)fiCaApps[i8].getTotalPendingRequestsPerPartition().size());
                    continue;
                }
                Assert.assertEquals((long)1024L, (long)((Resource)fiCaApps[i8].getTotalPendingRequestsPerPartition().get("")).getMemorySize());
            }
            Assert.assertEquals((long)activeQueues, (long)numNotPending);
        } else {
            for (int i9 = 0; i9 < totalApps; ++i9) {
                boolean pending = fiCaApps[i9].getAppSchedulingInfo().isPending();
                if (i9 < activeQueues) {
                    Assert.assertFalse((boolean)pending);
                    Assert.assertEquals((long)0L, (long)fiCaApps[i9].getTotalPendingRequestsPerPartition().size());
                    continue;
                }
                Assert.assertTrue((boolean)pending);
                Assert.assertEquals((long)1024L, (long)((Resource)fiCaApps[i9].getTotalPendingRequestsPerPartition().get("")).getMemorySize());
            }
        }
        rm.close();
        rm.stop();
    }

    @Test(timeout=300000L)
    public void testUserLimitThroughputForTwoResources() throws Exception {
        this.testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
    }

    @Test(timeout=300000L)
    public void testUserLimitThroughputForThreeResources() throws Exception {
        this.testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
    }

    @Test(timeout=300000L)
    public void testUserLimitThroughputForFourResources() throws Exception {
        this.testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
    }

    @Test(timeout=300000L)
    public void testUserLimitThroughputForFiveResources() throws Exception {
        this.testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
    }

    @Test(timeout=1800000L)
    public void testUserLimitThroughputWithManyQueues() throws Exception {
        int numQueues = Integer.getInteger("NumberOfQueues", 40);
        int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
        int appCount = Integer.getInteger("NumberOfApplications", 100);
        this.testUserLimitThroughputWithNumberOfResourceTypes(2, numQueues, pctActiveQueues, appCount);
    }

    CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues) throws Exception {
        CapacitySchedulerConfiguration csconf = new CapacitySchedulerConfiguration();
        csconf.setResourceComparator(DominantResourceCalculator.class);
        csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
        csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
        csconf.setCapacity("root.default", 0.0f);
        csconf.setOffSwitchPerHeartbeatLimit(numQueues);
        float capacity = 100.0f / (float)numQueues;
        String[] subQueues = new String[numQueues];
        for (int i = 0; i < numQueues; ++i) {
            String queueName = String.format("%03d", i);
            String queuePath = "root." + queueName;
            subQueues[i] = queueName;
            csconf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 100.0f);
            csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
            csconf.setCapacity(queuePath, capacity);
            csconf.setUserLimitFactor(queuePath, 100.0f);
            csconf.setMaximumCapacity(queuePath, 100.0f);
        }
        csconf.setQueues("root", subQueues);
        return csconf;
    }

    public static class CapacitySchedulerPerf
    extends CapacityScheduler {
        volatile boolean enable = false;
        AtomicLong count = new AtomicLong(0L);

        CSAssignment allocateContainersToNode(CandidateNodeSet<FiCaSchedulerNode> candidates, boolean withNodeHeartbeat) {
            CSAssignment retVal = super.allocateContainersToNode(candidates, withNodeHeartbeat);
            if (this.enable) {
                this.count.incrementAndGet();
            }
            return retVal;
        }
    }
}

