/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.CallableEventDispatcher;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.StateMachineTez;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDAGImpl {
    @Rule
    public TestName testName = new TestName();
    private static final Logger LOG = LoggerFactory.getLogger(TestDAGImpl.class);
    private DAGProtos.DAGPlan dagPlan;
    private TezDAGID dagId;
    private static Configuration conf;
    private DrainDispatcher dispatcher;
    private ListeningExecutorService execService;
    private Credentials fsTokens;
    private AppContext appContext;
    private ACLManager aclManager;
    private ApplicationAttemptId appAttemptId;
    private DAGImpl dag;
    private TaskSchedulerManager taskSchedulerManager;
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private TaskHeartbeatHandler thh;
    private Clock clock = new SystemClock();
    private DAGFinishEventHandler dagFinishEventHandler;
    private AppContext mrrAppContext;
    private DAGProtos.DAGPlan mrrDagPlan;
    private DAGImpl mrrDag;
    private TezDAGID mrrDagId;
    private AppContext groupAppContext;
    private DAGProtos.DAGPlan groupDagPlan;
    private DAGImpl groupDag;
    private TezDAGID groupDagId;
    private DAGProtos.DAGPlan dagPlanWithCustomEdge;
    private DAGImpl dagWithCustomEdge;
    private TezDAGID dagWithCustomEdgeId;
    private AppContext dagWithCustomEdgeAppContext;
    private HistoryEventHandler historyEventHandler;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance((int)8192, (int)10));
    private HadoopShim defaultShim = new DefaultHadoopShim();

    private DAGImpl chooseDAG(TezDAGID curDAGId) {
        if (curDAGId.equals((Object)this.dagId)) {
            return this.dag;
        }
        if (curDAGId.equals((Object)this.mrrDagId)) {
            return this.mrrDag;
        }
        if (curDAGId.equals((Object)this.groupDagId)) {
            return this.groupDag;
        }
        if (curDAGId.equals((Object)this.dagWithCustomEdgeId)) {
            return this.dagWithCustomEdge;
        }
        throw new RuntimeException("Invalid event, unknown dag, dagId=" + curDAGId);
    }

    private DAGProtos.DAGPlan createTestMRRDAGPlan() {
        LOG.info("Setting up MRR dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output1").build()).setName("output1").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output2").build()).setName("output2").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addInEdgeId("e1").addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output3").build()).setName("output3").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    static DAGProtos.DAGPlan createGroupDAGPlan(String dagName) {
        LOG.info("Setting up group dag plan");
        int dummyTaskCount = 1;
        Resource dummyTaskResource = Resource.newInstance((int)1, (int)1);
        org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create((String)"vertex1", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Processor"), (int)dummyTaskCount, (Resource)dummyTaskResource);
        org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create((String)"vertex2", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Processor"), (int)dummyTaskCount, (Resource)dummyTaskResource);
        org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create((String)"vertex3", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Processor"), (int)dummyTaskCount, (Resource)dummyTaskResource);
        org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create((String)("DAG-" + dagName));
        String groupName1 = "uv12";
        OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create((String)TotalCountingOutputCommitter.class.getName());
        VertexGroup uv12 = dag.createVertexGroup(groupName1, new org.apache.tez.dag.api.Vertex[]{v1, v2});
        OutputDescriptor outDesc = OutputDescriptor.create((String)"output.class");
        uv12.addDataSink("uvOut", DataSinkDescriptor.create((OutputDescriptor)outDesc, (OutputCommitterDescriptor)ocd, null));
        v3.addDataSink("uvOut", DataSinkDescriptor.create((OutputDescriptor)outDesc, (OutputCommitterDescriptor)ocd, null));
        GroupInputEdge e1 = GroupInputEdge.create((VertexGroup)uv12, (org.apache.tez.dag.api.Vertex)v3, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"dummy output class"), (InputDescriptor)InputDescriptor.create((String)"dummy input class")), (InputDescriptor)InputDescriptor.create((String)"merge.class"));
        dag.addVertex(v1);
        dag.addVertex(v2);
        dag.addVertex(v3);
        dag.addEdge(e1);
        return dag.createDag(conf, null, null, null, true);
    }

    public static DAGProtos.DAGPlan createTestDAGPlan() {
        LOG.info("Setting up dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").setDagConf(DAGProtos.ConfigurationProto.newBuilder().addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder().setKey("tez.am.task.max.failed.attempts").setValue("3"))).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).setVertexConf(DAGProtos.ConfigurationProto.newBuilder().addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder().setKey("tez.am.task.max.failed.attempts").setValue("2"))).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutEdgeId("e3").addOutEdgeId("e4").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack4").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addOutEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host5").addRack("rack5").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x5.y5").build()).addInEdgeId("e4").addOutEdgeId("e6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host6").addRack("rack6").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x6.y6").build()).addInEdgeId("e5").addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i4_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v4")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i5_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v5")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o4")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")).setInputVertexName("vertex5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o5")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
        LOG.info("Setting up custome edge dag plan " + (Object)((Object)exLocation) + " " + useLegacy);
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x2.y2")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(useLegacy ? CustomizedEdgeManagerLegacy.class.getName() : CustomizedEdgeManager.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFromUtf8((String)exLocation.name())))).setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGWithNonExistEdgeManager() {
        LOG.info("Setting up dag plan with non-exist edgemanager");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x2.y2")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-edge-manager")).setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    @Before
    public void setup() {
        conf = new Configuration();
        conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        this.dagId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)1);
        Assert.assertNotNull((Object)this.dagId);
        this.dagPlan = TestDAGImpl.createTestDAGPlan();
        this.dispatcher = new DrainDispatcher();
        this.fsTokens = new Credentials();
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        this.taskSchedulerManager = (TaskSchedulerManager)Mockito.mock(TaskSchedulerManager.class);
        this.execService = (ListeningExecutorService)Mockito.mock(ListeningExecutorService.class);
        final ListenableFuture mockFuture = (ListenableFuture)Mockito.mock(ListenableFuture.class);
        Mockito.when((Object)this.appContext.getHadoopShim()).thenReturn((Object)this.defaultShim);
        Mockito.when((Object)this.appContext.getApplicationID()).thenReturn((Object)this.appAttemptId.getApplicationId());
        ((ListeningExecutorService)Mockito.doAnswer((Answer)new Answer(){

            public ListenableFuture<Void> answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                CallableEvent e = (CallableEvent)args[0];
                TestDAGImpl.this.dispatcher.getEventHandler().handle((Event)e);
                return mockFuture;
            }
        }).when((Object)this.execService)).submit((Callable)Mockito.any());
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.appContext)).getExecService();
        this.historyEventHandler = (HistoryEventHandler)Mockito.mock(HistoryEventHandler.class);
        this.aclManager = new ACLManager("amUser");
        ((AppContext)Mockito.doReturn((Object)conf).when((Object)this.appContext)).getAMConf();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.appContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.appContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)this.dagId).when((Object)this.appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.appContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.aclManager).when((Object)this.appContext)).getAMACLManager();
        ((AppContext)Mockito.doReturn((Object)this.defaultShim).when((Object)this.appContext)).getHadoopShim();
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dag);
        ((AppContext)Mockito.doReturn((Object)this.dag).when((Object)this.appContext)).getCurrentDAG();
        ((AppContext)Mockito.doReturn((Object)this.clusterInfo).when((Object)this.appContext)).getClusterInfo();
        this.mrrAppContext = (AppContext)Mockito.mock(AppContext.class);
        ((AppContext)Mockito.doReturn((Object)this.aclManager).when((Object)this.mrrAppContext)).getAMACLManager();
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.mrrAppContext)).getExecService();
        ((AppContext)Mockito.doReturn((Object)this.defaultShim).when((Object)this.mrrAppContext)).getHadoopShim();
        this.mrrDagId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)2);
        this.mrrDagPlan = this.createTestMRRDAGPlan();
        this.mrrDag = new DAGImpl(this.mrrDagId, conf, this.mrrDagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.mrrAppContext);
        this.mrrDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.mrrDag);
        ((AppContext)Mockito.doReturn((Object)conf).when((Object)this.mrrAppContext)).getAMConf();
        ((AppContext)Mockito.doReturn((Object)this.mrrDag).when((Object)this.mrrAppContext)).getCurrentDAG();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.mrrAppContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.mrrAppContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.mrrAppContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.clusterInfo).when((Object)this.mrrAppContext)).getClusterInfo();
        this.groupAppContext = (AppContext)Mockito.mock(AppContext.class);
        ((AppContext)Mockito.doReturn((Object)this.aclManager).when((Object)this.groupAppContext)).getAMACLManager();
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.groupAppContext)).getExecService();
        ((AppContext)Mockito.doReturn((Object)this.defaultShim).when((Object)this.groupAppContext)).getHadoopShim();
        this.groupDagId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)3);
        this.groupDagPlan = TestDAGImpl.createGroupDAGPlan(this.testName.getMethodName());
        this.groupDag = new DAGImpl(this.groupDagId, conf, this.groupDagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.groupAppContext);
        this.groupDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.groupDag);
        ((AppContext)Mockito.doReturn((Object)conf).when((Object)this.groupAppContext)).getAMConf();
        ((AppContext)Mockito.doReturn((Object)this.groupDag).when((Object)this.groupAppContext)).getCurrentDAG();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.groupAppContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.groupAppContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.groupAppContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.clusterInfo).when((Object)this.groupAppContext)).getClusterInfo();
        TotalCountingOutputCommitter.totalCommitCounter = 0;
        this.dispatcher.register(CallableEventType.class, (EventHandler)new CallableEventDispatcher());
        this.taskEventDispatcher = new TaskEventDispatcher();
        this.dispatcher.register(TaskEventType.class, (EventHandler)this.taskEventDispatcher);
        this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        this.dispatcher.register(TaskAttemptEventType.class, (EventHandler)this.taskAttemptEventDispatcher);
        this.vertexEventDispatcher = new VertexEventDispatcher();
        this.dispatcher.register(VertexEventType.class, (EventHandler)this.vertexEventDispatcher);
        this.dagEventDispatcher = new DagEventDispatcher();
        this.dispatcher.register(DAGEventType.class, (EventHandler)this.dagEventDispatcher);
        this.dagFinishEventHandler = new DAGFinishEventHandler();
        this.dispatcher.register(DAGAppMasterEventType.class, (EventHandler)this.dagFinishEventHandler);
        this.dispatcher.init(conf);
        this.dispatcher.start();
    }

    @After
    public void teardown() {
        this.dispatcher.await();
        this.dispatcher.stop();
        this.execService.shutdownNow();
        this.dagPlan = null;
        if (this.dag != null) {
            this.dag.entityUpdateTracker.stop();
        }
        if (this.mrrDag != null) {
            this.mrrDag.entityUpdateTracker.stop();
        }
        if (this.groupDag != null) {
            this.groupDag.entityUpdateTracker.stop();
        }
        if (this.dagWithCustomEdge != null) {
            this.dagWithCustomEdge.entityUpdateTracker.stop();
        }
        this.dag = null;
        this.mrrDag = null;
        this.groupDag = null;
        this.dagWithCustomEdge = null;
    }

    private void setupDAGWithCustomEdge(ExceptionLocation exLocation) {
        this.setupDAGWithCustomEdge(exLocation, false);
    }

    private void setupDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
        this.dagWithCustomEdgeId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)4);
        this.dagPlanWithCustomEdge = this.createDAGWithCustomEdge(exLocation, useLegacy);
        this.dagWithCustomEdgeAppContext = (AppContext)Mockito.mock(AppContext.class);
        ((AppContext)Mockito.doReturn((Object)this.aclManager).when((Object)this.dagWithCustomEdgeAppContext)).getAMACLManager();
        Mockito.when((Object)this.dagWithCustomEdgeAppContext.getHadoopShim()).thenReturn((Object)this.defaultShim);
        this.dagWithCustomEdge = new DAGImpl(this.dagWithCustomEdgeId, conf, this.dagPlanWithCustomEdge, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.dagWithCustomEdgeAppContext);
        this.dagWithCustomEdge.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dagWithCustomEdge);
        ((AppContext)Mockito.doReturn((Object)conf).when((Object)this.dagWithCustomEdgeAppContext)).getAMConf();
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.dagWithCustomEdgeAppContext)).getExecService();
        ((AppContext)Mockito.doReturn((Object)this.dagWithCustomEdge).when((Object)this.dagWithCustomEdgeAppContext)).getCurrentDAG();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.dagWithCustomEdgeAppContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.dagWithCustomEdgeAppContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.dagWithCustomEdgeAppContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.clusterInfo).when((Object)this.dagWithCustomEdgeAppContext)).getClusterInfo();
        this.dispatcher.register(TaskAttemptEventType.class, (EventHandler)new TaskAttemptEventDisptacher2());
        this.dispatcher.register(AMSchedulerEventType.class, (EventHandler)new AMSchedulerEventHandler());
        Mockito.when((Object)this.dagWithCustomEdgeAppContext.getContainerLauncherName(Mockito.anyInt())).thenReturn((Object)TezConstants.getTezYarnServicePluginName());
    }

    private void initDAG(DAGImpl impl) {
        impl.handle(new DAGEvent(impl.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals((Object)DAGState.INITED, (Object)impl.getState());
    }

    private void startDAG(DAGImpl impl) {
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(impl.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)impl.getState());
    }

    @Test(timeout=5000L)
    public void testDAGInit() {
        this.initDAG(this.dag);
        Assert.assertEquals((long)6L, (long)this.dag.getTotalVertices());
    }

    @Test(timeout=5000L)
    public void testDAGInitFailed() {
        this.setupDAGWithCustomEdge(ExceptionLocation.Initialize);
        this.dagWithCustomEdge.handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dagWithCustomEdge.getState());
        this.dagWithCustomEdge.handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dagWithCustomEdge.getState());
    }

    @Test(timeout=5000L)
    public void testDAGInitFailedDuetoInvalidResource() {
        ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance((int)512, (int)10));
        ((AppContext)Mockito.doReturn((Object)clusterInfo).when((Object)this.appContext)).getClusterInfo();
        this.dag.handle(new DAGEvent(this.dag.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dag.getState());
        Assert.assertEquals((Object)DAGTerminationCause.INIT_FAILURE, (Object)this.dag.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)this.dag.getDiagnostics(), (String)",").contains("Vertex's TaskResource is beyond the cluster container capability"));
    }

    @Test(timeout=5000L)
    public void testDAGStart() {
        TezVertexID vId;
        int i;
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        for (i = 0; i < 6; ++i) {
            vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)i);
            Vertex v = this.dag.getVertex(vId);
            Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
            if (i < 2) {
                Assert.assertEquals((long)0L, (long)v.getDistanceFromRoot());
                continue;
            }
            if (i == 2) {
                Assert.assertEquals((long)1L, (long)v.getDistanceFromRoot());
                continue;
            }
            if (i > 2 && i < 5) {
                Assert.assertEquals((long)2L, (long)v.getDistanceFromRoot());
                continue;
            }
            if (i != 5) continue;
            Assert.assertEquals((long)3L, (long)v.getDistanceFromRoot());
        }
        for (i = 0; i < 6; ++i) {
            vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)i);
            LOG.info("Distance from root: v" + i + ":" + this.dag.getVertex(vId).getDistanceFromRoot());
        }
    }

    @Test(timeout=5000L)
    public void testNonExistEdgeManagerPlugin() {
        this.dagPlan = this.createDAGWithNonExistEdgeManager();
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dag);
        ((AppContext)Mockito.doReturn((Object)this.dag).when((Object)this.appContext)).getCurrentDAG();
        this.dag.handle(new DAGEvent(this.dagId, DAGEventType.DAG_INIT));
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dag.getState());
        Assert.assertEquals((Object)DAGTerminationCause.INIT_FAILURE, (Object)this.dag.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)this.dag.getDiagnostics(), (String)"").contains("java.lang.ClassNotFoundException: non-exist-edge-manager"));
    }

    @Test(timeout=5000L)
    public void testNonExistDAGScheduler() {
        conf.set("tez.am.dag.scheduler.class", "non-exist-dag-scheduler");
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dag);
        ((AppContext)Mockito.doReturn((Object)this.dag).when((Object)this.appContext)).getCurrentDAG();
        this.dag.handle(new DAGEvent(this.dag.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dag.getState());
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dag.getState());
        Assert.assertEquals((Object)DAGTerminationCause.INIT_FAILURE, (Object)this.dag.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)this.dag.getDiagnostics(), (String)"").contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler"));
    }

    @Test(timeout=5000L)
    public void testVertexCompletion() {
        this.initDAG(this.dag);
        Assert.assertTrue((0.0f == this.dag.getCompletedTaskProgress() ? 1 : 0) != 0);
        this.startDAG(this.dag);
        Assert.assertTrue((0.0f == this.dag.getCompletedTaskProgress() ? 1 : 0) != 0);
        this.dispatcher.await();
        TezVertexID vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)1);
        Vertex v = this.dag.getVertex(vId);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId, (int)0), TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId, (int)1), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((double)0.1818181872367859, (double)this.dag.getCompletedTaskProgress(), (double)0.05);
    }

    @Test(timeout=5000L)
    public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
        this.setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
        this.setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        String diag = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_RouteDataMovementEventToDestination() {
        this.setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task t1 = v2.getTask(0);
        TaskAttemptImpl ta1 = (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance((TezTaskID)t1.getTaskID(), (int)0));
        DataMovementEvent daEvent = DataMovementEvent.create((ByteBuffer)ByteBuffer.wrap(new byte[0]));
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)daEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getTaskAttemptID()));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        v2.getTaskAttemptTezEvents(ta1.getTaskAttemptID(), 0, 0, 1000);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v1.getState());
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
        this.setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task t1 = v2.getTask(0);
        TaskAttemptImpl ta1 = (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance((TezTaskID)t1.getTaskID(), (int)0));
        DataMovementEvent daEvent = DataMovementEvent.create((ByteBuffer)ByteBuffer.wrap(new byte[0]));
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)daEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getTaskAttemptID()));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v1.getState());
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
        this.setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination, true);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task t1 = v2.getTask(0);
        TaskAttemptImpl ta1 = (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance((TezTaskID)t1.getTaskID(), (int)0));
        InputFailedEvent ifEvent = InputFailedEvent.create((int)0, (int)1);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)ifEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getTaskAttemptID()));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        v2.getTaskAttemptTezEvents(ta1.getTaskAttemptID(), 0, 0, 1000);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v1.getState());
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_GetNumDestinationConsumerTasks() {
        this.setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task t1 = v2.getTask(0);
        TaskAttemptImpl ta1 = (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance((TezTaskID)t1.getTaskID(), (int)0));
        InputReadErrorEvent ireEvent = InputReadErrorEvent.create((String)"", (int)0, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)ireEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", ta1.getTaskAttemptID()));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v1.getState());
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.GetNumDestinationConsumerTasks.name()));
    }

    @Test(timeout=5000L)
    public void testEdgeManager_RouteInputErrorEventToSource() {
        this.setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
        this.dispatcher.getEventHandler().handle((Event)new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dagWithCustomEdge.getState());
        VertexImpl v1 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task t1 = v2.getTask(0);
        TaskAttemptImpl ta1 = (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance((TezTaskID)t1.getTaskID(), (int)0));
        InputReadErrorEvent ireEvent = InputReadErrorEvent.create((String)"", (int)0, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)ireEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", ta1.getTaskAttemptID()));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v1.getState());
        String diag = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diag.contains(ExceptionLocation.RouteInputErrorEventToSource.name()));
    }

    @Test(timeout=5000L)
    public void testGroupDAGCompletionWithCommitSuccess() {
        this.initDAG(this.groupDag);
        this.startDAG(this.groupDag);
        this.dispatcher.await();
        for (int i = 0; i < 3; ++i) {
            Vertex v = this.groupDag.getVertex("vertex" + (i + 1));
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.groupDag.getSuccessfulVertices());
        }
        Assert.assertEquals((long)3L, (long)this.groupDag.getSuccessfulVertices());
        Assert.assertTrue((1.0f == this.groupDag.getCompletedTaskProgress() ? 1 : 0) != 0);
        Assert.assertEquals((Object)DAGState.SUCCEEDED, (Object)this.groupDag.getState());
        Assert.assertEquals((long)2L, (long)TotalCountingOutputCommitter.totalCommitCounter);
    }

    @Test(timeout=5000L)
    public void testGroupDAGWithVertexReRunning() {
        this.groupDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        this.initDAG(this.groupDag);
        this.startDAG(this.groupDag);
        this.dispatcher.await();
        Vertex v1 = this.groupDag.getVertex("vertex1");
        Vertex v2 = this.groupDag.getVertex("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexReRunning(v1.getVertexId()));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)TotalCountingOutputCommitter.totalCommitCounter);
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)TotalCountingOutputCommitter.totalCommitCounter);
        Assert.assertEquals((long)2L, (long)this.groupDag.getSuccessfulVertices());
    }

    @Test(timeout=5000L)
    public void testGroupDAGWithVertexReRunningAfterCommit() {
        this.groupDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        this.initDAG(this.groupDag);
        this.startDAG(this.groupDag);
        this.dispatcher.await();
        Vertex v1 = this.groupDag.getVertex("vertex1");
        Vertex v2 = this.groupDag.getVertex("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)TotalCountingOutputCommitter.totalCommitCounter);
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexReRunning(v1.getVertexId()));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.groupDag.getState());
        Assert.assertEquals((Object)DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, (Object)this.groupDag.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testDAGCompletionWithCommitSuccess() {
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 2; ++i) {
            Vertex v = this.mrrDag.getVertex("vertex" + (i + 1));
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.mrrDag.getSuccessfulVertices());
        }
        for (Vertex v : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : v.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)0L, (long)committer.abortCounter);
                Assert.assertEquals((long)0L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
        Vertex v = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)3L, (long)this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals((Object)DAGState.SUCCEEDED, (Object)this.mrrDag.getState());
        for (Vertex vertex : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)0L, (long)committer.abortCounter);
                Assert.assertEquals((long)1L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
    }

    @Test(timeout=5000L)
    public void testDAGCompletionWithCommitFailure() throws IOException {
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        Vertex badVertex = this.mrrDag.getVertex("vertex3");
        ArrayList<DAGProtos.RootInputLeafOutputProto> outputs = new ArrayList<DAGProtos.RootInputLeafOutputProto>();
        outputs.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new TestVertexImpl.CountingOutputCommitter.CountingOutputCommitterConfig(true, false, false).toUserPayload())).build())).setName("output3").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        badVertex.setAdditionalOutputs(outputs);
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 2; ++i) {
            Vertex v = this.mrrDag.getVertex("vertex" + (i + 1));
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.mrrDag.getSuccessfulVertices());
        }
        for (Vertex v : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : v.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)0L, (long)committer.abortCounter);
                Assert.assertEquals((long)0L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
        Vertex v = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)3L, (long)this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.mrrDag.getState());
        Assert.assertEquals((Object)DAGTerminationCause.COMMIT_FAILURE, (Object)this.mrrDag.getTerminationCause());
        for (Vertex vertex : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)1L, (long)committer.abortCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
    }

    @Test(timeout=5000L)
    public void testDAGErrorAbortAllOutputs() {
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 2; ++i) {
            Vertex v = this.mrrDag.getVertex("vertex" + (i + 1));
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.mrrDag.getSuccessfulVertices());
        }
        for (Vertex v : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : v.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)0L, (long)committer.abortCounter);
                Assert.assertEquals((long)0L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
        Vertex v = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.ERROR, (Object)v.getState());
        Assert.assertEquals((Object)DAGState.ERROR, (Object)this.mrrDag.getState());
        for (Vertex vertex : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)1L, (long)committer.abortCounter);
                Assert.assertEquals((long)0L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
    }

    @Test(timeout=5000L)
    public void testDAGErrorAbortNonSuccessfulOutputs() {
        this.mrrDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 2; ++i) {
            Vertex v = this.mrrDag.getVertex("vertex" + (i + 1));
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.mrrDag.getSuccessfulVertices());
            for (OutputCommitter c : v.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                Assert.assertEquals((long)0L, (long)committer.abortCounter);
                Assert.assertEquals((long)1L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
        Vertex errorVertex = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.ERROR, (Object)errorVertex.getState());
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.ERROR, (Object)this.mrrDag.getState());
        for (Vertex vertex : this.mrrDag.vertices.values()) {
            for (OutputCommitter c : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
                if (vertex == errorVertex) {
                    Assert.assertEquals((long)1L, (long)committer.abortCounter);
                    Assert.assertEquals((long)0L, (long)committer.commitCounter);
                    Assert.assertEquals((long)1L, (long)committer.initCounter);
                    Assert.assertEquals((long)1L, (long)committer.setupCounter);
                    continue;
                }
                Assert.assertEquals((long)1L, (long)committer.abortCounter);
                Assert.assertEquals((long)1L, (long)committer.commitCounter);
                Assert.assertEquals((long)1L, (long)committer.initCounter);
                Assert.assertEquals((long)1L, (long)committer.setupCounter);
            }
        }
    }

    @Test(timeout=5000L)
    public void testVertexReRunning() {
        this.initDAG(this.dag);
        this.dag.dagScheduler = (DAGScheduler)Mockito.mock(DAGScheduler.class);
        this.startDAG(this.dag);
        this.dispatcher.await();
        TezVertexID vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)1);
        Vertex v = this.dag.getVertex(vId);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId, (int)0), TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId, (int)1), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((long)1L, (long)this.dag.numCompletedVertices);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskReschedule(TezTaskID.getInstance((TezVertexID)vId, (int)0)));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((long)0L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((long)0L, (long)this.dag.numCompletedVertices);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId, (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((long)1L, (long)this.dag.numCompletedVertices);
    }

    public void testKillStartedDAG() {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, DAGTerminationCause.DAG_KILL, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.KILLED, (Object)this.dag.getState());
        for (int i = 0; i < 6; ++i) {
            TezVertexID vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)i);
            Vertex v = this.dag.getVertex(vId);
            Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        }
    }

    @Test(timeout=5000L)
    public void testKillRunningDAG() {
        this._testTerminateRunningDAG(DAGTerminationCause.DAG_KILL);
    }

    @Test(timeout=5000L)
    public void testServiceErrorRunningDAG() {
        this._testTerminateRunningDAG(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    private void _testTerminateRunningDAG(DAGTerminationCause terminationCause) {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        TezVertexID vId1 = TezVertexID.getInstance((TezDAGID)this.dagId, (int)1);
        Vertex v1 = this.dag.getVertex(vId1);
        ((EventHandler)v1).handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId1, (int)0), TaskState.SUCCEEDED));
        TezVertexID vId0 = TezVertexID.getInstance((TezDAGID)this.dagId, (int)0);
        Vertex v0 = this.dag.getVertex(vId0);
        ((EventHandler)v0).handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)vId0, (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v0.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, terminationCause, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.TERMINATING, (Object)this.dag.getState());
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v0.getState());
        Assert.assertEquals((Object)VertexState.TERMINATING, (Object)v1.getState());
        for (int i = 2; i < 6; ++i) {
            TezVertexID vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)i);
            Vertex v = this.dag.getVertex(vId);
            Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        }
        Assert.assertEquals((long)1L, (long)this.dag.getSuccessfulVertices());
    }

    @Test(timeout=5000L)
    public void testInvalidEvent() {
        this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagId, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.ERROR, (Object)this.dag.getState());
    }

    @Test(timeout=5000L)
    @Ignore
    public void testVertexSuccessfulCompletionUpdates() {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        for (int i = 0; i < 6; ++i) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        Assert.assertEquals((long)1L, (long)this.dag.getSuccessfulVertices());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)2), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)3), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)4), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.SUCCEEDED, (Object)this.dag.getState());
        Assert.assertEquals((long)6L, (long)this.dag.getSuccessfulVertices());
    }

    @Test(timeout=10000L)
    public void testGetDAGStatusWithWait() throws TezException {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        for (int i = 0; i < this.dag.getVertices().size() - 1; ++i) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)i), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        Assert.assertEquals((long)5L, (long)this.dag.getSuccessfulVertices());
        long dagStatusStartTime = System.currentTimeMillis();
        DAGStatusBuilder dagStatus = this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
        long dagStatusEndTime = System.currentTimeMillis();
        long diff = dagStatusEndTime - dagStatusStartTime;
        Assert.assertTrue((diff >= 0L && diff < 2500L ? 1 : 0) != 0);
        Assert.assertEquals((Object)DAGStatus.State.RUNNING, (Object)dagStatus.getState());
    }

    @Test(timeout=20000L)
    public void testGetDAGStatusReturnOnDagSucceeded() throws InterruptedException, TezException {
        this.runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.SUCCEEDED);
    }

    @Test(timeout=20000L)
    public void testGetDAGStatusReturnOnDagFailed() throws InterruptedException, TezException {
        this.runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.FAILED);
    }

    @Test(timeout=20000L)
    public void testGetDAGStatusReturnOnDagKilled() throws InterruptedException, TezException {
        this.runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.KILLED);
    }

    @Test(timeout=20000L)
    public void testGetDAGStatusReturnOnDagError() throws InterruptedException, TezException {
        this.runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.ERROR);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State testState) throws TezException, InterruptedException {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        for (int i = 0; i < this.dag.getVertices().size() - 1; ++i) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        Assert.assertEquals((long)5L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((Object)DAGStatus.State.RUNNING, (Object)this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 10000L).getState());
        ReentrantLock lock = new ReentrantLock();
        Condition startCondition = lock.newCondition();
        Condition endCondition = lock.newCondition();
        DagStatusCheckRunnable statusCheckRunnable = new DagStatusCheckRunnable(lock, startCondition, endCondition);
        Thread t1 = new Thread(statusCheckRunnable);
        t1.start();
        lock.lock();
        try {
            while (!statusCheckRunnable.started.get()) {
                startCondition.await();
            }
        }
        finally {
            lock.unlock();
        }
        Thread.sleep(2000L);
        if (testState == DAGStatus.State.SUCCEEDED) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5), VertexState.SUCCEEDED));
        } else if (testState == DAGStatus.State.FAILED) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5), VertexState.FAILED));
        } else if (testState == DAGStatus.State.KILLED) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, DAGTerminationCause.DAG_KILL, null));
        } else if (testState == DAGStatus.State.ERROR) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventStartDag(this.dagId, new LinkedList()));
        } else {
            throw new UnsupportedOperationException("Unsupported state for test: " + testState);
        }
        this.dispatcher.await();
        lock.lock();
        try {
            while (!statusCheckRunnable.ended.get()) {
                endCondition.await();
            }
        }
        finally {
            lock.unlock();
        }
        long diff = statusCheckRunnable.dagStatusEndTime - statusCheckRunnable.dagStatusStartTime;
        Assert.assertNotNull((Object)statusCheckRunnable.dagStatus);
        Assert.assertTrue((String)("Status: " + statusCheckRunnable.dagStatus.getState() + ", Diff:" + diff), (diff >= 0L && diff < 3500L ? 1 : 0) != 0);
        Assert.assertEquals((Object)testState, (Object)statusCheckRunnable.dagStatus.getState());
        t1.join();
    }

    @Test(timeout=5000L)
    public void testVertexFailureHandling() {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)2), VertexState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.dag.getState());
        Assert.assertEquals((long)2L, (long)this.dag.getSuccessfulVertices());
        for (int i = 3; i < 6; ++i) {
            TezVertexID vId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)i);
            Vertex v = this.dag.getVertex(vId);
            Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        }
    }

    @Test(timeout=5000L)
    public void testDAGKill() {
        this._testDAGTerminate(DAGTerminationCause.DAG_KILL);
    }

    @Test(timeout=5000L)
    public void testDAGServiceError() {
        this._testDAGTerminate(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    private void _testDAGTerminate(DAGTerminationCause terminationCause) {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, terminationCause, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)terminationCause.getFinishedState(), (Object)this.dag.getState());
        Assert.assertEquals((Object)terminationCause, (Object)this.dag.getTerminationCause());
        Assert.assertEquals((long)2L, (long)this.dag.getSuccessfulVertices());
        int killedCount = 0;
        for (Map.Entry vEntry : this.dag.getVertices().entrySet()) {
            if (((Vertex)vEntry.getValue()).getState() != VertexState.KILLED) continue;
            ++killedCount;
        }
        Assert.assertEquals((long)4L, (long)killedCount);
        for (Vertex v : this.dag.getVertices().values()) {
            Assert.assertEquals((Object)VertexTerminationCause.DAG_TERMINATED, (Object)v.getTerminationCause());
        }
        Assert.assertEquals((long)1L, (long)this.dagFinishEventHandler.dagFinishEvents);
    }

    @Test(timeout=5000L)
    public void testDAGHang() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        this.dag = (DAGImpl)Mockito.spy((Object)new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext));
        StateMachineTez spyStateMachine = (StateMachineTez)Mockito.spy((Object)new StateMachineTez(DAGImpl.stateMachineFactory.make((Object)this.dag), (Object)this.dag));
        Mockito.when((Object)this.dag.getStateMachine()).thenReturn((Object)spyStateMachine);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dag);
        ((AppContext)Mockito.doReturn((Object)this.dag).when((Object)this.appContext)).getCurrentDAG();
        DAGImpl.OutputKey outputKey = (DAGImpl.OutputKey)Mockito.mock(DAGImpl.OutputKey.class);
        ListenableFuture future = (ListenableFuture)Mockito.mock(ListenableFuture.class);
        this.dag.commitFutures.put(outputKey, future);
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)2), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)3), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)4), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.COMMITTING, (Object)this.dag.getState());
        DAGEventCommitCompleted dagEvent = new DAGEventCommitCompleted(this.dagId, outputKey, false, (Throwable)new RuntimeException("test"));
        ((DAGImpl)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("test")}).when((Object)this.dag)).logJobHistoryUnsuccesfulEvent((DAGState)Mockito.any(), (TezCounters)Mockito.any());
        this.dag.handle((DAGEvent)dagEvent);
        this.dispatcher.await();
        Assert.assertTrue((String)"DAG did not terminate!", (this.dag.getInternalState() == DAGState.FAILED ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void testDAGKillVertexSuccessAfterTerminated() {
        this._testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);
    }

    @Test(timeout=5000L)
    public void testDAGServiceErrorVertexSuccessAfterTerminated() {
        this._testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause terminationCause) {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, terminationCause, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)terminationCause.getFinishedState(), (Object)this.dag.getState());
        for (int i = 2; i < 6; ++i) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)i), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        int killedCount = 0;
        for (Map.Entry vEntry : this.dag.getVertices().entrySet()) {
            if (((Vertex)vEntry.getValue()).getState() != VertexState.KILLED) continue;
            ++killedCount;
        }
        Assert.assertEquals((long)4L, (long)killedCount);
        Assert.assertEquals((Object)terminationCause, (Object)this.dag.getTerminationCause());
        Assert.assertEquals((long)2L, (long)this.dag.getSuccessfulVertices());
        for (Vertex v : this.dag.getVertices().values()) {
            Assert.assertEquals((Object)VertexTerminationCause.DAG_TERMINATED, (Object)v.getTerminationCause());
        }
        Assert.assertEquals((long)1L, (long)this.dagFinishEventHandler.dagFinishEvents);
    }

    @Test(timeout=5000L)
    public void testDAGKillPending() {
        this._testDAGKillPending(DAGTerminationCause.DAG_KILL);
    }

    @Test(timeout=5000L)
    public void testDAGServiceErrorPending() {
        this._testDAGKillPending(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    private void _testDAGKillPending(DAGTerminationCause terminationCause) {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)0), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)1), VertexState.SUCCEEDED));
        for (int i = 2; i < 5; ++i) {
            this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)i), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new DAGEventTerminateDag(this.dagId, terminationCause, null));
        this.dispatcher.await();
        Assert.assertEquals((Object)terminationCause.getFinishedState(), (Object)this.dag.getState());
        this.dispatcher.getEventHandler().handle((Event)new DAGEventVertexCompleted(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5), VertexState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals((Object)terminationCause.getFinishedState(), (Object)this.dag.getState());
        Assert.assertEquals((long)5L, (long)this.dag.getSuccessfulVertices());
        Assert.assertEquals((Object)this.dag.getVertex(TezVertexID.getInstance((TezDAGID)this.dagId, (int)5)).getTerminationCause(), (Object)VertexTerminationCause.DAG_TERMINATED);
        Assert.assertEquals((long)1L, (long)this.dagFinishEventHandler.dagFinishEvents);
    }

    @Test(timeout=5000L)
    public void testConfiguration() throws AMUserCodeException {
        this.initDAG(this.dag);
        Assert.assertEquals((long)3L, (long)this.dag.getConf().getInt("tez.am.task.max.failed.attempts", 4));
        Vertex v1 = this.dag.getVertex("vertex1");
        Vertex v2 = this.dag.getVertex("vertex2");
        Assert.assertEquals((long)2L, (long)v1.getConf().getInt("tez.am.task.max.failed.attempts", 4));
        Assert.assertEquals((long)3L, (long)v2.getConf().getInt("tez.am.task.max.failed.attempts", 4));
    }

    @Test(timeout=5000L)
    public void testCounterLimits() {
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 3; ++i) {
            Vertex v = this.mrrDag.getVertex("vertex" + (i + 1));
            TezCounters ctrs = new TezCounters();
            for (int j = 0; j < 50; ++j) {
                ctrs.findCounter("g", "c" + i + "_" + j).increment(1L);
            }
            ((VertexImpl)v).setCounters(ctrs);
            this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
            Assert.assertEquals((long)(i + 1), (long)this.mrrDag.getSuccessfulVertices());
        }
        Assert.assertEquals((long)3L, (long)this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals((Object)DAGState.FAILED, (Object)this.mrrDag.getState());
        Assert.assertTrue((String)"Diagnostics should contain counter limits error message", (boolean)StringUtils.join((Collection)this.mrrDag.getDiagnostics(), (String)",").contains("Counters limit exceeded"));
    }

    @Test(timeout=5000L)
    public void testTotalContainersUsedCounter() {
        DAGImpl spy = this.getDagSpy();
        spy.addUsedContainer(Container.newInstance((ContainerId)ContainerId.fromString((String)"container_e16_1504924099862_7571_01_000005"), (NodeId)((NodeId)Mockito.mock(NodeId.class)), null, null, null, null));
        spy.addUsedContainer(Container.newInstance((ContainerId)ContainerId.fromString((String)"container_e16_1504924099862_7571_01_000006"), (NodeId)((NodeId)Mockito.mock(NodeId.class)), null, null, null, null));
        spy.onFinish();
        ((DAGImpl)Mockito.verify((Object)spy, (VerificationMode)Mockito.times((int)2))).addUsedContainer((Container)Mockito.any(Container.class));
        Assert.assertEquals((long)2L, (long)((CounterGroup)spy.getAllCounters().getGroup(DAGCounter.class.getName())).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name()).getValue());
    }

    @Test(timeout=5000L)
    public void testNodesUsedCounter() {
        DAGImpl spy = this.getDagSpy();
        Container containerOnHost = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)containerOnHost.getNodeId()).thenReturn((Object)NodeId.fromString((String)"localhost:0"));
        Container containerOnSameHost = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)containerOnSameHost.getNodeId()).thenReturn((Object)NodeId.fromString((String)"localhost:0"));
        Container containerOnDifferentHost = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)containerOnDifferentHost.getNodeId()).thenReturn((Object)NodeId.fromString((String)"otherhost:0"));
        Container containerOnSameHostWithDifferentPort = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)containerOnSameHostWithDifferentPort.getNodeId()).thenReturn((Object)NodeId.fromString((String)"localhost:1"));
        spy.addUsedContainer(containerOnHost);
        spy.addUsedContainer(containerOnSameHost);
        spy.addUsedContainer(containerOnDifferentHost);
        spy.addUsedContainer(containerOnSameHostWithDifferentPort);
        Mockito.when((Object)this.taskSchedulerManager.getNumClusterNodes(Mockito.anyBoolean())).thenReturn((Object)10);
        spy.onFinish();
        ((DAGImpl)Mockito.verify((Object)spy, (VerificationMode)Mockito.times((int)4))).addUsedContainer((Container)Mockito.any(Container.class));
        Assert.assertEquals((long)2L, (long)((CounterGroup)spy.getAllCounters().getGroup(DAGCounter.class.getName())).findCounter(DAGCounter.NODE_USED_COUNT.name()).getValue());
        Assert.assertTrue((boolean)spy.nodesUsedByCurrentDAG.contains("localhost"));
        Assert.assertTrue((boolean)spy.nodesUsedByCurrentDAG.contains("otherhost"));
        Assert.assertEquals((long)10L, (long)((CounterGroup)spy.getAllCounters().getGroup(DAGCounter.class.getName())).findCounter(DAGCounter.NODE_TOTAL_COUNT.name()).getValue());
    }

    private DAGImpl getDagSpy() {
        this.initDAG(this.mrrDag);
        this.dispatcher.await();
        this.startDAG(this.mrrDag);
        this.dispatcher.await();
        Mockito.when((Object)this.mrrAppContext.getTaskScheduler()).thenReturn((Object)this.taskSchedulerManager);
        return (DAGImpl)Mockito.spy((Object)this.mrrDag);
    }

    static {
        Limits.reset();
        Configuration conf = new Configuration(false);
        conf.setInt("tez.counters.max", 100);
        conf.setInt("tez.counters.max.groups", 100);
        Limits.setConfiguration((Configuration)conf);
    }

    public static class TotalCountingOutputCommitter
    extends TestVertexImpl.CountingOutputCommitter {
        static int totalCommitCounter = 0;

        public TotalCountingOutputCommitter(OutputCommitterContext context) {
            super(context);
        }

        @Override
        public void commitOutput() throws IOException {
            ++totalCommitCounter;
            super.commitOutput();
        }
    }

    public static class CustomizedEdgeManagerLegacy
    extends EdgeManagerPlugin {
        private ExceptionLocation exLocation;

        public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
            return (EdgeManagerPluginDescriptor)EdgeManagerPluginDescriptor.create((String)CustomizedEdgeManager.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)ByteBuffer.wrap(exLocation.name().getBytes())));
        }

        public CustomizedEdgeManagerLegacy(EdgeManagerPluginContext context) {
            super(context);
            this.exLocation = ExceptionLocation.valueOf(new String(context.getUserPayload().deepCopyAsArray()));
        }

        public void initialize() throws Exception {
            if (this.exLocation == ExceptionLocation.Initialize) {
                throw new Exception(this.exLocation.name());
            }
        }

        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
                throw new Exception(this.exLocation.name());
            }
        }

        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
                throw new Exception(this.exLocation.name());
            }
        }

        public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }
    }

    public static class CustomizedEdgeManager
    extends EdgeManagerPluginOnDemand {
        private ExceptionLocation exLocation;

        public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
            return (EdgeManagerPluginDescriptor)EdgeManagerPluginDescriptor.create((String)CustomizedEdgeManager.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)ByteBuffer.wrap(exLocation.name().getBytes())));
        }

        public CustomizedEdgeManager(EdgeManagerPluginContext context) {
            super(context);
            this.exLocation = ExceptionLocation.valueOf(new String(context.getUserPayload().deepCopyAsArray()));
        }

        public void initialize() throws Exception {
            if (this.exLocation == ExceptionLocation.Initialize) {
                throw new Exception(this.exLocation.name());
            }
        }

        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
                throw new Exception(this.exLocation.name());
            }
            return 0;
        }

        public EdgeManagerPluginOnDemand.EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        public EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int sourceTaskIndex, int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        public EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        public void prepareForRouting() throws Exception {
        }
    }

    public static enum ExceptionLocation {
        Initialize,
        GetNumDestinationTaskPhysicalInputs,
        GetNumSourceTaskPhysicalOutputs,
        RouteDataMovementEventToDestination,
        RouteInputSourceTaskFailedEventToDestination,
        GetNumDestinationConsumerTasks,
        RouteInputErrorEventToSource;

    }

    private class TaskEventDispatcher
    implements EventHandler<TaskEvent> {
        private TaskEventDispatcher() {
        }

        public void handle(TaskEvent event) {
            TezDAGID id = event.getDAGID();
            DAGImpl handler = TestDAGImpl.this.chooseDAG(id);
            Vertex vertex = handler.getVertex(event.getVertexID());
            Task task = vertex.getTask(event.getTaskID());
            ((EventHandler)task).handle((Event)event);
        }
    }

    private class TaskAttemptEventDispatcher
    implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        public void handle(TaskAttemptEvent event) {
        }
    }

    private class VertexEventDispatcher
    implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        public void handle(VertexEvent event) {
            TezDAGID id = event.getDAGID();
            DAGImpl handler = TestDAGImpl.this.chooseDAG(id);
            Vertex vertex = handler.getVertex(event.getVertexID());
            ((EventHandler)vertex).handle((Event)event);
        }
    }

    private class DagEventDispatcher
    implements EventHandler<DAGEvent> {
        private DagEventDispatcher() {
        }

        public void handle(DAGEvent event) {
            DAGImpl dag = TestDAGImpl.this.chooseDAG(event.getDAGID());
            dag.handle(event);
        }
    }

    private class DAGFinishEventHandler
    implements EventHandler<DAGAppMasterEventDAGFinished> {
        public int dagFinishEvents = 0;

        private DAGFinishEventHandler() {
        }

        public void handle(DAGAppMasterEventDAGFinished event) {
            ++this.dagFinishEvents;
        }
    }

    private class TaskAttemptEventDisptacher2
    implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDisptacher2() {
        }

        public void handle(TaskAttemptEvent event) {
            TezDAGID id = event.getDAGID();
            DAGImpl handler = TestDAGImpl.this.chooseDAG(id);
            Vertex vertex = handler.getVertex(event.getVertexID());
            Task task = vertex.getTask(event.getTaskID());
            TaskAttempt ta = task.getAttempt(event.getTaskAttemptID());
            ((EventHandler)ta).handle((Event)event);
        }
    }

    private class AMSchedulerEventHandler
    implements EventHandler<AMSchedulerEvent> {
        private AMSchedulerEventHandler() {
        }

        public void handle(AMSchedulerEvent event) {
        }
    }

    private class DagStatusCheckRunnable
    implements Runnable {
        private volatile DAGStatusBuilder dagStatus;
        private volatile long dagStatusStartTime = -1L;
        private volatile long dagStatusEndTime = -1L;
        private final AtomicBoolean started = new AtomicBoolean(false);
        private final AtomicBoolean ended = new AtomicBoolean(false);
        private final ReentrantLock lock;
        private final Condition startCondition;
        private final Condition endCondition;

        public DagStatusCheckRunnable(ReentrantLock lock, Condition startCondition, Condition endCondition) {
            this.lock = lock;
            this.startCondition = startCondition;
            this.endCondition = endCondition;
        }

        @Override
        public void run() {
            this.started.set(true);
            this.lock.lock();
            try {
                this.startCondition.signal();
            }
            finally {
                this.lock.unlock();
            }
            try {
                this.dagStatusStartTime = System.currentTimeMillis();
                this.dagStatus = TestDAGImpl.this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 10000L);
                this.dagStatusEndTime = System.currentTimeMillis();
            }
            catch (TezException tezException) {
                // empty catch block
            }
            this.lock.lock();
            this.ended.set(true);
            try {
                this.endCondition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

