/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestCheckpointPreemptionPolicy {
    TaskAttemptListenerImpl pel = null;
    RMContainerAllocator r;
    JobId jid;
    MRAppMaster.RunningAppContext mActxt;
    Set<ContainerId> preemptedContainers = new HashSet<ContainerId>();
    Map<ContainerId, TaskAttemptId> assignedContainers = new HashMap<ContainerId, TaskAttemptId>();
    private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    HashMap<ContainerId, Resource> contToResourceMap = new HashMap();
    private int minAlloc = 1024;

    @BeforeEach
    public void setup() {
        ApplicationId appId = ApplicationId.newInstance((long)200L, (int)1);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)1);
        this.jid = MRBuilderUtils.newJobId((ApplicationId)appId, (int)1);
        this.mActxt = (MRAppMaster.RunningAppContext)Mockito.mock(MRAppMaster.RunningAppContext.class);
        EventHandler ea = (EventHandler)Mockito.mock(EventHandler.class);
        Mockito.when((Object)this.mActxt.getEventHandler()).thenReturn((Object)ea);
        for (int i = 0; i < 40; ++i) {
            ContainerId cId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)i);
            if (0 == i % 7) {
                this.preemptedContainers.add(cId);
            }
            TaskId tId = 0 == i % 2 ? MRBuilderUtils.newTaskId((JobId)this.jid, (int)(i / 2), (TaskType)TaskType.MAP) : MRBuilderUtils.newTaskId((JobId)this.jid, (int)(i / 2 + 1), (TaskType)TaskType.REDUCE);
            this.assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId((TaskId)tId, (int)0));
            this.contToResourceMap.put(cId, Resource.newInstance((int)(2 * this.minAlloc), (int)2));
        }
        for (Map.Entry<ContainerId, TaskAttemptId> ent : this.assignedContainers.entrySet()) {
            System.out.println("cont:" + ent.getKey().getContainerId() + " type:" + ent.getValue().getTaskId().getTaskType() + " res:" + this.contToResourceMap.get(ent.getKey()).getMemorySize() + "MB");
        }
    }

    @Test
    public void testStrictPreemptionContract() {
        final Map<ContainerId, TaskAttemptId> containers = this.assignedContainers;
        AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context(){

            public TaskAttemptId getTaskAttempt(ContainerId cId) {
                return (TaskAttemptId)containers.get(cId);
            }

            public List<Container> getContainers(TaskType t) {
                ArrayList<Container> p = new ArrayList<Container>();
                for (Map.Entry<ContainerId, TaskAttemptId> ent : TestCheckpointPreemptionPolicy.this.assignedContainers.entrySet()) {
                    if (!ent.getValue().getTaskId().getTaskType().equals((Object)t)) continue;
                    p.add(Container.newInstance((ContainerId)ent.getKey(), null, null, (Resource)TestCheckpointPreemptionPolicy.this.contToResourceMap.get(ent.getKey()), (Priority)Priority.newInstance((int)0), null));
                }
                return p;
            }
        };
        PreemptionMessage pM = this.generatePreemptionMessage(this.preemptedContainers, this.contToResourceMap, Resource.newInstance((int)1024, (int)1), true);
        CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
        policy.init((AppContext)this.mActxt);
        policy.preempt(mPctxt, pM);
        for (ContainerId c : this.preemptedContainers) {
            TaskAttemptId t = this.assignedContainers.get(c);
            if (TaskType.MAP.equals((Object)t.getTaskId().getTaskType()) ? !$assertionsDisabled && policy.isPreempted(t) : !$assertionsDisabled && !policy.isPreempted(t)) {
                throw new AssertionError();
            }
        }
    }

    @Test
    public void testPreemptionContract() {
        final Map<ContainerId, TaskAttemptId> containers = this.assignedContainers;
        AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context(){

            public TaskAttemptId getTaskAttempt(ContainerId cId) {
                return (TaskAttemptId)containers.get(cId);
            }

            public List<Container> getContainers(TaskType t) {
                ArrayList<Container> p = new ArrayList<Container>();
                for (Map.Entry<ContainerId, TaskAttemptId> ent : TestCheckpointPreemptionPolicy.this.assignedContainers.entrySet()) {
                    if (!ent.getValue().getTaskId().getTaskType().equals((Object)t)) continue;
                    p.add(Container.newInstance((ContainerId)ent.getKey(), null, null, (Resource)TestCheckpointPreemptionPolicy.this.contToResourceMap.get(ent.getKey()), (Priority)Priority.newInstance((int)0), null));
                }
                return p;
            }
        };
        PreemptionMessage pM = this.generatePreemptionMessage(this.preemptedContainers, this.contToResourceMap, Resource.newInstance((int)this.minAlloc, (int)1), false);
        CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
        policy.init((AppContext)this.mActxt);
        int supposedMemPreemption = (int)((PreemptionResourceRequest)pM.getContract().getResourceRequest().get(0)).getResourceRequest().getCapability().getMemorySize() * ((PreemptionResourceRequest)pM.getContract().getResourceRequest().get(0)).getResourceRequest().getNumContainers();
        policy.preempt(mPctxt, pM);
        List<TaskAttemptId> preempting = this.validatePreemption(pM, policy, supposedMemPreemption);
        policy.preempt(mPctxt, pM);
        List<TaskAttemptId> preempting2 = this.validatePreemption(pM, policy, supposedMemPreemption);
        assert (preempting2.equals(preempting));
        policy.handleCompletedContainer(preempting.get(0));
        policy.handleCompletedContainer(preempting.get(1));
        Iterator<Map.Entry<ContainerId, TaskAttemptId>> it = this.assignedContainers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<ContainerId, TaskAttemptId> ent = it.next();
            if (!ent.getValue().equals((Object)preempting.get(0)) && !ent.getValue().equals((Object)preempting.get(1))) continue;
            it.remove();
        }
        policy.preempt(mPctxt, pM);
        List<TaskAttemptId> preempting3 = this.validatePreemption(pM, policy, supposedMemPreemption);
        assert (!preempting3.equals(preempting2));
    }

    private List<TaskAttemptId> validatePreemption(PreemptionMessage pM, CheckpointAMPreemptionPolicy policy, int supposedMemPreemption) {
        Resource effectivelyPreempted = Resource.newInstance((int)0, (int)0);
        ArrayList<TaskAttemptId> preempting = new ArrayList<TaskAttemptId>();
        for (Map.Entry<ContainerId, TaskAttemptId> ent : this.assignedContainers.entrySet()) {
            if (!policy.isPreempted(ent.getValue())) continue;
            Resources.addTo((Resource)effectivelyPreempted, (Resource)this.contToResourceMap.get(ent.getKey()));
            if (!policy.isPreempted(ent.getValue())) continue;
            Assertions.assertEquals((Object)TaskType.REDUCE, (Object)ent.getValue().getTaskId().getTaskType());
            preempting.add(ent.getValue());
        }
        assert (effectivelyPreempted.getMemorySize() >= (long)supposedMemPreemption) : " preempted: " + effectivelyPreempted.getMemorySize();
        assert (effectivelyPreempted.getMemorySize() <= (long)(supposedMemPreemption + this.minAlloc));
        return preempting;
    }

    private PreemptionMessage generatePreemptionMessage(Set<ContainerId> containerToPreempt, HashMap<ContainerId, Resource> resPerCont, Resource minimumAllocation, boolean strict) {
        Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(new HashSet<ContainerId>(containerToPreempt));
        containerToPreempt.clear();
        Resource tot = Resource.newInstance((int)0, (int)0);
        for (ContainerId c : currentContPreemption) {
            Resources.addTo((Resource)tot, (Resource)resPerCont.get(c));
        }
        int numCont = (int)Math.ceil((double)tot.getMemorySize() / (double)minimumAllocation.getMemorySize());
        ResourceRequest rr = ResourceRequest.newInstance((Priority)Priority.newInstance((int)0), (String)"*", (Resource)minimumAllocation, (int)numCont);
        if (strict) {
            return this.generatePreemptionMessage(new Allocation(null, null, currentContPreemption, null, null));
        }
        return this.generatePreemptionMessage(new Allocation(null, null, null, currentContPreemption, Collections.singletonList(rr)));
    }

    private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
        PreemptionContainer pc;
        HashSet<PreemptionContainer> pCont;
        PreemptionMessage pMsg = null;
        if (allocation.getStrictContainerPreemptions() != null) {
            pMsg = (PreemptionMessage)this.recordFactory.newRecordInstance(PreemptionMessage.class);
            StrictPreemptionContract pStrict = (StrictPreemptionContract)this.recordFactory.newRecordInstance(StrictPreemptionContract.class);
            pCont = new HashSet<PreemptionContainer>();
            for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
                pc = (PreemptionContainer)this.recordFactory.newRecordInstance(PreemptionContainer.class);
                pc.setId(cId);
                pCont.add(pc);
            }
            pStrict.setContainers(pCont);
            pMsg.setStrictContract(pStrict);
        }
        if (allocation.getResourcePreemptions() != null && allocation.getResourcePreemptions().size() > 0 && allocation.getContainerPreemptions() != null && allocation.getContainerPreemptions().size() > 0) {
            if (pMsg == null) {
                pMsg = (PreemptionMessage)this.recordFactory.newRecordInstance(PreemptionMessage.class);
            }
            PreemptionContract contract = (PreemptionContract)this.recordFactory.newRecordInstance(PreemptionContract.class);
            pCont = new HashSet();
            for (ContainerId cId : allocation.getContainerPreemptions()) {
                pc = (PreemptionContainer)this.recordFactory.newRecordInstance(PreemptionContainer.class);
                pc.setId(cId);
                pCont.add(pc);
            }
            ArrayList<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
            for (ResourceRequest crr : allocation.getResourcePreemptions()) {
                PreemptionResourceRequest prr = (PreemptionResourceRequest)this.recordFactory.newRecordInstance(PreemptionResourceRequest.class);
                prr.setResourceRequest(crr);
                pRes.add(prr);
            }
            contract.setContainers(pCont);
            contract.setResourceRequest(pRes);
            pMsg.setContract(contract);
        }
        return pMsg;
    }
}

