/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAllocationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppRequestAllocationInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Tests for {@link ActivitiesManager}: recording node and application
 * activities from several threads, expiry of recorded application activities,
 * the cost of querying a large number of recorded activities, and the
 * periodic update of the maximum application-activities queue length.
 */
public class TestActivitiesManager {
    private static final int NUM_NODES = 5;
    private static final int NUM_APPS = 5;
    private static final int NUM_THREADS = 5;

    /** Diagnostic text attached to every activity recorded by these tests. */
    private static final String NODE_IS_BLACKLISTED = "Node is blacklisted";

    private RMContext rmContext;
    private TestingActivitiesManager activitiesManager;
    private List<SchedulerApplicationAttempt> apps;
    private List<SchedulerNode> nodes;
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Builds a mocked {@link RMContext} holding {@link #NUM_APPS} applications,
     * creates {@link #NUM_NODES} mock nodes, the manager under test and a
     * fixed-size thread pool of {@link #NUM_THREADS} workers.
     */
    @Before
    public void setup() {
        rmContext = Mockito.mock(RMContext.class);
        Configuration conf = new Configuration();
        Mockito.when(rmContext.getYarnConfiguration()).thenReturn(conf);
        ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
        Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none());
        Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
        LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
        ConcurrentMap<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
        Mockito.doReturn(rmApps).when(rmContext).getRMApps();

        apps = new ArrayList<>();
        for (int i = 0; i < NUM_APPS; ++i) {
            ApplicationAttemptId appAttemptId = TestUtils.getMockApplicationAttemptId(i, 0);
            RMApp mockApp = Mockito.mock(RMApp.class);
            Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId();
            Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus();
            rmApps.put(appAttemptId.getApplicationId(), mockApp);
            FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
                Mockito.mock(ActiveUsersManager.class), rmContext);
            apps.add(app);
        }

        nodes = new ArrayList<>();
        for (int i = 0; i < NUM_NODES; ++i) {
            nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240));
        }

        activitiesManager = new TestingActivitiesManager(rmContext);
        threadPoolExecutor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Shuts the worker pool down after each test. Core threads of a
     * {@link ThreadPoolExecutor} never time out by default, so without this
     * every test would leave {@link #NUM_THREADS} idle threads behind.
     */
    @After
    public void tearDown() {
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdownNow();
        }
    }

    /**
     * Records activities for every node concurrently, one task per node, and
     * verifies that each node ends up with exactly one recorded allocation
     * list that itself contains exactly one allocation.
     */
    @Test
    public void testRecordingDifferentNodeActivitiesInMultiThreads() throws Exception {
        Random rand = new Random();
        List<Future<Void>> futures = new ArrayList<>();
        for (SchedulerNode node : nodes) {
            Callable<Void> task = () -> {
                SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
                // Request recording for this node, then emit one node update
                // containing a single rejected app activity.
                activitiesManager.recordNextNodeUpdateActivities(node.getNodeID().toString());
                ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, node.getNodeID());
                recordRejectedActivity(activitiesManager, node, randomApp);
                ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, node.getNodeID(), "");
                return null;
            };
            futures.add(threadPoolExecutor.submit(task));
        }
        awaitAll(futures);

        Assert.assertEquals(NUM_NODES, activitiesManager.historyNodeAllocations.size());
        for (List<List<NodeAllocation>> allocationsForNode
                : activitiesManager.historyNodeAllocations.values()) {
            Assert.assertEquals(1, allocationsForNode.size());
            Assert.assertEquals(1, allocationsForNode.get(0).size());
        }
    }

    /**
     * Requests a single recording under {@link ActivitiesManager#EMPTY_NODE_ID},
     * lets every node's task record against that id concurrently, and verifies
     * that the history contains exactly one node id.
     */
    @Test
    public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads() throws Exception {
        Random rand = new Random();
        activitiesManager.recordNextNodeUpdateActivities(ActivitiesManager.EMPTY_NODE_ID.toString());
        List<Future<Void>> futures = new ArrayList<>();
        for (SchedulerNode node : nodes) {
            Callable<Void> task = () -> {
                SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
                ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
                    ActivitiesManager.EMPTY_NODE_ID);
                recordRejectedActivity(activitiesManager, node, randomApp);
                ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
                    ActivitiesManager.EMPTY_NODE_ID, "");
                return null;
            };
            futures.add(threadPoolExecutor.submit(task));
        }
        awaitAll(futures);

        Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
    }

    /**
     * Records app activities for one randomly chosen application from 20
     * concurrent tasks and verifies that 20 app allocations were completed,
     * each holding one allocation attempt per node.
     */
    @Test
    public void testRecordingAppActivitiesInMultiThreads() throws Exception {
        Random rand = new Random();
        SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
        activitiesManager.turnOnAppActivitiesRecording(randomApp.getApplicationId(), 3.0);
        List<Future<Void>> futures = new ArrayList<>();
        int nTasks = 20;
        for (int i = 0; i < nTasks; ++i) {
            Callable<Void> task = () -> {
                ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
                    (FiCaSchedulerNode) nodes.get(0), SystemClock.getInstance().getTime(), randomApp);
                for (SchedulerNode node : nodes) {
                    recordRejectedActivity(activitiesManager, node, randomApp);
                }
                ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
                    randomApp.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
                return null;
            };
            futures.add(threadPoolExecutor.submit(task));
        }
        awaitAll(futures);

        // Fully qualified: the simple name Queue is taken by the scheduler's Queue import.
        java.util.Queue<AppAllocation> appAllocations =
            activitiesManager.completedAppAllocations.get(randomApp.getApplicationId());
        Assert.assertEquals(nTasks, appAllocations.size());
        for (AppAllocation appAllocation : appAllocations) {
            Assert.assertEquals(NUM_NODES, appAllocation.getAllocationAttempts().size());
        }
    }

    /**
     * Configures a 100 ms cleanup interval and a 1000 ms TTL, records ten app
     * activities, and verifies that all of them are returned immediately and
     * none are returned after sleeping for interval + TTL.
     */
    @Test(timeout=30000L)
    public void testAppActivitiesTTL() throws Exception {
        long cleanupIntervalMs = 100L;
        long appActivitiesTTL = 1000L;
        rmContext.getYarnConfiguration().setLong(
            "yarn.resourcemanager.activities-manager.cleanup-interval-ms", cleanupIntervalMs);
        rmContext.getYarnConfiguration().setLong(
            "yarn.resourcemanager.activities-manager.app-activities.ttl-ms", appActivitiesTTL);
        ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
        newActivitiesManager.serviceStart();
        try {
            SchedulerApplicationAttempt app = apps.get(0);
            FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
            newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3.0);
            int numActivities = 10;
            for (int i = 0; i < numActivities; ++i) {
                ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node,
                    SystemClock.getInstance().getTime(), app);
                recordRejectedActivity(newActivitiesManager, node, app);
                ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager,
                    app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
            }
            AppActivitiesInfo appActivitiesInfo = newActivitiesManager.getAppActivitiesInfo(
                app.getApplicationId(), null, null, null, -1, false, 3.0);
            Assert.assertEquals(numActivities, appActivitiesInfo.getAllocations().size());

            // Wait long enough for the TTL to elapse and one more cleanup pass to run.
            Thread.sleep(cleanupIntervalMs + appActivitiesTTL);
            appActivitiesInfo = newActivitiesManager.getAppActivitiesInfo(
                app.getApplicationId(), null, null, null, -1, false, 3.0);
            Assert.assertEquals(0, appActivitiesInfo.getAllocations().size());
        } finally {
            // Stop the manager so whatever serviceStart() launched does not outlive the test.
            newActivitiesManager.stop();
        }
    }

    /**
     * Records 100 app allocations with 10000 node-level activities each, then
     * queries them ten times in normal, diagnostic-grouped and summarized form,
     * asserting the shape of every result and printing the time taken.
     */
    @Test(timeout=30000L)
    public void testAppActivitiesPerformance() {
        SchedulerApplicationAttempt app = apps.get(0);
        FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
        activitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 100.0);
        int numActivities = 100;
        int numNodes = 10000;
        int testingTimes = 10;
        for (int ano = 0; ano < numActivities; ++ano) {
            ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
                SystemClock.getInstance().getTime(), app);
            for (int i = 0; i < numNodes; ++i) {
                NodeId nodeId = NodeId.newInstance("host" + i, 0);
                activitiesManager.addSchedulingActivityForApp(app.getApplicationId(), null, 0,
                    ActivityState.SKIPPED, NODE_IS_BLACKLISTED, ActivityLevel.NODE, nodeId, 0L);
            }
            ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
                app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
        }
        // Result intentionally discarded; presumably a warm-up query before timing starts.
        activitiesManager.getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, true, 100.0);

        Supplier<Void> normalSupplier = () -> {
            AppActivitiesInfo info = activitiesManager.getAppActivitiesInfo(
                app.getApplicationId(), null, null, null, -1, false, 100.0);
            Assert.assertEquals(numActivities, info.getAllocations().size());
            AppAllocationInfo firstAllocation = info.getAllocations().get(0);
            Assert.assertEquals(1, firstAllocation.getChildren().size());
            AppRequestAllocationInfo firstRequest = firstAllocation.getChildren().get(0);
            Assert.assertEquals(numNodes, firstRequest.getChildren().size());
            return null;
        };
        testManyTimes("Getting normal app activities", normalSupplier, testingTimes);

        Supplier<Void> aggregatedSupplier = () -> {
            AppActivitiesInfo info = activitiesManager.getAppActivitiesInfo(
                app.getApplicationId(), null, null, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100.0);
            Assert.assertEquals(numActivities, info.getAllocations().size());
            AppAllocationInfo firstAllocation = info.getAllocations().get(0);
            Assert.assertEquals(1, firstAllocation.getChildren().size());
            AppRequestAllocationInfo firstRequest = firstAllocation.getChildren().get(0);
            Assert.assertEquals(1, firstRequest.getChildren().size());
            ActivityNodeInfo group = firstRequest.getChildren().get(0);
            Assert.assertEquals(numNodes, group.getNodeIds().size());
            return null;
        };
        testManyTimes("Getting aggregated app activities", aggregatedSupplier, testingTimes);

        Supplier<Void> summarizedSupplier = () -> {
            AppActivitiesInfo info = activitiesManager.getAppActivitiesInfo(
                app.getApplicationId(), null, null, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100.0);
            Assert.assertEquals(1, info.getAllocations().size());
            AppAllocationInfo firstAllocation = info.getAllocations().get(0);
            Assert.assertEquals(1, firstAllocation.getChildren().size());
            AppRequestAllocationInfo firstRequest = firstAllocation.getChildren().get(0);
            Assert.assertEquals(1, firstRequest.getChildren().size());
            ActivityNodeInfo group = firstRequest.getChildren().get(0);
            Assert.assertEquals(numNodes, group.getNodeIds().size());
            return null;
        };
        testManyTimes("Getting summarized app activities", summarizedSupplier, testingTimes);
    }

    /**
     * Starts a manager against a mocked {@link CapacityScheduler} and five mock
     * nodes, then verifies the maximum queue length it reports as the mocked
     * scheduler settings change: nodes x async threads, then nodes x 1.2 when
     * there are no async threads, then the configured value once multi-node
     * placement is enabled.
     */
    @Test(timeout=10000L)
    public void testAppActivitiesMaxQueueLengthUpdate() throws TimeoutException, InterruptedException {
        Configuration conf = new Configuration();
        int configuredAppActivitiesMaxQueueLength = 1;
        conf.setInt("yarn.resourcemanager.activities-manager.app-activities.max-queue-length",
            configuredAppActivitiesMaxQueueLength);
        conf.setInt("yarn.resourcemanager.activities-manager.cleanup-interval-ms", 500);
        ConcurrentMap<NodeId, RMNode> mockNodes = new ConcurrentHashMap<>();
        int numNodes = 5;
        for (int i = 0; i < numNodes; ++i) {
            mockNodes.put(NodeId.newInstance("node" + i, 0), Mockito.mock(RMNode.class));
        }
        CapacityScheduler cs = Mockito.mock(CapacityScheduler.class);
        RMContext mockRMContext = Mockito.mock(RMContext.class);
        Mockito.when(mockRMContext.getRMNodes()).thenReturn(mockNodes);
        Mockito.when(mockRMContext.getYarnConfiguration()).thenReturn(conf);
        Mockito.when(mockRMContext.getScheduler()).thenReturn(cs);
        Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(false);
        int numAsyncSchedulerThreads = 3;
        Mockito.when(cs.getNumAsyncSchedulerThreads()).thenReturn(numAsyncSchedulerThreads);

        ActivitiesManager newActivitiesManager = new ActivitiesManager(mockRMContext);
        Assert.assertEquals(configuredAppActivitiesMaxQueueLength,
            newActivitiesManager.getAppActivitiesMaxQueueLength());
        newActivitiesManager.init(conf);
        newActivitiesManager.start();
        try {
            GenericTestUtils.waitFor(
                () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
                    == numNodes * numAsyncSchedulerThreads,
                100L, 3000L);
            Assert.assertEquals(numNodes * numAsyncSchedulerThreads,
                newActivitiesManager.getAppActivitiesMaxQueueLength());

            Mockito.when(cs.getNumAsyncSchedulerThreads()).thenReturn(0);
            GenericTestUtils.waitFor(
                () -> newActivitiesManager.getAppActivitiesMaxQueueLength() == numNodes * 1.2,
                100L, 3000L);
            Assert.assertEquals((int) (numNodes * 1.2),
                newActivitiesManager.getAppActivitiesMaxQueueLength());

            Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(true);
            GenericTestUtils.waitFor(
                () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
                    == configuredAppActivitiesMaxQueueLength,
                100L, 3000L);
            Assert.assertEquals(configuredAppActivitiesMaxQueueLength,
                newActivitiesManager.getAppActivitiesMaxQueueLength());
        } finally {
            // Stop the started service so it does not keep running after the test.
            newActivitiesManager.stop();
        }
    }

    /**
     * Runs {@code supplier} {@code testingTimes} times and prints the total and
     * average wall-clock time to standard output.
     *
     * @param testingName label printed with the timing summary
     * @param supplier the operation to time; its return value is ignored
     * @param testingTimes number of invocations
     */
    private void testManyTimes(String testingName, Supplier<Void> supplier, int testingTimes) {
        long totalTime = 0L;
        for (int i = 0; i < testingTimes; ++i) {
            long startTime = System.currentTimeMillis();
            supplier.get();
            totalTime += System.currentTimeMillis() - startTime;
        }
        System.out.println("#" + testingName + ", testing times : " + testingTimes + ", total cost time : " + totalTime + " ms, average cost time : " + (float)totalTime / (float)testingTimes + " ms.");
    }

    /**
     * Records one REJECTED, NODE-level app activity with the
     * "Node is blacklisted" diagnostic for {@code app} on {@code node}.
     */
    private static void recordRejectedActivity(ActivitiesManager manager, SchedulerNode node,
            SchedulerApplicationAttempt app) {
        ActivitiesLogger.APP.recordAppActivityWithoutAllocation(manager, node, app,
            new SchedulerRequestKey(Priority.newInstance(0), 0L, null),
            NODE_IS_BLACKLISTED, ActivityState.REJECTED, ActivityLevel.NODE);
    }

    /**
     * Blocks until every future has completed, propagating the first failure.
     */
    private static void awaitAll(List<Future<Void>> futures) throws Exception {
        for (Future<Void> future : futures) {
            future.get();
        }
    }

    /**
     * An {@link ActivitiesManager} whose {@code completedNodeAllocations} map is
     * replaced by a spy: every {@code put} is intercepted and appended to
     * {@link #historyNodeAllocations} instead of being stored, so tests can see
     * every allocation list ever completed for a node.
     */
    public class TestingActivitiesManager
    extends ActivitiesManager {
        private final Map<NodeId, List<List<NodeAllocation>>> historyNodeAllocations =
            new ConcurrentHashMap<>();

        public TestingActivitiesManager(RMContext rmContext) {
            super(rmContext);
            this.completedNodeAllocations =
                Mockito.spy(new ConcurrentHashMap<NodeId, List<NodeAllocation>>());
            Mockito.doAnswer(invocationOnMock -> {
                NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0];
                @SuppressWarnings("unchecked") // the stubbed put(K, V) only ever receives V
                List<NodeAllocation> nodeAllocations =
                    (List<NodeAllocation>) invocationOnMock.getArguments()[1];
                // The answer runs on the tests' worker threads: create the per-node
                // history atomically and append through a synchronized list.
                historyNodeAllocations
                    .computeIfAbsent(nodeId,
                        ignored -> Collections.synchronizedList(new ArrayList<List<NodeAllocation>>()))
                    .add(nodeAllocations);
                return null;
            }).when(this.completedNodeAllocations)
                .put(ArgumentMatchers.any(NodeId.class), ArgumentMatchers.any(List.class));
        }
    }
}

