/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FSPreemptionThread
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(FSPreemptionThread.class);
    protected final FSContext context;
    private final FairScheduler scheduler;
    private final long warnTimeBeforeKill;
    private final long delayBeforeNextStarvationCheck;
    private final Timer preemptionTimer;
    private final Lock schedulerReadLock;

    FSPreemptionThread(FairScheduler scheduler) {
        this.setDaemon(true);
        this.setName("FSPreemptionThread");
        this.scheduler = scheduler;
        this.context = scheduler.getContext();
        FairSchedulerConfiguration fsConf = scheduler.getConf();
        this.context.setPreemptionEnabled();
        this.context.setPreemptionUtilizationThreshold(fsConf.getPreemptionUtilizationThreshold());
        this.preemptionTimer = new Timer("Preemption Timer", true);
        this.warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
        long allocDelay = fsConf.isContinuousSchedulingEnabled() ? (long)(10 * fsConf.getContinuousSchedulingSleepMs()) : 4L * scheduler.getNMHeartbeatInterval();
        this.delayBeforeNextStarvationCheck = this.warnTimeBeforeKill + allocDelay + fsConf.getWaitTimeBeforeNextStarvationCheck();
        this.schedulerReadLock = scheduler.getSchedulerReadLock();
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                FSAppAttempt starvedApp = this.context.getStarvedApps().take();
                this.schedulerReadLock.lock();
                try {
                    this.preemptContainers(this.identifyContainersToPreempt(starvedApp));
                }
                finally {
                    this.schedulerReadLock.unlock();
                }
                starvedApp.preemptionTriggered(this.delayBeforeNextStarvationCheck);
            }
            catch (InterruptedException e) {
                LOG.info("Preemption thread interrupted! Exiting.");
                Thread.currentThread().interrupt();
            }
        }
    }

    private List<RMContainer> identifyContainersToPreempt(FSAppAttempt starvedApp) {
        ArrayList<RMContainer> containersToPreempt = new ArrayList<RMContainer>();
        for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
            List<FSSchedulerNode> potentialNodes = this.scheduler.getNodeTracker().getNodesByResourceName(rr.getResourceName());
            for (int i = 0; i < rr.getNumContainers(); ++i) {
                List<RMContainer> containers;
                PreemptableContainers bestContainers = this.getBestPreemptableContainers(rr, potentialNodes);
                if (bestContainers == null || (containers = bestContainers.getAllContainers()).size() <= 0) continue;
                containersToPreempt.addAll(containers);
                this.trackPreemptionsAgainstNode(containers, starvedApp);
                Iterator<RMContainer> iterator = containers.iterator();
                while (iterator.hasNext()) {
                    RMContainer container;
                    FSAppAttempt app = this.scheduler.getSchedulerApp((container = iterator.next()).getApplicationAttemptId());
                    LOG.info("Preempting container " + container + " from queue: " + (app != null ? app.getQueueName() : "unknown"));
                    if (app == null) continue;
                    app.trackContainerForPreemption(container);
                }
            }
        }
        return containersToPreempt;
    }

    private PreemptableContainers identifyContainersToPreemptForOneContainer(List<FSSchedulerNode> potentialNodes, ResourceRequest rr) {
        PreemptableContainers bestContainers = null;
        int maxAMContainers = Integer.MAX_VALUE;
        for (FSSchedulerNode node : potentialNodes) {
            PreemptableContainers preemptableContainers = this.identifyContainersToPreemptOnNode(rr.getCapability(), node, maxAMContainers);
            if (preemptableContainers == null) continue;
            bestContainers = preemptableContainers;
            maxAMContainers = bestContainers.numAMContainers;
            if (maxAMContainers != 0) continue;
            break;
        }
        return bestContainers;
    }

    private PreemptableContainers identifyContainersToPreemptOnNode(Resource request, FSSchedulerNode node, int maxAMContainers) {
        PreemptableContainers preemptableContainers = new PreemptableContainers(maxAMContainers);
        List<RMContainer> containersToCheck = node.getRunningContainersWithAMsAtTheEnd();
        containersToCheck.removeAll(node.getContainersForPreemption());
        Resource potential = Resources.subtractFromNonNegative((Resource)Resources.clone((Resource)node.getUnallocatedResource()), (Resource)node.getTotalReserved());
        for (RMContainer container : containersToCheck) {
            FSAppAttempt app = this.scheduler.getSchedulerApp(container.getApplicationAttemptId());
            if (app == null) {
                LOG.info("Found container " + container + " on node " + node.getNodeName() + "without app, skipping preemption");
                continue;
            }
            ApplicationId appId = app.getApplicationId();
            if (app.canContainerBePreempted(container, preemptableContainers.getResourcesToPreemptForApp(appId))) {
                if (!preemptableContainers.addContainer(container, appId)) {
                    return null;
                }
                Resources.addTo((Resource)potential, (Resource)container.getAllocatedResource());
            }
            if (!Resources.fitsIn((Resource)request, (Resource)potential)) continue;
            return preemptableContainers;
        }
        return null;
    }

    private void trackPreemptionsAgainstNode(List<RMContainer> containers, FSAppAttempt app) {
        FSSchedulerNode node = (FSSchedulerNode)this.scheduler.getNodeTracker().getNode(containers.get(0).getNodeId());
        node.addContainersForPreemption(containers, app);
    }

    private void preemptContainers(List<RMContainer> containers) {
        this.preemptionTimer.schedule((TimerTask)new PreemptContainersTask(containers), this.warnTimeBeforeKill);
    }

    private PreemptableContainers getBestPreemptableContainers(ResourceRequest rr, List<FSSchedulerNode> potentialNodes) {
        PreemptableContainers bestContainers = this.identifyContainersToPreemptForOneContainer(potentialNodes, rr);
        if (rr.getRelaxLocality() && !ResourceRequest.isAnyLocation((String)rr.getResourceName()) && bestContainers != null && bestContainers.numAMContainers > 0) {
            List<FSSchedulerNode> remainingNodes = this.scheduler.getNodeTracker().getAllNodes();
            remainingNodes.removeAll(potentialNodes);
            PreemptableContainers spareContainers = this.identifyContainersToPreemptForOneContainer(remainingNodes, rr);
            if (spareContainers != null && spareContainers.numAMContainers < bestContainers.numAMContainers) {
                bestContainers = spareContainers;
            }
        }
        return bestContainers;
    }

    private static class PreemptableContainers {
        Map<ApplicationId, List<RMContainer>> containersByApp;
        int numAMContainers = 0;
        int maxAMContainers;

        PreemptableContainers(int maxAMContainers) {
            this.maxAMContainers = maxAMContainers;
            this.containersByApp = new HashMap<ApplicationId, List<RMContainer>>();
        }

        private boolean addContainer(RMContainer container, ApplicationId appId) {
            if (container.isAMContainer()) {
                ++this.numAMContainers;
                if (this.numAMContainers >= this.maxAMContainers) {
                    return false;
                }
            }
            if (!this.containersByApp.containsKey(appId)) {
                this.containersByApp.put(appId, new ArrayList());
            }
            this.containersByApp.get(appId).add(container);
            return true;
        }

        private List<RMContainer> getAllContainers() {
            ArrayList<RMContainer> allContainers = new ArrayList<RMContainer>();
            for (List<RMContainer> containersForApp : this.containersByApp.values()) {
                allContainers.addAll(containersForApp);
            }
            return allContainers;
        }

        private Resource getResourcesToPreemptForApp(ApplicationId appId) {
            Resource resourcesToPreempt = Resources.createResource((int)0, (int)0);
            if (this.containersByApp.containsKey(appId)) {
                for (RMContainer container : this.containersByApp.get(appId)) {
                    Resources.addTo((Resource)resourcesToPreempt, (Resource)container.getAllocatedResource());
                }
            }
            return resourcesToPreempt;
        }
    }

    private class PreemptContainersTask
    extends TimerTask {
        private final List<RMContainer> containers;

        PreemptContainersTask(List<RMContainer> containers) {
            this.containers = containers;
        }

        @Override
        public void run() {
            for (RMContainer container : this.containers) {
                ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(container.getContainerId(), "Container preempted by scheduler");
                LOG.info("Killing container " + container);
                FSPreemptionThread.this.scheduler.completedContainer(container, status, RMContainerEventType.KILL);
            }
        }
    }
}

