/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tez.common.counters.AggregateTezCounterDelegate;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.MockTezClient;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public class TestMockDAGAppMaster {
    private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
    static Configuration defaultConf;
    static FileSystem localFs;

    @Test(timeout=5000L)
    public void testLocalResourceSetup() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        HashMap lrDAG = Maps.newHashMap();
        String lrName1 = "LR1";
        lrDAG.put(lrName1, LocalResource.newInstance((URL)URL.newInstance((String)"file", (String)"localhost", (int)0, (String)"/test"), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PUBLIC, (long)1L, (long)1L));
        HashMap lrVertex = Maps.newHashMap();
        String lrName2 = "LR2";
        lrVertex.put(lrName2, LocalResource.newInstance((URL)URL.newInstance((String)"file", (String)"localhost", (int)0, (String)"/test1"), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PUBLIC, (long)1L, (long)1L));
        DAG dag = DAG.create((String)"testLocalResourceSetup").addTaskLocalFiles((Map)lrDAG);
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5).addTaskLocalFiles((Map)lrVertex);
        dag.addVertex(vA);
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        MockDAGAppMaster.MockContainerLauncher.ContainerData cData = mockLauncher.getContainers().values().iterator().next();
        ContainerLaunchContext launchContext = cData.launchContext;
        Map taskLR = launchContext.getLocalResources();
        Assert.assertTrue((boolean)taskLR.containsKey(lrName1));
        Assert.assertTrue((boolean)taskLR.containsKey(lrName2));
        mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        tezClient.stop();
    }

    @Test(timeout=5000L)
    public void testInternalPreemption() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        DAG dag = DAG.create((String)"testInternalPreemption");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        dag.addVertex(vA);
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        MockDAGAppMaster.MockContainerLauncher.ContainerData cData = mockLauncher.getContainers().values().iterator().next();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockApp.getTaskSchedulerManager().preemptContainer(0, cData.cId);
        mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        TezVertexID vertexId = TezVertexID.getInstance((TezDAGID)dagImpl.getID(), (int)0);
        TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)0);
        TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
        Assert.assertTrue((killedTa.getState().equals((Object)TaskAttemptState.KILLED) || killedTa.getState().equals((Object)TaskAttemptState.FAILED) ? 1 : 0) != 0);
        tezClient.stop();
    }

    @Test(timeout=5000L)
    public void testBasicEvents() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.eventsDelegate = new TestEventsDelegate();
        DAG dag = DAG.create((String)"testBasicEvents");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)2);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)2);
        org.apache.tez.dag.api.Vertex vC = org.apache.tez.dag.api.Vertex.create((String)"C", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)2);
        org.apache.tez.dag.api.Vertex vD = org.apache.tez.dag.api.Vertex.create((String)"D", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)2);
        dag.addVertex(vA).addVertex(vB).addVertex(vC).addVertex(vD).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vC, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vD, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.ONE_TO_ONE, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        VertexImpl vImpl = (VertexImpl)dagImpl.getVertex(vB.getName());
        TaskImpl tImpl = (TaskImpl)vImpl.getTask(1);
        TezTaskAttemptID taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        List tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
        Assert.assertEquals((long)2L, (long)tEvents.size());
        Assert.assertEquals((Object)vA.getName(), (Object)((TezEvent)tEvents.get(0)).getDestinationInfo().getEdgeVertexName());
        Assert.assertEquals((long)0L, (long)((DataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getSourceIndex());
        Assert.assertEquals((Object)vA.getName(), (Object)((TezEvent)tEvents.get(1)).getDestinationInfo().getEdgeVertexName());
        Assert.assertEquals((long)0L, (long)((DataMovementEvent)((TezEvent)tEvents.get(1)).getEvent()).getSourceIndex());
        int targetIndex1 = ((DataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getTargetIndex();
        int targetIndex2 = ((DataMovementEvent)((TezEvent)tEvents.get(1)).getEvent()).getTargetIndex();
        Assert.assertTrue((String)("t1: " + targetIndex1 + " t2: " + targetIndex2), (targetIndex1 == 0 && targetIndex2 == 1 || targetIndex1 == 1 && targetIndex2 == 0 ? 1 : 0) != 0);
        vImpl = (VertexImpl)dagImpl.getVertex(vC.getName());
        tImpl = (TaskImpl)vImpl.getTask(1);
        taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
        Assert.assertEquals((long)2L, (long)tEvents.size());
        Assert.assertEquals((Object)vA.getName(), (Object)((TezEvent)tEvents.get(0)).getDestinationInfo().getEdgeVertexName());
        Assert.assertEquals((long)1L, (long)((CompositeRoutedDataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getSourceIndex());
        Assert.assertEquals((Object)vA.getName(), (Object)((TezEvent)tEvents.get(1)).getDestinationInfo().getEdgeVertexName());
        Assert.assertEquals((long)1L, (long)((CompositeRoutedDataMovementEvent)((TezEvent)tEvents.get(1)).getEvent()).getSourceIndex());
        targetIndex1 = ((CompositeRoutedDataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getTargetIndex();
        targetIndex2 = ((CompositeRoutedDataMovementEvent)((TezEvent)tEvents.get(1)).getEvent()).getTargetIndex();
        Assert.assertTrue((String)("t1: " + targetIndex1 + " t2: " + targetIndex2), (targetIndex1 == 0 && targetIndex2 == 1 || targetIndex1 == 1 && targetIndex2 == 0 ? 1 : 0) != 0);
        vImpl = (VertexImpl)dagImpl.getVertex(vD.getName());
        tImpl = (TaskImpl)vImpl.getTask(1);
        taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
        Assert.assertEquals((long)1L, (long)tEvents.size());
        Assert.assertEquals((Object)vA.getName(), (Object)((TezEvent)tEvents.get(0)).getDestinationInfo().getEdgeVertexName());
        Assert.assertEquals((long)0L, (long)((DataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getTargetIndex());
        Assert.assertEquals((long)0L, (long)((DataMovementEvent)((TezEvent)tEvents.get(0)).getEvent()).getSourceIndex());
        tezClient.stop();
    }

    @Test(timeout=100000L)
    public void testMixedEdgeRouting() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.eventsDelegate = new TestEventsDelegate();
        DAG dag = DAG.create((String)"testMixedEdgeRouting");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        org.apache.tez.dag.api.Vertex vC = org.apache.tez.dag.api.Vertex.create((String)"C", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        org.apache.tez.dag.api.Vertex vD = org.apache.tez.dag.api.Vertex.create((String)"D", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        org.apache.tez.dag.api.Vertex vE = org.apache.tez.dag.api.Vertex.create((String)"E", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        dag.addVertex(vA).addVertex(vB).addVertex(vC).addVertex(vD).addVertex(vE).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vC, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vB, (org.apache.tez.dag.api.Vertex)vC, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vD, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vB, (org.apache.tez.dag.api.Vertex)vD, (EdgeProperty)EdgeProperty.create((EdgeManagerPluginDescriptor)EdgeManagerPluginDescriptor.create((String)LegacyEdgeTestEdgeManager.class.getName()), (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In")))).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vB, (org.apache.tez.dag.api.Vertex)vE, (EdgeProperty)EdgeProperty.create((EdgeManagerPluginDescriptor)EdgeManagerPluginDescriptor.create((String)LegacyEdgeTestEdgeManager.class.getName()), (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        VertexImpl vImpl = (VertexImpl)dagImpl.getVertex(vC.getName());
        TaskImpl tImpl = (TaskImpl)vImpl.getTask(0);
        TezTaskAttemptID taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        Assert.assertEquals((long)0L, (long)tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
        vImpl = (VertexImpl)dagImpl.getVertex(vD.getName());
        tImpl = (TaskImpl)vImpl.getTask(0);
        taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        Assert.assertEquals((long)1L, (long)tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
        vImpl = (VertexImpl)dagImpl.getVertex(vE.getName());
        tImpl = (TaskImpl)vImpl.getTask(0);
        taId = TezTaskAttemptID.getInstance((TezTaskID)tImpl.getTaskID(), (int)0);
        Assert.assertEquals((long)1L, (long)tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
        tezClient.stop();
    }

    @Test(timeout=100000L)
    public void testConcurrencyLimit() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        int concurrencyLimit = 5;
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false, 20, 1000);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        final AtomicInteger concurrency = new AtomicInteger(0);
        final AtomicBoolean exceededConcurrency = new AtomicBoolean(false);
        mockApp.containerDelegate = new MockDAGAppMaster.ContainerDelegate(){

            @Override
            public void stop(ContainerStopRequest event) {
                concurrency.decrementAndGet();
            }

            @Override
            public void launch(ContainerLaunchRequest event) {
                int maxConc = concurrency.incrementAndGet();
                if (maxConc > 5) {
                    exceededConcurrency.set(true);
                }
                System.out.println("Launched: " + maxConc);
            }
        };
        DAG dag = DAG.create((String)"testConcurrencyLimit");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)20).setConf("tez.am.vertex.max-task-concurrency", String.valueOf(5));
        dag.addVertex(vA);
        mockLauncher.startScheduling(true);
        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        Assert.assertFalse((boolean)exceededConcurrency.get());
        tezClient.stop();
    }

    @Test
    public void testCountersAggregation() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false);
        tezClient.start();
        String vAName = "A";
        String vBName = "B";
        String procCounterName = "Proc";
        String globalCounterName = "Global";
        DAG dag = DAG.create((String)"testCountersAggregation");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        TezCounters temp = new TezCounters();
        temp.findCounter(new String("Global"), new String("Global")).increment(1L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        temp.write((DataOutput)out);
        final byte[] payload = bos.toByteArray();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate(){
            int counterValue = 0;

            @Override
            public TezCounters getCounters(TaskSpec taskSpec) {
                String vName = taskSpec.getVertexName();
                TezCounters counters = new TezCounters();
                DataInputByteBuffer in = new DataInputByteBuffer();
                in.reset(new ByteBuffer[]{ByteBuffer.wrap(payload)});
                try {
                    counters.readFields((DataInput)in);
                }
                catch (IOException e) {
                    Assert.fail((String)e.getMessage());
                }
                counters.findCounter(vName, "Proc").setValue((long)(++this.counterValue));
                for (OutputSpec output : taskSpec.getOutputs()) {
                    counters.findCounter(vName, output.getDestinationVertexName()).setValue((long)(++this.counterValue));
                }
                for (InputSpec input : taskSpec.getInputs()) {
                    counters.findCounter(vName, input.getSourceVertexName()).setValue((long)(++this.counterValue));
                }
                return counters;
            }
        };
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        TezCounters counters = dagImpl.getAllCounters();
        VertexImpl vAImpl = (VertexImpl)dagImpl.getVertex("A");
        VertexImpl vBImpl = (VertexImpl)dagImpl.getVertex("B");
        TezCounters vACounters = vAImpl.getAllCounters();
        TezCounters vBCounters = vBImpl.getAllCounters();
        Assert.assertEquals((long)19L, (long)((AggregateTezCounterDelegate)vACounters.findCounter("A", "Proc")).getMax());
        Assert.assertEquals((long)1L, (long)((AggregateTezCounterDelegate)vACounters.findCounter("A", "Proc")).getMin());
        Assert.assertEquals((long)20L, (long)((AggregateTezCounterDelegate)vACounters.findCounter("A", "B")).getMax());
        Assert.assertEquals((long)2L, (long)((AggregateTezCounterDelegate)vACounters.findCounter("A", "B")).getMin());
        Assert.assertEquals((long)21L, (long)((AggregateTezCounterDelegate)vBCounters.findCounter("B", "Proc")).getMin());
        Assert.assertEquals((long)21L, (long)((AggregateTezCounterDelegate)vBCounters.findCounter("B", "Proc")).getMax());
        Assert.assertEquals((long)22L, (long)((AggregateTezCounterDelegate)vBCounters.findCounter("B", "A")).getMin());
        Assert.assertEquals((long)22L, (long)((AggregateTezCounterDelegate)vBCounters.findCounter("B", "A")).getMax());
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testBasicCounters() throws Exception {
        CounterGroup vBGroup;
        String vBGrouName;
        CounterGroup vaGroup;
        String vaGrouName;
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false);
        tezClient.start();
        String vAName = "A";
        String vBName = "B";
        String procCounterName = "Proc";
        String globalCounterName = "Global";
        DAG dag = DAG.create((String)"testBasicCounters");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)1);
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        TezCounters temp = new TezCounters();
        temp.findCounter(new String("Global"), new String("Global")).increment(1L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        temp.write((DataOutput)out);
        final byte[] payload = bos.toByteArray();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate(){

            @Override
            public TezCounters getCounters(TaskSpec taskSpec) {
                String vName = taskSpec.getVertexName();
                TezCounters counters = new TezCounters();
                DataInputByteBuffer in = new DataInputByteBuffer();
                in.reset(new ByteBuffer[]{ByteBuffer.wrap(payload)});
                try {
                    counters.readFields((DataInput)in);
                }
                catch (IOException e) {
                    Assert.fail((String)e.getMessage());
                }
                counters.findCounter(vName, "Proc").increment(1L);
                for (OutputSpec output : taskSpec.getOutputs()) {
                    counters.findCounter(vName, output.getDestinationVertexName()).increment(1L);
                }
                for (InputSpec input : taskSpec.getInputs()) {
                    counters.findCounter(vName, input.getSourceVertexName()).increment(1L);
                }
                return counters;
            }
        };
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        TezCounters counters = dagImpl.getAllCounters();
        String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
        if (SystemUtils.IS_OS_LINUX) {
            Assert.assertTrue((counters.findCounter((Enum)DAGCounter.AM_CPU_MILLISECONDS).getValue() > 0L ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)10L, (long)counters.findCounter("A", "Proc").getValue());
        Assert.assertEquals((long)1L, (long)counters.findCounter("B", "Proc").getValue());
        Assert.assertEquals((long)10L, (long)counters.findCounter("A", "B").getValue());
        Assert.assertEquals((long)1L, (long)counters.findCounter("B", "A").getValue());
        Assert.assertEquals((long)11L, (long)counters.findCounter("Global", "Global").getValue());
        VertexImpl vAImpl = (VertexImpl)dagImpl.getVertex("A");
        VertexImpl vBImpl = (VertexImpl)dagImpl.getVertex("B");
        TezCounters vACounters = vAImpl.getAllCounters();
        TezCounters vBCounters = vBImpl.getAllCounters();
        String vACounterName = vACounters.findCounter("Global", "Global").getName();
        String vBCounterName = vBCounters.findCounter("Global", "Global").getName();
        if (vACounterName != vBCounterName) {
            Assert.fail((String)"String counter name objects dont match despite interning.");
        }
        if ((vaGrouName = (vaGroup = (CounterGroup)vACounters.getGroup("Global")).getName()) != (vBGrouName = (vBGroup = (CounterGroup)vBCounters.getGroup("Global")).getName())) {
            Assert.fail((String)"String group name objects dont match despite interning.");
        }
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testBasicStatistics() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false);
        tezClient.start();
        String vAName = "A";
        String vBName = "B";
        String sourceName = "In";
        String sinkName = "Out";
        DAG dag = DAG.create((String)"testBasisStatistics");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)3);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)2);
        vA.addDataSource("In", DataSourceDescriptor.create((InputDescriptor)InputDescriptor.create((String)"In"), null, null));
        vB.addDataSink("Out", DataSinkDescriptor.create((OutputDescriptor)OutputDescriptor.create((String)"Out"), null, null));
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        IOStatistics ioStats = new IOStatistics();
        ioStats.setDataSize(1L);
        ioStats.setItemsProcessed(1L);
        TaskStatistics vAStats = new TaskStatistics();
        vAStats.addIO("B", ioStats);
        vAStats.addIO("In", ioStats);
        TaskStatistics vBStats = new TaskStatistics();
        vBStats.addIO("A", ioStats);
        vBStats.addIO("Out", ioStats);
        ByteArrayOutputStream bosA = new ByteArrayOutputStream();
        DataOutputStream outA = new DataOutputStream(bosA);
        vAStats.write((DataOutput)outA);
        final byte[] payloadA = bosA.toByteArray();
        ByteArrayOutputStream bosB = new ByteArrayOutputStream();
        DataOutputStream outB = new DataOutputStream(bosB);
        vBStats.write((DataOutput)outB);
        final byte[] payloadB = bosB.toByteArray();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.statsDelegate = new MockDAGAppMaster.StatisticsDelegate(){

            @Override
            public TaskStatistics getStatistics(TaskSpec taskSpec) {
                byte[] payload = payloadA;
                TaskStatistics stats = new TaskStatistics();
                if (taskSpec.getVertexName().equals("B")) {
                    payload = payloadB;
                }
                DataInputByteBuffer in = new DataInputByteBuffer();
                in.reset(new ByteBuffer[]{ByteBuffer.wrap(payload)});
                try {
                    stats.readFields((DataInput)in);
                }
                catch (IOException e) {
                    Assert.fail((String)e.getMessage());
                }
                return stats;
            }
        };
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        for (Vertex v : dagImpl.getVertices().values()) {
            VertexStatistics vStats = v.getStatistics();
            if (v.getName().equals("A")) {
                Assert.assertEquals((long)3L, (long)vStats.getOutputStatistics("B").getDataSize());
                Assert.assertEquals((long)3L, (long)vStats.getInputStatistics("In").getDataSize());
                Assert.assertEquals((long)3L, (long)vStats.getOutputStatistics("B").getItemsProcessed());
                Assert.assertEquals((long)3L, (long)vStats.getInputStatistics("In").getItemsProcessed());
                continue;
            }
            Assert.assertEquals((long)2L, (long)vStats.getInputStatistics("A").getDataSize());
            Assert.assertEquals((long)2L, (long)vStats.getOutputStatistics("Out").getDataSize());
            Assert.assertEquals((long)2L, (long)vStats.getInputStatistics("A").getItemsProcessed());
            Assert.assertEquals((long)2L, (long)vStats.getOutputStatistics("Out").getItemsProcessed());
        }
        tezClient.stop();
    }

    private void checkMemory(String name, MockDAGAppMaster mockApp) {
        long mb = 0x100000L;
        Runtime runtime = Runtime.getRuntime();
        System.out.println("##### Heap utilization statistics [MB] for " + name);
        runtime.gc();
        System.out.println("##### Used Memory:" + (runtime.totalMemory() - runtime.freeMemory()) / mb);
        System.out.println("##### Free Memory:" + runtime.freeMemory() / mb);
        System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);
        System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);
    }

    @Ignore
    @Test(timeout=60000L)
    public void testBasicCounterMemory() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false);
        tezClient.start();
        String vAName = "A";
        DAG dag = DAG.create((String)"testBasicCounterMemory");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        dag.addVertex(vA);
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate(){

            @Override
            public TezCounters getCounters(TaskSpec taskSpec) {
                TezCounters counters = new TezCounters();
                String longName = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
                String shortName = "abcdefghijklmnopqrstuvwxyz";
                for (int i = 0; i < 6; ++i) {
                    for (int j = 0; j < 15; ++j) {
                        counters.findCounter(i + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", i + "abcdefghijklmnopqrstuvwxyz").increment(1L);
                    }
                }
                return counters;
            }
        };
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        TezCounters counters = dagImpl.getAllCounters();
        Assert.assertNotNull((Object)counters);
        this.checkMemory(dag.getName(), mockApp);
        tezClient.stop();
    }

    @Ignore
    @Test(timeout=60000L)
    public void testTaskEventsProcessingSpeed() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        tezconf.setBoolean("tez.am.use.concurrent-dispatcher", true);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false, 30, 1000);
        tezClient.start();
        String vAName = "A";
        DAG dag = DAG.create((String)"testTaskEventsProcessingSpeed");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)50000);
        dag.addVertex(vA);
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        tezClient.stop();
    }

    @Ignore
    @Test(timeout=60000L)
    public void testBasicStatisticsMemory() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false);
        tezClient.start();
        String vAName = "abcdefghijklmnopqrstuvwxyz";
        int numTasks = 10000;
        int numSources = 10;
        IOStatistics ioStats = new IOStatistics();
        ioStats.setDataSize(1L);
        ioStats.setItemsProcessed(1L);
        TaskStatistics vAStats = new TaskStatistics();
        DAG dag = DAG.create((String)"testBasicStatisticsMemory");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"abcdefghijklmnopqrstuvwxyz", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)numTasks);
        for (int i = 0; i < numSources; ++i) {
            String sourceName = i + "abcdefghijklmnopqrstuvwxyz";
            vA.addDataSource(sourceName, DataSourceDescriptor.create((InputDescriptor)InputDescriptor.create((String)sourceName), null, null));
            vAStats.addIO(sourceName, ioStats);
        }
        dag.addVertex(vA);
        ByteArrayOutputStream bosA = new ByteArrayOutputStream();
        DataOutputStream outA = new DataOutputStream(bosA);
        vAStats.write((DataOutput)outA);
        final byte[] payloadA = bosA.toByteArray();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.statsDelegate = new MockDAGAppMaster.StatisticsDelegate(){

            @Override
            public TaskStatistics getStatistics(TaskSpec taskSpec) {
                byte[] payload = payloadA;
                TaskStatistics stats = new TaskStatistics();
                DataInputByteBuffer in = new DataInputByteBuffer();
                in.reset(new ByteBuffer[]{ByteBuffer.wrap(payload)});
                try {
                    stats.readFields((DataInput)in);
                }
                catch (IOException e) {
                    Assert.fail((String)e.getMessage());
                }
                return stats;
            }
        };
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        DAGImpl dagImpl = (DAGImpl)mockApp.getContext().getCurrentDAG();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        Assert.assertEquals((long)numTasks, (long)dagImpl.getVertex("abcdefghijklmnopqrstuvwxyz").getStatistics().getInputStatistics("0abcdefghijklmnopqrstuvwxyz").getDataSize());
        Assert.assertEquals((long)numTasks, (long)dagImpl.getVertex("abcdefghijklmnopqrstuvwxyz").getStatistics().getInputStatistics("0abcdefghijklmnopqrstuvwxyz").getItemsProcessed());
        this.checkMemory(dag.getName(), mockApp);
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testMultipleSubmissions() throws Exception {
        HashMap lrDAG = Maps.newHashMap();
        String lrName1 = "LR1";
        lrDAG.put(lrName1, LocalResource.newInstance((URL)URL.newInstance((String)"file", (String)"localhost", (int)0, (String)"/test"), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PUBLIC, (long)1L, (long)1L));
        HashMap lrVertex = Maps.newHashMap();
        String lrName2 = "LR2";
        lrVertex.put(lrName2, LocalResource.newInstance((URL)URL.newInstance((String)"file", (String)"localhost", (int)0, (String)"/test1"), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PUBLIC, (long)1L, (long)1L));
        DAG dag = DAG.create((String)"DAG-testMultipleSubmissions").addTaskLocalFiles((Map)lrDAG);
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5).addTaskLocalFiles((Map)lrVertex);
        dag.addVertex(vA);
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        tezClient.stop();
        tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testSchedulerErrorHandling() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        DAG dag = DAG.create((String)"testSchedulerErrorHandling");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        dag.addVertex(vA);
        tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        mockApp.handle((DAGAppMasterEvent)new DAGAppMasterEventSchedulingServiceError(org.apache.hadoop.util.StringUtils.stringifyException((Throwable)new RuntimeException("Mock error"))));
        while (!mockApp.getShutdownHandler().wasShutdownInvoked()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)mockApp.getContext().getCurrentDAG().getState());
    }

    @Test(timeout=10000L)
    public void testInitFailed() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, new AtomicBoolean(false), true, false);
        try {
            tezClient.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.assertEquals((Object)"FailInit", (Object)e.getCause().getCause().getCause().getMessage());
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            mockApp.waitForServiceToStop(Integer.MAX_VALUE);
        }
    }

    @Test(timeout=10000L)
    public void testStartFailed() {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, new AtomicBoolean(false), false, true);
        try {
            tezClient.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.assertEquals((Object)"FailStart", (Object)e.getCause().getCause().getCause().getMessage());
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            mockApp.waitForServiceToStop(Integer.MAX_VALUE);
        }
    }

    private OutputCommitterDescriptor createOutputCommitterDesc(boolean failOnCommit) {
        OutputCommitterDescriptor outputCommitterDesc = OutputCommitterDescriptor.create((String)FailingOutputCommitter.class.getName());
        UserPayload payload = UserPayload.create((ByteBuffer)ByteBuffer.wrap(new FailingOutputCommitter.FailingOutputCommitterConfig(failOnCommit).toUserPayload()));
        outputCommitterDesc.setUserPayload(payload);
        return outputCommitterDesc;
    }

    private DAG createDAG(String dagName, boolean uv12CommitFail, boolean v3CommitFail) {
        DAG dag = DAG.create((String)dagName);
        org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create((String)"v1", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc"), (int)1);
        org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create((String)"v2", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc"), (int)1);
        org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create((String)"v3", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc"), (int)1);
        VertexGroup uv12 = dag.createVertexGroup("uv12", new org.apache.tez.dag.api.Vertex[]{v1, v2});
        DataSinkDescriptor uv12DataSink = DataSinkDescriptor.create((OutputDescriptor)OutputDescriptor.create((String)"dummy output"), (OutputCommitterDescriptor)this.createOutputCommitterDesc(uv12CommitFail), null);
        uv12.addDataSink("uv12Out", uv12DataSink);
        DataSinkDescriptor v3DataSink = DataSinkDescriptor.create((OutputDescriptor)OutputDescriptor.create((String)"dummy output"), (OutputCommitterDescriptor)this.createOutputCommitterDesc(v3CommitFail), null);
        v3.addDataSink("v3Out", v3DataSink);
        GroupInputEdge e1 = GroupInputEdge.create((VertexGroup)uv12, (org.apache.tez.dag.api.Vertex)v3, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"dummy output class"), (InputDescriptor)InputDescriptor.create((String)"dummy input class")), (InputDescriptor)InputDescriptor.create((String)"merge.class"));
        dag.addVertex(v1).addVertex(v2).addVertex(v3).addEdge(e1);
        return dag;
    }

    @Test(timeout=60000L)
    public void testCommitOutputOnDAGSuccess() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        DAG dag1 = this.createDAG("testDAGBothCommitsSucceed", false, false);
        DAGClient dagClient = tezClient.submitDAG(dag1);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        DAG dag2 = this.createDAG("testDAGVertexGroupCommitFail", true, false);
        dagClient = tezClient.submitDAG(dag2);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",").contains("fail output committer:uv12Out"));
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v3", null).getState());
        DAG dag3 = this.createDAG("testDAGVertexCommitFail", false, true);
        dagClient = tezClient.submitDAG(dag3);
        dagClient.waitForCompletion();
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",").contains("fail output committer:v3Out"));
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v3", null).getState());
        DAG dag4 = this.createDAG("testDAGBothCommitsFail", true, true);
        dagClient = tezClient.submitDAG(dag4);
        dagClient.waitForCompletion();
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        String diag = StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",");
        Assert.assertTrue((diag.contains("fail output committer:uv12Out") || diag.contains("fail output committer:v3Out") ? 1 : 0) != 0);
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v3", null).getState());
        tezClient.stop();
    }

    @Test(timeout=60000L)
    public void testCommitOutputOnVertexSuccess() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        tezconf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        DAG dag1 = this.createDAG("testDAGBothCommitsSucceed", false, false);
        DAGClient dagClient = tezClient.submitDAG(dag1);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        DAG dag2 = this.createDAG("testDAGVertexGroupCommitFail", true, false);
        dagClient = tezClient.submitDAG(dag2);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",").contains("fail output committer:uv12Out"));
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        VertexStatus.State v3State = dagClient.getVertexStatus("v3", null).getState();
        if (v3State.equals((Object)VertexStatus.State.SUCCEEDED)) {
            LOG.info((Object)"v3 is succeeded");
        } else {
            Assert.assertEquals((Object)VertexStatus.State.KILLED, (Object)v3State);
        }
        DAG dag3 = this.createDAG("testDAGVertexCommitFail", false, true);
        dagClient = tezClient.submitDAG(dag3);
        dagClient.waitForCompletion();
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",").contains("Commit failed"));
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.FAILED, (Object)dagClient.getVertexStatus("v3", null).getState());
        Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getVertexStatus("v3", null).getDiagnostics(), (String)",").contains("fail output committer:v3Out"));
        DAG dag4 = this.createDAG("testDAGBothCommitsFail", true, true);
        dagClient = tezClient.submitDAG(dag4);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        LOG.info((Object)dagClient.getDAGStatus(null).getDiagnostics());
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient.getDAGStatus(null).getState());
        String diag = StringUtils.join((Collection)dagClient.getDAGStatus(null).getDiagnostics(), (String)",");
        Assert.assertTrue((diag.contains("fail output committer:uv12Out") || diag.contains("fail output committer:v3Out") ? 1 : 0) != 0);
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v1", null).getState());
        Assert.assertEquals((Object)VertexStatus.State.SUCCEEDED, (Object)dagClient.getVertexStatus("v2", null).getState());
        v3State = dagClient.getVertexStatus("v3", null).getState();
        if (v3State.equals((Object)VertexStatus.State.FAILED)) {
            LOG.info((Object)"v3 is failed");
            Assert.assertTrue((boolean)StringUtils.join((Collection)dagClient.getVertexStatus("v3", null).getDiagnostics(), (String)",").contains("fail output committer:v3Out"));
        } else {
            Assert.assertEquals((Object)VertexStatus.State.KILLED, (Object)v3State);
        }
        tezClient.stop();
    }

    @Test(timeout=5000L)
    public void testDAGFinishedRecoveryError() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        mockApp.recoveryFatalError = true;
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(true);
        DAG dag = DAG.create((String)"DAG-testDAGFinishedRecoveryError");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        dag.addVertex(vA);
        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();
        while (!mockApp.getShutdownHandler().wasShutdownInvoked()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)DAGState.SUCCEEDED, (Object)mockApp.getContext().getCurrentDAG().getState());
        Assert.assertEquals((Object)DAGAppMasterState.FAILED, (Object)mockApp.getState());
        Assert.assertTrue((boolean)StringUtils.join((Collection)mockApp.getDiagnostics(), (String)",").contains("Recovery had a fatal error, shutting down session after DAG completion"));
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal((Configuration)defaultConf);
            String stagingDir = "target/" + TestMockDAGAppMaster.class.getName() + "-tmpDir";
            defaultConf.set("tez.staging-dir", stagingDir);
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    static class TestEventsDelegate
    implements MockDAGAppMaster.EventsDelegate {
        TestEventsDelegate() {
        }

        @Override
        public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
            for (OutputSpec output : taskSpec.getOutputs()) {
                if (output.getPhysicalEdgeCount() == 1) {
                    events.add(new TezEvent((Event)DataMovementEvent.create((int)0, (int)0, (int)0, null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output.getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
                    continue;
                }
                events.add(new TezEvent((Event)CompositeDataMovementEvent.create((int)0, (int)output.getPhysicalEdgeCount(), null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output.getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
            }
        }
    }

    public static class LegacyEdgeTestEdgeManager
    extends EdgeManagerPlugin {
        List<Integer> destinationInputIndices = Collections.singletonList(0);

        public LegacyEdgeTestEdgeManager(EdgeManagerPluginContext context) {
            super(context);
        }

        public void initialize() throws Exception {
        }

        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
            return 1;
        }

        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
            return 1;
        }

        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            destinationTaskAndInputIndices.put(sourceTaskIndex, this.destinationInputIndices);
        }

        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            destinationTaskAndInputIndices.put(sourceTaskIndex, this.destinationInputIndices);
        }

        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) {
            return destinationTaskIndex;
        }

        public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
            return 1;
        }
    }

    public static class FailingOutputCommitter
    extends OutputCommitter {
        boolean failOnCommit = false;

        public FailingOutputCommitter(OutputCommitterContext committerContext) {
            super(committerContext);
        }

        public void initialize() throws Exception {
            FailingOutputCommitterConfig config = new FailingOutputCommitterConfig();
            config.fromUserPayload(this.getContext().getUserPayload().deepCopyAsArray());
            this.failOnCommit = config.failOnCommit;
        }

        public void setupOutput() throws Exception {
        }

        public void commitOutput() throws Exception {
            if (this.failOnCommit) {
                throw new Exception("fail output committer:" + this.getContext().getOutputName());
            }
        }

        public void abortOutput(VertexStatus.State finalState) throws Exception {
        }

        public static class FailingOutputCommitterConfig {
            boolean failOnCommit;

            public FailingOutputCommitterConfig() {
                this(false);
            }

            public FailingOutputCommitterConfig(boolean failOnCommit) {
                this.failOnCommit = failOnCommit;
            }

            public byte[] toUserPayload() {
                return Ints.toByteArray((int)(this.failOnCommit ? 1 : 0));
            }

            public void fromUserPayload(byte[] userPayload) {
                int failInt = Ints.fromByteArray((byte[])userPayload);
                this.failOnCommit = failInt != 0;
            }
        }
    }
}

