package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.StateMachineTez;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl.class */
public class TestDAGImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TestDAGImpl.class);

    // Default DAG under test and its id; chooseDAG() returns `dag` for events carrying `dagId`.
    private DAGProtos.DAGPlan dagPlan;
    private TezDAGID dagId;
    // Static because the static plan factory createGroupDAGPlan() reads it; reassigned in setup().
    private static Configuration conf;
    private DrainDispatcher dispatcher;
    private ListeningExecutorService execService;
    private Credentials fsTokens;
    private AppContext appContext;
    private ACLManager aclManager;
    private ApplicationAttemptId appAttemptId;
    private DAGImpl dag;

    // Event handlers; their types are the nested handler classes declared in this file.
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private TaskHeartbeatHandler thh;
    private DAGFinishEventHandler dagFinishEventHandler;

    // "MRR" DAG fixture; chooseDAG() returns `mrrDag` for events carrying `mrrDagId`.
    private AppContext mrrAppContext;
    private DAGProtos.DAGPlan mrrDagPlan;
    private DAGImpl mrrDag;
    private TezDAGID mrrDagId;

    // Vertex-group DAG fixture; chooseDAG() returns `groupDag` for events carrying `groupDagId`.
    private AppContext groupAppContext;
    private DAGProtos.DAGPlan groupDagPlan;
    private DAGImpl groupDag;
    private TezDAGID groupDagId;

    // Custom-edge DAG fixture; chooseDAG() returns `dagWithCustomEdge` for `dagWithCustomEdgeId`.
    private DAGProtos.DAGPlan dagPlanWithCustomEdge;
    private DAGImpl dagWithCustomEdge;
    private TezDAGID dagWithCustomEdgeId;
    private AppContext dagWithCustomEdgeAppContext;

    private HistoryEventHandler historyEventHandler;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    private Clock clock = new SystemClock();
    // Built from Resource.newInstance(8192, 10); presumably 8192 MB / 10 vcores — confirm against the YARN Resource API.
    private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192, 10));
    private HadoopShim defaultShim = new DefaultHadoopShim();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$AMSchedulerEventHandler.class */
    /**
     * Handler for {@link AMSchedulerEvent}s that does nothing: every event passed to
     * {@link #handle(AMSchedulerEvent)} is discarded.
     */
    public class AMSchedulerEventHandler implements EventHandler<AMSchedulerEvent> {
        private AMSchedulerEventHandler() {
        }

        /** Intentionally empty; the event is ignored. */
        public void handle(AMSchedulerEvent aMSchedulerEvent) {
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$CustomizedEdgeManager.class */
    /**
     * On-demand edge manager used for fault injection. The user payload carries the name of one
     * {@link ExceptionLocation}; the callback matching that location throws an {@link Exception}
     * whose message is the location name, and every other callback returns a trivial value
     * ({@code 0} or {@code null}).
     *
     * <p>The payload is encoded and decoded as UTF-8 so that it agrees with plans that build the
     * payload through {@code ByteString.copyFromUtf8}, independent of the platform default charset.
     */
    public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
        /** Callback at which this instance throws; fixed at construction time. */
        private final ExceptionLocation exLocation;

        /**
         * Creates a descriptor for this edge manager whose payload names the given location.
         *
         * @param exceptionLocation callback at which the created edge manager should throw
         * @return descriptor referencing {@code CustomizedEdgeManager} with the encoded payload
         */
        public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exceptionLocation) {
            byte[] encodedName = exceptionLocation.name().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
                    .setUserPayload(UserPayload.create(ByteBuffer.wrap(encodedName)));
        }

        /**
         * Reads the failure location from the context's user payload.
         *
         * @param edgeManagerPluginContext context whose payload holds an {@link ExceptionLocation} name
         * @throws IllegalArgumentException if the payload does not name an {@link ExceptionLocation}
         */
        public CustomizedEdgeManager(EdgeManagerPluginContext edgeManagerPluginContext) {
            super(edgeManagerPluginContext);
            byte[] payloadBytes = edgeManagerPluginContext.getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes, java.nio.charset.StandardCharsets.UTF_8));
        }

        /** Throws if configured for {@link ExceptionLocation#Initialize}. */
        public void initialize() throws Exception {
            failIfInjectedAt(ExceptionLocation.Initialize);
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumDestinationTaskPhysicalInputs}; otherwise returns 0. */
        public int getNumDestinationTaskPhysicalInputs(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumSourceTaskPhysicalOutputs}; otherwise returns 0. */
        public int getNumSourceTaskPhysicalOutputs(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumDestinationConsumerTasks}; otherwise returns 0. */
        public int getNumDestinationConsumerTasks(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumDestinationConsumerTasks);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#RouteInputErrorEventToSource}; otherwise returns 0. */
        public int routeInputErrorEventToSource(int i, int i2) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteInputErrorEventToSource);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#RouteDataMovementEventToDestination}; otherwise returns null. */
        public EdgeManagerPluginOnDemand.EventRouteMetadata routeDataMovementEventToDestination(int i, int i2, int i3) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteDataMovementEventToDestination);
            return null;
        }

        /**
         * Shares its trigger with {@link #routeDataMovementEventToDestination(int, int, int)}: throws if
         * configured for {@link ExceptionLocation#RouteDataMovementEventToDestination}; otherwise returns null.
         */
        public EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int i, int i2) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteDataMovementEventToDestination);
            return null;
        }

        /** Throws if configured for {@link ExceptionLocation#RouteInputSourceTaskFailedEventToDestination}; otherwise returns null. */
        public EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int i, int i2) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
            return null;
        }

        /** Never throws; no failure location maps to this callback. */
        public void prepareForRouting() throws Exception {
        }

        /** Throws an exception named after the configured location when it equals {@code callbackLocation}. */
        private void failIfInjectedAt(ExceptionLocation callbackLocation) throws Exception {
            if (this.exLocation == callbackLocation) {
                throw new Exception(this.exLocation.name());
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$CustomizedEdgeManagerLegacy.class */
    /**
     * Fault-injecting edge manager built on the legacy {@link EdgeManagerPlugin} API. The user
     * payload carries the name of one {@link ExceptionLocation}; the callback matching that
     * location throws an {@link Exception} whose message is the location name, and every other
     * callback is a no-op or returns {@code 0}.
     *
     * <p>The payload is encoded and decoded as UTF-8 so that it agrees with plans that build the
     * payload through {@code ByteString.copyFromUtf8}, independent of the platform default charset.
     */
    public static class CustomizedEdgeManagerLegacy extends EdgeManagerPlugin {
        /** Callback at which this instance throws; fixed at construction time. */
        private final ExceptionLocation exLocation;

        /**
         * Creates a descriptor for this legacy edge manager whose payload names the given location.
         *
         * @param exceptionLocation callback at which the created edge manager should throw
         * @return descriptor referencing {@code CustomizedEdgeManagerLegacy} with the encoded payload
         */
        public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exceptionLocation) {
            byte[] encodedName = exceptionLocation.name().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // Must name this class: the descriptor previously pointed at CustomizedEdgeManager,
            // so callers asking for the legacy plugin silently got the on-demand one.
            return EdgeManagerPluginDescriptor.create(CustomizedEdgeManagerLegacy.class.getName())
                    .setUserPayload(UserPayload.create(ByteBuffer.wrap(encodedName)));
        }

        /**
         * Reads the failure location from the context's user payload.
         *
         * @param edgeManagerPluginContext context whose payload holds an {@link ExceptionLocation} name
         * @throws IllegalArgumentException if the payload does not name an {@link ExceptionLocation}
         */
        public CustomizedEdgeManagerLegacy(EdgeManagerPluginContext edgeManagerPluginContext) {
            super(edgeManagerPluginContext);
            byte[] payloadBytes = edgeManagerPluginContext.getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes, java.nio.charset.StandardCharsets.UTF_8));
        }

        /** Throws if configured for {@link ExceptionLocation#Initialize}. */
        public void initialize() throws Exception {
            failIfInjectedAt(ExceptionLocation.Initialize);
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumDestinationTaskPhysicalInputs}; otherwise returns 0. */
        public int getNumDestinationTaskPhysicalInputs(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumSourceTaskPhysicalOutputs}; otherwise returns 0. */
        public int getNumSourceTaskPhysicalOutputs(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#RouteDataMovementEventToDestination}; otherwise leaves {@code map} untouched. */
        public void routeDataMovementEventToDestination(DataMovementEvent dataMovementEvent, int i, int i2, Map<Integer, List<Integer>> map) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteDataMovementEventToDestination);
        }

        /** Throws if configured for {@link ExceptionLocation#RouteInputSourceTaskFailedEventToDestination}; otherwise leaves {@code map} untouched. */
        public void routeInputSourceTaskFailedEventToDestination(int i, Map<Integer, List<Integer>> map) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
        }

        /** Throws if configured for {@link ExceptionLocation#GetNumDestinationConsumerTasks}; otherwise returns 0. */
        public int getNumDestinationConsumerTasks(int i) throws Exception {
            failIfInjectedAt(ExceptionLocation.GetNumDestinationConsumerTasks);
            return 0;
        }

        /** Throws if configured for {@link ExceptionLocation#RouteInputErrorEventToSource}; otherwise returns 0. */
        public int routeInputErrorEventToSource(InputReadErrorEvent inputReadErrorEvent, int i, int i2) throws Exception {
            failIfInjectedAt(ExceptionLocation.RouteInputErrorEventToSource);
            return 0;
        }

        /** Throws an exception named after the configured location when it equals {@code callbackLocation}. */
        private void failIfInjectedAt(ExceptionLocation callbackLocation) throws Exception {
            if (this.exLocation == callbackLocation) {
                throw new Exception(this.exLocation.name());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$DAGFinishEventHandler.class */
    /**
     * Counts the {@link DAGAppMasterEventDAGFinished} events it receives. The running total is
     * exposed through the public {@link #dagFinishEvents} field and starts at zero.
     */
    public class DAGFinishEventHandler implements EventHandler<DAGAppMasterEventDAGFinished> {
        /** Number of finish events handled so far. */
        public int dagFinishEvents;

        private DAGFinishEventHandler() {
            // Start counting from zero.
            dagFinishEvents = 0;
        }

        /** Records one more finish event; the event's contents are not inspected. */
        public void handle(DAGAppMasterEventDAGFinished dAGAppMasterEventDAGFinished) {
            dagFinishEvents += 1;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$DagEventDispatcher.class */
    /**
     * Forwards each {@link DAGEvent} to the DAG fixture that {@code chooseDAG} selects for the
     * event's DAG id.
     */
    private class DagEventDispatcher implements EventHandler<DAGEvent> {
        private DagEventDispatcher() {
        }

        /** Resolves the target DAG from the event's id and hands the event to it. */
        public void handle(DAGEvent dAGEvent) {
            TezDAGID targetId = dAGEvent.getDAGId();
            DAGImpl targetDag = TestDAGImpl.this.chooseDAG(targetId);
            targetDag.handle(dAGEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$DagStatusCheckRunnable.class */
    /**
     * Runnable that calls {@code getDAGStatus} on the enclosing test's {@code dag} with a
     * 10000 ms timeout and records when the call started and ended.
     *
     * <p>Progress is published in two ways: the {@code started}/{@code ended} flags, and a signal
     * on {@code startCondition}/{@code endCondition}. Each signal is issued while holding
     * {@code lock}, and the lock is always released afterwards so that a thread waiting on either
     * condition can re-acquire it and wake up.
     */
    public class DagStatusCheckRunnable implements Runnable {
        /** Result of the status call; stays {@code null} if the call threw. */
        private volatile DAGStatusBuilder dagStatus;
        /** Wall-clock millis just before the status call, or -1 if not reached. */
        private volatile long dagStatusStartTime = -1;
        /** Wall-clock millis just after the status call returned, or -1 if it did not return normally. */
        private volatile long dagStatusEndTime = -1;
        private final AtomicBoolean started = new AtomicBoolean(false);
        private final AtomicBoolean ended = new AtomicBoolean(false);
        private final ReentrantLock lock;
        private final Condition startCondition;
        private final Condition endCondition;

        /**
         * @param reentrantLock lock guarding both conditions
         * @param condition signalled once {@link #run()} has begun
         * @param condition2 signalled once the status call has finished (normally or not)
         */
        public DagStatusCheckRunnable(ReentrantLock reentrantLock, Condition condition, Condition condition2) {
            this.lock = reentrantLock;
            this.startCondition = condition;
            this.endCondition = condition2;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.started.set(true);
            this.lock.lock();
            try {
                this.startCondition.signal();
            } finally {
                // Release before the (potentially blocking) status call so waiters can proceed.
                this.lock.unlock();
            }

            try {
                this.dagStatusStartTime = System.currentTimeMillis();
                this.dagStatus = TestDAGImpl.this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 10000L);
                this.dagStatusEndTime = System.currentTimeMillis();
            } catch (TezException ignored) {
                // Best effort: a failed status call leaves dagStatus null and the end time at -1.
            }

            this.lock.lock();
            try {
                this.ended.set(true);
                this.endCondition.signal();
            } finally {
                // Previously never released, which left any thread awaiting endCondition blocked forever.
                this.lock.unlock();
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$ExceptionLocation.class */
    /**
     * Names the edge-manager callback at which {@code CustomizedEdgeManager} and
     * {@code CustomizedEdgeManagerLegacy} throw. The constant's name is carried in the edge
     * manager's user payload and is also used as the thrown exception's message.
     */
    public enum ExceptionLocation {
        Initialize,
        GetNumDestinationTaskPhysicalInputs,
        GetNumSourceTaskPhysicalOutputs,
        // In CustomizedEdgeManager this also triggers routeCompositeDataMovementEventToDestination.
        RouteDataMovementEventToDestination,
        RouteInputSourceTaskFailedEventToDestination,
        GetNumDestinationConsumerTasks,
        RouteInputErrorEventToSource
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$TaskAttemptEventDispatcher.class */
    /**
     * Handler for {@link TaskAttemptEvent}s that does nothing: every event passed to
     * {@link #handle(TaskAttemptEvent)} is discarded.
     */
    private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        /** Intentionally empty; the event is ignored. */
        public void handle(TaskAttemptEvent taskAttemptEvent) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$TaskAttemptEventDisptacher2.class */
    /**
     * Delivers each {@link TaskAttemptEvent} to the attempt it addresses, walking
     * DAG -> vertex -> task -> attempt using the ids embedded in the event's attempt id.
     */
    public class TaskAttemptEventDisptacher2 implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDisptacher2() {
        }

        /** Looks up the addressed attempt in the DAG chosen by {@code chooseDAG} and forwards the event. */
        public void handle(TaskAttemptEvent taskAttemptEvent) {
            TezTaskAttemptID attemptId = taskAttemptEvent.getTaskAttemptID();
            TezTaskID taskId = attemptId.getTaskID();
            TezVertexID vertexId = taskId.getVertexID();
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(vertexId.getDAGId());
            owningDag.getVertex(vertexId).getTask(taskId).getAttempt(attemptId).handle(taskAttemptEvent);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$TaskEventDispatcher.class */
    /**
     * Delivers each {@link TaskEvent} to the task it addresses, walking DAG -> vertex -> task
     * using the ids embedded in the event's task id.
     */
    private class TaskEventDispatcher implements EventHandler<TaskEvent> {
        private TaskEventDispatcher() {
        }

        /** Looks up the addressed task in the DAG chosen by {@code chooseDAG} and forwards the event. */
        public void handle(TaskEvent taskEvent) {
            TezTaskID taskId = taskEvent.getTaskID();
            TezVertexID vertexId = taskId.getVertexID();
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(vertexId.getDAGId());
            owningDag.getVertex(vertexId).getTask(taskId).handle(taskEvent);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$TotalCountingOutputCommitter.class */
    /**
     * {@code CountingOutputCommitter} variant that additionally keeps a class-wide tally of
     * {@link #commitOutput()} calls across all instances.
     */
    public static class TotalCountingOutputCommitter extends TestVertexImpl.CountingOutputCommitter {
        /**
         * Commits performed by all instances of this class.
         * NOTE(review): plain static int with an unsynchronized increment — confirm commits never run concurrently.
         */
        static int totalCommitCounter = 0;

        public TotalCountingOutputCommitter(OutputCommitterContext outputCommitterContext) {
            super(outputCommitterContext);
        }

        /** Bumps the shared tally, then delegates to the parent committer. */
        @Override
        public void commitOutput() throws IOException {
            totalCommitCounter += 1;
            super.commitOutput();
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestDAGImpl$VertexEventDispatcher.class */
    /**
     * Delivers each {@link VertexEvent} to the vertex it addresses inside the DAG that
     * {@code chooseDAG} selects for the vertex id's DAG id.
     */
    private class VertexEventDispatcher implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        /** Looks up the addressed vertex and forwards the event to it. */
        public void handle(VertexEvent vertexEvent) {
            TezVertexID vertexId = vertexEvent.getVertexId();
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(vertexId.getDAGId());
            owningDag.getVertex(vertexId).handle(vertexEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a DAG id to the matching DAG fixture of this test. Ids are compared in the order
     * default, MRR, group, custom-edge.
     *
     * @param tezDAGID id carried by an incoming event
     * @return the fixture whose id equals {@code tezDAGID}
     * @throws RuntimeException if the id matches none of the four fixtures
     */
    public DAGImpl chooseDAG(TezDAGID tezDAGID) {
        final DAGImpl selected;
        if (tezDAGID.equals(this.dagId)) {
            selected = this.dag;
        } else if (tezDAGID.equals(this.mrrDagId)) {
            selected = this.mrrDag;
        } else if (tezDAGID.equals(this.groupDagId)) {
            selected = this.groupDag;
        } else if (tezDAGID.equals(this.dagWithCustomEdgeId)) {
            selected = this.dagWithCustomEdge;
        } else {
            throw new RuntimeException("Invalid event, unknown dag, dagId=" + tezDAGID);
        }
        return selected;
    }

    /**
     * Builds a linear three-vertex plan named "testverteximpl": vertex1 -e1-> vertex2 -e2-> vertex3.
     * Each vertex has one task and one leaf output ("output1".."output3") whose controller is
     * {@code TestVertexImpl.CountingOutputCommitter}. Both edges are SCATTER_GATHER / PERSISTED /
     * SEQUENTIAL.
     *
     * @return the assembled plan
     */
    private DAGProtos.DAGPlan createTestMRRDAGPlan() {
        LOG.info("Setting up MRR dag plan");

        DAGProtos.VertexPlan firstVertex = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex1")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host1")
                        .addRack("rack1")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(1)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("")
                        .setTaskModule("x1.y1")
                        .build())
                .addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName("output1")
                                .build())
                        .setName("output1")
                        .setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName(TestVertexImpl.CountingOutputCommitter.class.getName())))
                .addOutEdgeId("e1")
                .build();

        DAGProtos.VertexPlan secondVertex = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex2")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host2")
                        .addRack("rack2")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(1)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("")
                        .setTaskModule("x2.y2")
                        .build())
                .addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName("output2")
                                .build())
                        .setName("output2")
                        .setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName(TestVertexImpl.CountingOutputCommitter.class.getName())))
                .addInEdgeId("e1")
                .addOutEdgeId("e2")
                .build();

        DAGProtos.VertexPlan thirdVertex = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex3")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                        .setClassName("x3.y3"))
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host3")
                        .addRack("rack3")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(1)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("foo")
                        .setTaskModule("x3.y3")
                        .build())
                .addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName("output3")
                                .build())
                        .setName("output3")
                        .setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName(TestVertexImpl.CountingOutputCommitter.class.getName())))
                .addInEdgeId("e2")
                .build();

        DAGProtos.EdgePlan firstEdge = DAGProtos.EdgePlan.newBuilder()
                .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i2"))
                .setInputVertexName("vertex1")
                .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                .setOutputVertexName("vertex2")
                .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                .setId("e1")
                .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                .build();

        DAGProtos.EdgePlan secondEdge = DAGProtos.EdgePlan.newBuilder()
                .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3"))
                .setInputVertexName("vertex2")
                .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2"))
                .setOutputVertexName("vertex3")
                .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                .setId("e2")
                .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                .build();

        return DAGProtos.DAGPlan.newBuilder()
                .setName("testverteximpl")
                .addVertex(firstVertex)
                .addVertex(secondVertex)
                .addVertex(thirdVertex)
                .addEdge(firstEdge)
                .addEdge(secondEdge)
                .build();
    }

    /**
     * Builds the plan for "testDag": vertex1 and vertex2 form the vertex group "uv12", which feeds
     * vertex3 through a SCATTER_GATHER / PERSISTED / SEQUENTIAL group edge merged by "merge.class".
     * Both the group and vertex3 get a data sink named "uvOut" committed by
     * {@code TotalCountingOutputCommitter}.
     *
     * @return the plan produced by {@code DAG.createDag} using the shared static {@code conf}
     */
    static DAGProtos.DAGPlan createGroupDAGPlan() {
        LOG.info("Setting up group dag plan");

        Resource taskResource = Resource.newInstance(1, 1);
        Vertex groupMemberA = Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex groupMemberB = Vertex.create("vertex2", ProcessorDescriptor.create("Processor"), 1, taskResource);
        Vertex consumer = Vertex.create("vertex3", ProcessorDescriptor.create("Processor"), 1, taskResource);

        DAG dagUnderConstruction = DAG.create("testDag");
        OutputCommitterDescriptor committer = OutputCommitterDescriptor.create(TotalCountingOutputCommitter.class.getName());
        VertexGroup unionGroup = dagUnderConstruction.createVertexGroup("uv12", new Vertex[]{groupMemberA, groupMemberB});
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");

        // The same sink name is attached to the group and to the consuming vertex.
        unionGroup.addDataSink("uvOut", DataSinkDescriptor.create(sinkOutput, committer, (Credentials) null));
        consumer.addDataSink("uvOut", DataSinkDescriptor.create(sinkOutput, committer, (Credentials) null));

        EdgeProperty groupEdgeProperty = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("dummy output class"),
                InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge = GroupInputEdge.create(
                unionGroup, consumer, groupEdgeProperty, InputDescriptor.create("merge.class"));

        dagUnderConstruction.addVertex(groupMemberA);
        dagUnderConstruction.addVertex(groupMemberB);
        dagUnderConstruction.addVertex(consumer);
        dagUnderConstruction.addEdge(groupEdge);

        return dagUnderConstruction.createDag(conf, (Credentials) null, (Map) null, (LocalResource) null, true);
    }

    /**
     * Builds the six-vertex plan named "testverteximpl".
     *
     * <p>Topology (edge id in brackets): vertex1 -[e1]-> vertex3, vertex2 -[e2]-> vertex3,
     * vertex3 -[e3]-> vertex4, vertex3 -[e4]-> vertex5, vertex4 -[e5]-> vertex6,
     * vertex5 -[e6]-> vertex6. All edges are SCATTER_GATHER / PERSISTED / SEQUENTIAL.
     *
     * <p>vertex1 has one task, all other vertices have two. The DAG-level configuration sets
     * "tez.am.task.max.failed.attempts" to "3"; vertex1 overrides it to "2".
     *
     * @return the assembled plan
     */
    public static DAGProtos.DAGPlan createTestDAGPlan() {
        LOG.info("Setting up dag plan");
        return DAGProtos.DAGPlan.newBuilder()
                .setName("testverteximpl")
                .setDagConf(DAGProtos.ConfigurationProto.newBuilder()
                        .addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder()
                                .setKey("tez.am.task.max.failed.attempts")
                                .setValue("3")))
                .addVertex(DAGProtos.VertexPlan.newBuilder()
                        .setName("vertex1")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host1")
                                .addRack("rack1")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(1)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("")
                                .setTaskModule("x1.y1")
                                .build())
                        .setVertexConf(DAGProtos.ConfigurationProto.newBuilder()
                                .addConfKeyValues(DAGProtos.PlanKeyValuePair.newBuilder()
                                        .setKey("tez.am.task.max.failed.attempts")
                                        .setValue("2")))
                        .addOutEdgeId("e1")
                        .build())
                .addVertex(DAGProtos.VertexPlan.newBuilder()
                        .setName("vertex2")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host2")
                                .addRack("rack2")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(2)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("")
                                .setTaskModule("x2.y2")
                                .build())
                        .addOutEdgeId("e2")
                        .build())
                .addVertex(DAGProtos.VertexPlan.newBuilder()
                        .setName("vertex3")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                                .setClassName("x3.y3"))
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host3")
                                .addRack("rack3")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(2)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("foo")
                                .setTaskModule("x3.y3")
                                .build())
                        .addInEdgeId("e1")
                        .addInEdgeId("e2")
                        .addOutEdgeId("e3")
                        .addOutEdgeId("e4")
                        .build())
                .addVertex(DAGProtos.VertexPlan.newBuilder()
                        .setName("vertex4")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host4")
                                .addRack("rack4")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(2)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("")
                                .setTaskModule("x4.y4")
                                .build())
                        .addInEdgeId("e3")
                        .addOutEdgeId("e5")
                        .build())
                .addVertex(DAGProtos.VertexPlan.newBuilder()
                        .setName("vertex5")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host5")
                                .addRack("rack5")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(2)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("")
                                .setTaskModule("x5.y5")
                                .build())
                        .addInEdgeId("e4")
                        .addOutEdgeId("e6")
                        .build())
                .addVertex(DAGProtos.VertexP  lan.newBuilder()
                        .setName("vertex6")
                        .setType(DAGProtos.PlanVertexType.NORMAL)
                        .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                                .addHost("host6")
                                .addRack("rack6")
                                .build())
                        .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                                .setNumTasks(2)
                                .setVirtualCores(4)
                                .setMemoryMb(1024)
                                .setJavaOpts("")
                                .setTaskModule("x6.y6")
                                .build())
                        .addInEdgeId("e5")
                        .addInEdgeId("e6")
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
                        .setInputVertexName("vertex1")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                        .setOutputVertexName("vertex3")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e1")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
                        .setInputVertexName("vertex2")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2"))
                        .setOutputVertexName("vertex3")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e2")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
                        .setInputVertexName("vertex3")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
                        .setOutputVertexName("vertex4")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e3")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
                        .setInputVertexName("vertex3")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
                        .setOutputVertexName("vertex5")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e4")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
                        .setInputVertexName("vertex4")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o4"))
                        .setOutputVertexName("vertex6")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e5")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .addEdge(DAGProtos.EdgePlan.newBuilder()
                        .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
                        .setInputVertexName("vertex5")
                        .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o5"))
                        .setOutputVertexName("vertex6")
                        .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER)
                        .setId("e6")
                        .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                        .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                        .build())
                .build();
    }

    /**
     * Builds a two-vertex plan named "testverteximpl" in which vertex1 (1 task) feeds vertex2
     * (2 tasks) over the CUSTOM edge "e1". The edge manager's user payload is the UTF-8 name of
     * {@code exceptionLocation}.
     *
     * @param exceptionLocation location name placed in the edge manager's user payload
     * @param z when true the edge manager class is {@code CustomizedEdgeManagerLegacy},
     *          otherwise {@code CustomizedEdgeManager}
     * @return the assembled plan
     */
    private DAGProtos.DAGPlan createDAGWithCustomEdge(ExceptionLocation exceptionLocation, boolean z) {
        LOG.info("Setting up custome edge dag plan " + exceptionLocation + " " + z);

        String edgeManagerClassName;
        if (z) {
            edgeManagerClassName = CustomizedEdgeManagerLegacy.class.getName();
        } else {
            edgeManagerClassName = CustomizedEdgeManager.class.getName();
        }

        DAGProtos.VertexPlan producer = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex1")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host1")
                        .addRack("rack1")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(1)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("")
                        .setTaskModule("x1.y1")
                        .build())
                .addOutEdgeId("e1")
                .build();

        DAGProtos.VertexPlan consumer = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex2")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                        .setClassName("x2.y2"))
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host2")
                        .addRack("rack2")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(2)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("foo")
                        .setTaskModule("x2.y2")
                        .build())
                .addInEdgeId("e1")
                .build();

        DAGProtos.EdgePlan customEdge = DAGProtos.EdgePlan.newBuilder()
                .setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder()
                        .setClassName(edgeManagerClassName)
                        .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
                                .setUserPayload(ByteString.copyFromUtf8(exceptionLocation.name()))))
                .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
                .setInputVertexName("vertex1")
                .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                .setOutputVertexName("vertex2")
                .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM)
                .setId("e1")
                .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                .build();

        return DAGProtos.DAGPlan.newBuilder()
                .setName("testverteximpl")
                .addVertex(producer)
                .addVertex(consumer)
                .addEdge(customEdge)
                .build();
    }

    /**
     * Builds a two-vertex DAG plan ("vertex1" and "vertex2") whose CUSTOM edge "e1" names the
     * edge manager class "non-exist-edge-manager".
     *
     * @return the assembled DAG plan
     */
    private DAGProtos.DAGPlan createDAGWithNonExistEdgeManager() {
        LOG.info("Setting up dag plan with non-exist edgemanager");

        DAGProtos.VertexPlan firstVertex = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex1")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host1")
                        .addRack("rack1")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(1)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("")
                        .setTaskModule("x1.y1")
                        .build())
                .addOutEdgeId("e1")
                .build();

        DAGProtos.VertexPlan secondVertex = DAGProtos.VertexPlan.newBuilder()
                .setName("vertex2")
                .setType(DAGProtos.PlanVertexType.NORMAL)
                .setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
                .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                        .addHost("host2")
                        .addRack("rack2")
                        .build())
                .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                        .setNumTasks(2)
                        .setVirtualCores(4)
                        .setMemoryMb(1024)
                        .setJavaOpts("foo")
                        .setTaskModule("x2.y2")
                        .build())
                .addInEdgeId("e1")
                .build();

        DAGProtos.EdgePlan brokenEdge = DAGProtos.EdgePlan.newBuilder()
                .setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-edge-manager"))
                .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
                .setInputVertexName("vertex1")
                .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                .setOutputVertexName("vertex2")
                .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM)
                .setId("e1")
                .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
                .build();

        return DAGProtos.DAGPlan.newBuilder()
                .setName("testverteximpl")
                .addVertex(firstVertex)
                .addVertex(secondVertex)
                .addEdge(brokenEdge)
                .build();
    }

    /** Calls {@code MockDNSToSwitchMapping.initializeMockRackResolver()} once before the tests of this class run. */
    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    /**
     * Per-test fixture setup.
     *
     * <p>Creates a fresh {@code Configuration} (with "tez.am.container.reuse.enabled" set to false),
     * a {@code DrainDispatcher}, and three DAGs, each backed by its own mocked {@code AppContext}:
     * {@code dag} (DAG id 1), {@code mrrDag} (DAG id 2) and {@code groupDag} (DAG id 3). Finally the
     * event dispatchers are registered and the dispatcher is initialised and started.
     *
     * <p>The order of stubbing relative to DAG construction is kept exactly as it was; do not
     * reorder these statements without checking what the {@code DAGImpl} constructor reads.
     */
    @Before
    public void setup() {
        conf = new Configuration();
        conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(100L, 1), 1);
        this.dagId = TezDAGID.getInstance(this.appAttemptId.getApplicationId(), 1);
        Assert.assertNotNull(this.dagId);
        this.dagPlan = createTestDAGPlan();
        this.dispatcher = new DrainDispatcher();
        this.fsTokens = new Credentials();
        this.appContext = (AppContext) Mockito.mock(AppContext.class);
        this.execService = (ListeningExecutorService) Mockito.mock(ListeningExecutorService.class);
        final ListenableFuture listenableFuture = (ListenableFuture) Mockito.mock(ListenableFuture.class);
        Mockito.when(this.appContext.getHadoopShim()).thenReturn(this.defaultShim);
        Mockito.when(this.appContext.getApplicationID()).thenReturn(this.appAttemptId.getApplicationId());
        // The mocked executor does not run anything itself: whatever is submitted is handed to the
        // dispatcher as a CallableEvent and the shared mock future is returned.
        // The method must be named "answer" so that it actually implements Answer#answer.
        ((ListeningExecutorService) Mockito.doAnswer(new Answer() {
            @Override
            public ListenableFuture<Void> answer(InvocationOnMock invocationOnMock) {
                TestDAGImpl.this.dispatcher.getEventHandler().handle((CallableEvent) invocationOnMock.getArguments()[0]);
                return listenableFuture;
            }
        }).when(this.execService)).submit((Callable) Matchers.any());
        ((AppContext) Mockito.doReturn(this.execService).when(this.appContext)).getExecService();
        this.historyEventHandler = (HistoryEventHandler) Mockito.mock(HistoryEventHandler.class);
        this.aclManager = new ACLManager("amUser");
        ((AppContext) Mockito.doReturn(conf).when(this.appContext)).getAMConf();
        ((AppContext) Mockito.doReturn(this.appAttemptId).when(this.appContext)).getApplicationAttemptId();
        ((AppContext) Mockito.doReturn(this.appAttemptId.getApplicationId()).when(this.appContext)).getApplicationID();
        ((AppContext) Mockito.doReturn(this.dagId).when(this.appContext)).getCurrentDAGID();
        ((AppContext) Mockito.doReturn(this.historyEventHandler).when(this.appContext)).getHistoryHandler();
        ((AppContext) Mockito.doReturn(this.aclManager).when(this.appContext)).getAMACLManager();
        ((AppContext) Mockito.doReturn(this.defaultShim).when(this.appContext)).getHadoopShim();
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.dag);
        ((AppContext) Mockito.doReturn(this.dag).when(this.appContext)).getCurrentDAG();
        ((AppContext) Mockito.doReturn(this.clusterInfo).when(this.appContext)).getClusterInfo();

        // Second DAG (id 2), built from createTestMRRDAGPlan().
        this.mrrAppContext = (AppContext) Mockito.mock(AppContext.class);
        ((AppContext) Mockito.doReturn(this.aclManager).when(this.mrrAppContext)).getAMACLManager();
        ((AppContext) Mockito.doReturn(this.execService).when(this.mrrAppContext)).getExecService();
        ((AppContext) Mockito.doReturn(this.defaultShim).when(this.mrrAppContext)).getHadoopShim();
        this.mrrDagId = TezDAGID.getInstance(this.appAttemptId.getApplicationId(), 2);
        this.mrrDagPlan = createTestMRRDAGPlan();
        this.mrrDag = new DAGImpl(this.mrrDagId, conf, this.mrrDagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.mrrAppContext);
        this.mrrDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.mrrDag);
        ((AppContext) Mockito.doReturn(conf).when(this.mrrAppContext)).getAMConf();
        ((AppContext) Mockito.doReturn(this.mrrDag).when(this.mrrAppContext)).getCurrentDAG();
        ((AppContext) Mockito.doReturn(this.appAttemptId).when(this.mrrAppContext)).getApplicationAttemptId();
        ((AppContext) Mockito.doReturn(this.appAttemptId.getApplicationId()).when(this.mrrAppContext)).getApplicationID();
        ((AppContext) Mockito.doReturn(this.historyEventHandler).when(this.mrrAppContext)).getHistoryHandler();
        ((AppContext) Mockito.doReturn(this.clusterInfo).when(this.mrrAppContext)).getClusterInfo();

        // Third DAG (id 3), built from createGroupDAGPlan().
        this.groupAppContext = (AppContext) Mockito.mock(AppContext.class);
        ((AppContext) Mockito.doReturn(this.aclManager).when(this.groupAppContext)).getAMACLManager();
        ((AppContext) Mockito.doReturn(this.execService).when(this.groupAppContext)).getExecService();
        ((AppContext) Mockito.doReturn(this.defaultShim).when(this.groupAppContext)).getHadoopShim();
        this.groupDagId = TezDAGID.getInstance(this.appAttemptId.getApplicationId(), 3);
        this.groupDagPlan = createGroupDAGPlan();
        this.groupDag = new DAGImpl(this.groupDagId, conf, this.groupDagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.groupAppContext);
        this.groupDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.groupDag);
        ((AppContext) Mockito.doReturn(conf).when(this.groupAppContext)).getAMConf();
        ((AppContext) Mockito.doReturn(this.groupDag).when(this.groupAppContext)).getCurrentDAG();
        ((AppContext) Mockito.doReturn(this.appAttemptId).when(this.groupAppContext)).getApplicationAttemptId();
        ((AppContext) Mockito.doReturn(this.appAttemptId.getApplicationId()).when(this.groupAppContext)).getApplicationID();
        ((AppContext) Mockito.doReturn(this.historyEventHandler).when(this.groupAppContext)).getHistoryHandler();
        ((AppContext) Mockito.doReturn(this.clusterInfo).when(this.groupAppContext)).getClusterInfo();

        // Reset the shared commit counter so commit-count assertions start from zero in every test.
        TotalCountingOutputCommitter.totalCommitCounter = 0;

        // Wire one handler per event type, then bring the dispatcher up.
        this.dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
        this.taskEventDispatcher = new TaskEventDispatcher();
        this.dispatcher.register(TaskEventType.class, this.taskEventDispatcher);
        this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        this.dispatcher.register(TaskAttemptEventType.class, this.taskAttemptEventDispatcher);
        this.vertexEventDispatcher = new VertexEventDispatcher();
        this.dispatcher.register(VertexEventType.class, this.vertexEventDispatcher);
        this.dagEventDispatcher = new DagEventDispatcher();
        this.dispatcher.register(DAGEventType.class, this.dagEventDispatcher);
        this.dagFinishEventHandler = new DAGFinishEventHandler();
        this.dispatcher.register(DAGAppMasterEventType.class, this.dagFinishEventHandler);
        this.dispatcher.init(conf);
        this.dispatcher.start();
    }

    /**
     * Per-test cleanup: drains and stops the dispatcher, shuts down the executor, stops the
     * entity update tracker of every DAG that was created, and clears the DAG references.
     */
    @After
    public void teardown() {
        this.dispatcher.await();
        this.dispatcher.stop();
        this.execService.shutdownNow();
        this.dagPlan = null;

        // dagWithCustomEdge is only populated by tests that call setupDAGWithCustomEdge, hence the null check.
        DAGImpl[] createdDags = {this.dag, this.mrrDag, this.groupDag, this.dagWithCustomEdge};
        for (DAGImpl created : createdDags) {
            if (created == null) {
                continue;
            }
            created.entityUpdateTracker.stop();
        }

        this.dag = null;
        this.mrrDag = null;
        this.groupDag = null;
        this.dagWithCustomEdge = null;
    }

    /**
     * Convenience overload: delegates to {@code setupDAGWithCustomEdge(exceptionLocation, false)}.
     *
     * @param exceptionLocation forwarded unchanged to the two-argument overload
     */
    private void setupDAGWithCustomEdge(ExceptionLocation exceptionLocation) {
        setupDAGWithCustomEdge(exceptionLocation, false);
    }

    /**
     * Creates {@code dagWithCustomEdge} (DAG id 4) from the plan returned by
     * {@code createDAGWithCustomEdge(exceptionLocation, z)}, backed by its own mocked
     * {@code AppContext}, and registers the extra task-attempt and AM-scheduler event handlers.
     *
     * @param exceptionLocation forwarded to {@code createDAGWithCustomEdge}
     * @param z forwarded to {@code createDAGWithCustomEdge}
     */
    private void setupDAGWithCustomEdge(ExceptionLocation exceptionLocation, boolean z) {
        ApplicationId applicationId = this.appAttemptId.getApplicationId();
        this.dagWithCustomEdgeId = TezDAGID.getInstance(applicationId, 4);
        this.dagPlanWithCustomEdge = createDAGWithCustomEdge(exceptionLocation, z);

        AppContext customEdgeContext = Mockito.mock(AppContext.class);
        this.dagWithCustomEdgeAppContext = customEdgeContext;
        Mockito.doReturn(this.aclManager).when(customEdgeContext).getAMACLManager();
        Mockito.when(customEdgeContext.getHadoopShim()).thenReturn(this.defaultShim);

        DAGImpl customEdgeDag = new DAGImpl(
                this.dagWithCustomEdgeId,
                conf,
                this.dagPlanWithCustomEdge,
                this.dispatcher.getEventHandler(),
                this.taskCommunicatorManagerInterface,
                this.fsTokens,
                this.clock,
                "user",
                this.thh,
                customEdgeContext);
        this.dagWithCustomEdge = customEdgeDag;
        customEdgeDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(customEdgeDag);

        // These stubs are installed after the DAG has been constructed, same as before.
        Mockito.doReturn(conf).when(customEdgeContext).getAMConf();
        Mockito.doReturn(this.execService).when(customEdgeContext).getExecService();
        Mockito.doReturn(customEdgeDag).when(customEdgeContext).getCurrentDAG();
        Mockito.doReturn(this.appAttemptId).when(customEdgeContext).getApplicationAttemptId();
        Mockito.doReturn(this.appAttemptId.getApplicationId()).when(customEdgeContext).getApplicationID();
        Mockito.doReturn(this.historyEventHandler).when(customEdgeContext).getHistoryHandler();
        Mockito.doReturn(this.clusterInfo).when(customEdgeContext).getClusterInfo();

        this.dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
        this.dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
        Mockito.when(customEdgeContext.getContainerLauncherName(Matchers.anyInt()))
                .thenReturn(TezConstants.getTezYarnServicePluginName());
    }

    /**
     * Sends {@code DAG_INIT} directly to the given DAG and asserts that it ends up in
     * {@code DAGState.INITED}.
     *
     * @param dAGImpl the DAG to initialise
     */
    private void initDAG(DAGImpl dAGImpl) {
        DAGEvent initEvent = new DAGEvent(dAGImpl.getID(), DAGEventType.DAG_INIT);
        dAGImpl.handle(initEvent);
        Assert.assertEquals(DAGState.INITED, dAGImpl.getState());
    }

    /**
     * Posts a {@code DAGEventStartDag} (with a null list argument) for the given DAG through the
     * dispatcher, waits for the dispatcher to drain, and asserts the DAG is {@code RUNNING}.
     *
     * @param dAGImpl the DAG to start
     */
    private void startDAG(DAGImpl dAGImpl) {
        DAGEventStartDag startEvent = new DAGEventStartDag(dAGImpl.getID(), (List) null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dAGImpl.getState());
    }

    /** Initialising the default test DAG must succeed and report six vertices. */
    @Test(timeout = 5000)
    public void testDAGInit() {
        final long expectedVertexCount = 6L;
        initDAG(this.dag);
        Assert.assertEquals(expectedVertexCount, this.dag.getTotalVertices());
    }

    /**
     * With {@code ExceptionLocation.Initialize} the custom-edge DAG must be FAILED after
     * {@code DAG_INIT}, and must stay FAILED when {@code DAG_START} is delivered afterwards.
     */
    @Test(timeout = 5000)
    public void testDAGInitFailed() {
        setupDAGWithCustomEdge(ExceptionLocation.Initialize);
        DAGImpl customEdgeDag = this.dagWithCustomEdge;

        customEdgeDag.handle(new DAGEvent(customEdgeDag.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals(DAGState.FAILED, customEdgeDag.getState());

        customEdgeDag.handle(new DAGEvent(customEdgeDag.getID(), DAGEventType.DAG_START));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, customEdgeDag.getState());
    }

    /**
     * When the cluster info is stubbed with {@code Resource.newInstance(512, 10)}, DAG
     * initialisation must fail with {@code INIT_FAILURE} and a diagnostic about the task resource
     * exceeding the cluster container capability.
     */
    @Test(timeout = 5000)
    public void testDAGInitFailedDuetoInvalidResource() {
        Resource stubbedCapability = Resource.newInstance(512, 10);
        Mockito.doReturn(new ClusterInfo(stubbedCapability)).when(this.appContext).getClusterInfo();

        DAGEvent initEvent = new DAGEvent(this.dag.getID(), DAGEventType.DAG_INIT);
        this.dag.handle(initEvent);
        this.dispatcher.await();

        Assert.assertEquals(DAGState.FAILED, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, this.dag.getTerminationCause());
        String joinedDiagnostics = StringUtils.join(this.dag.getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains("Vertex's TaskResource is beyond the cluster container capability"));
    }

    /**
     * After init and start, all six vertices must be RUNNING and report the expected distance
     * from the root: 0 for vertices 0-1, 1 for vertex 2, 2 for vertices 3-4 and 3 for vertex 5.
     */
    @Test(timeout = 5000)
    public void testDAGStart() {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();
        for (int i = 0; i < 6; i++) {
            // Keep the vertex in a local so that both the state and the distance are checked on it.
            org.apache.tez.dag.app.dag.Vertex vertex = this.dag.getVertex(TezVertexID.getInstance(this.dagId, i));
            Assert.assertEquals(VertexState.RUNNING, vertex.getState());
            if (i < 2) {
                Assert.assertEquals(0L, vertex.getDistanceFromRoot());
            } else if (i == 2) {
                Assert.assertEquals(1L, vertex.getDistanceFromRoot());
            } else if (i < 5) {
                Assert.assertEquals(2L, vertex.getDistanceFromRoot());
            } else {
                Assert.assertEquals(3L, vertex.getDistanceFromRoot());
            }
        }
        // Logged in a second pass so the output only appears once every assertion has passed.
        for (int i2 = 0; i2 < 6; i2++) {
            LOG.info("Distance from root: v" + i2 + ":" + this.dag.getVertex(TezVertexID.getInstance(this.dagId, i2)).getDistanceFromRoot());
        }
    }

    /**
     * A plan that names the edge manager class "non-exist-edge-manager" must make DAG
     * initialisation fail with {@code INIT_FAILURE} and a ClassNotFoundException diagnostic.
     */
    @Test(timeout = 5000)
    public void testNonExistEdgeManagerPlugin() {
        this.dagPlan = createDAGWithNonExistEdgeManager();
        DAGImpl brokenDag = new DAGImpl(
                this.dagId,
                conf,
                this.dagPlan,
                this.dispatcher.getEventHandler(),
                this.taskCommunicatorManagerInterface,
                this.fsTokens,
                this.clock,
                "user",
                this.thh,
                this.appContext);
        this.dag = brokenDag;
        brokenDag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(brokenDag);
        Mockito.doReturn(brokenDag).when(this.appContext).getCurrentDAG();

        brokenDag.handle(new DAGEvent(this.dagId, DAGEventType.DAG_INIT));

        Assert.assertEquals(DAGState.FAILED, brokenDag.getState());
        Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, brokenDag.getTerminationCause());
        String joinedDiagnostics = StringUtils.join(brokenDag.getDiagnostics(), "");
        Assert.assertTrue(joinedDiagnostics.contains("java.lang.ClassNotFoundException: non-exist-edge-manager"));
    }

    /**
     * Setting "tez.am.dag.scheduler.class" to "non-exist-dag-scheduler" must make DAG
     * initialisation fail with {@code INIT_FAILURE} and a ClassNotFoundException diagnostic.
     */
    @Test(timeout = 5000)
    public void testNonExistDAGScheduler() {
        conf.set("tez.am.dag.scheduler.class", "non-exist-dag-scheduler");
        // Rebuild the DAG after the configuration change and make it the app context's current DAG.
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.dag);
        ((AppContext) Mockito.doReturn(this.dag).when(this.appContext)).getCurrentDAG();
        this.dag.handle(new DAGEvent(this.dag.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals(DAGState.FAILED, this.dag.getState());
        Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, this.dag.getTerminationCause());
        Assert.assertTrue(StringUtils.join(this.dag.getDiagnostics(), "").contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler"));
    }

    /**
     * Completing both tasks (indices 0 and 1) of the vertex with id 1 must move that vertex to
     * SUCCEEDED, count one successful vertex, and put completed-task progress near 2/11.
     */
    @Test(timeout = 5000)
    public void testVertexCompletion() {
        initDAG(this.dag);
        Assert.assertTrue(this.dag.getCompletedTaskProgress() == 0.0f);
        startDAG(this.dag);
        Assert.assertTrue(this.dag.getCompletedTaskProgress() == 0.0f);
        this.dispatcher.await();

        TezVertexID completedVertexId = TezVertexID.getInstance(this.dagId, 1);
        org.apache.tez.dag.app.dag.Vertex completedVertex = this.dag.getVertex(completedVertexId);
        for (int taskIndex = 0; taskIndex < 2; taskIndex++) {
            TezTaskID taskId = TezTaskID.getInstance(completedVertexId, taskIndex);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
        }
        this.dispatcher.await();

        Assert.assertEquals(VertexState.SUCCEEDED, completedVertex.getState());
        Assert.assertEquals(1L, this.dag.getSuccessfulVertices());
        Assert.assertEquals(0.1818181872367859d, this.dag.getCompletedTaskProgress(), 0.05d);
    }

    /**
     * With {@code ExceptionLocation.GetNumDestinationTaskPhysicalInputs}, the diagnostics of
     * "vertex2" must mention that location after the DAG has been initialised and started.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
        ExceptionLocation location = ExceptionLocation.GetNumDestinationTaskPhysicalInputs;
        setupDAGWithCustomEdge(location);

        DAGEvent initEvent = new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT);
        this.dispatcher.getEventHandler().handle(initEvent);
        DAGEventStartDag startEvent = new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();

        String joinedDiagnostics = StringUtils.join(this.dagWithCustomEdge.getVertex("vertex2").getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(location.name()));
    }

    /**
     * With {@code ExceptionLocation.GetNumSourceTaskPhysicalOutputs}, the DAG must end up FAILED
     * and the diagnostics of "vertex1" must mention that location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
        ExceptionLocation location = ExceptionLocation.GetNumSourceTaskPhysicalOutputs;
        setupDAGWithCustomEdge(location);

        DAGEvent initEvent = new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT);
        this.dispatcher.getEventHandler().handle(initEvent);
        DAGEventStartDag startEvent = new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();

        Assert.assertEquals(DAGState.FAILED, this.dagWithCustomEdge.getState());
        String joinedDiagnostics = StringUtils.join(this.dagWithCustomEdge.getVertex("vertex1").getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(location.name()));
    }

    /**
     * With {@code ExceptionLocation.RouteDataMovementEventToDestination}: after a
     * {@code DataMovementEvent} is routed to "vertex2" and the attempt's events are fetched,
     * "vertex2" must be FAILED, "vertex1" KILLED, and the diagnostics of "vertex2" must mention
     * the location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_RouteDataMovementEventToDestination() {
        setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
        this.dispatcher.getEventHandler().handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dagWithCustomEdge.getState());
        // Explicit downcasts: other tests in this file assign getVertex(..) to the Vertex interface,
        // so narrowing to VertexImpl must be spelled out.
        VertexImpl vertex = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task task = vertex2.getTask(0);
        // NOTE(review): cast assumes getAttempt(..) is declared with a wider type than TaskAttemptImpl;
        // it is a no-op if that is not the case.
        TaskAttemptImpl attempt = (TaskAttemptImpl) task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), 0));
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex2.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", attempt.getID()))})));
        this.dispatcher.await();
        // The events are fetched for the attempt before the vertex states are asserted.
        vertex2.getTaskAttemptTezEvents(attempt.getID(), 0, 0, 1000);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(StringUtils.join(vertex2.getDiagnostics(), ",").contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
    }

    /**
     * Legacy edge manager variant of the RouteDataMovementEventToDestination test: after a
     * {@code DataMovementEvent} is routed to "vertex2", that vertex must be FAILED, "vertex1"
     * KILLED, and the diagnostics of "vertex2" must mention the location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
        setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
        this.dispatcher.getEventHandler().handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dagWithCustomEdge.getState());
        // Explicit downcasts: other tests in this file assign getVertex(..) to the Vertex interface,
        // so narrowing to VertexImpl must be spelled out.
        VertexImpl vertex = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task task = vertex2.getTask(0);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex2.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), 0)).getID()))})));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(StringUtils.join(vertex2.getDiagnostics(), ",").contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
    }

    /**
     * Legacy edge manager with {@code ExceptionLocation.RouteInputSourceTaskFailedEventToDestination}:
     * after an {@code InputFailedEvent} is routed to "vertex2" and the attempt's events are
     * fetched, "vertex2" must be FAILED, "vertex1" KILLED, and the diagnostics of "vertex2" must
     * mention the location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
        setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination, true);
        this.dispatcher.getEventHandler().handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dagWithCustomEdge.getState());
        // Explicit downcasts: other tests in this file assign getVertex(..) to the Vertex interface,
        // so narrowing to VertexImpl must be spelled out.
        VertexImpl vertex = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task task = vertex2.getTask(0);
        // NOTE(review): cast assumes getAttempt(..) is declared with a wider type than TaskAttemptImpl;
        // it is a no-op if that is not the case.
        TaskAttemptImpl attempt = (TaskAttemptImpl) task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), 0));
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex2.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(InputFailedEvent.create(0, 1), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", attempt.getID()))})));
        this.dispatcher.await();
        // The events are fetched for the attempt before the vertex states are asserted.
        vertex2.getTaskAttemptTezEvents(attempt.getID(), 0, 0, 1000);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(StringUtils.join(vertex2.getDiagnostics(), ",").contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
    }

    /**
     * With {@code ExceptionLocation.GetNumDestinationConsumerTasks}: after an
     * {@code InputReadErrorEvent} is routed to "vertex2", that vertex must be FAILED, "vertex1"
     * KILLED, and the diagnostics of "vertex2" must mention the location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_GetNumDestinationConsumerTasks() {
        setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
        this.dispatcher.getEventHandler().handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dagWithCustomEdge.getState());
        // Explicit downcasts: other tests in this file assign getVertex(..) to the Vertex interface,
        // so narrowing to VertexImpl must be spelled out.
        VertexImpl vertex = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task task = vertex2.getTask(0);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex2.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), 0)).getID()))})));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(StringUtils.join(vertex2.getDiagnostics(), ",").contains(ExceptionLocation.GetNumDestinationConsumerTasks.name()));
    }

    /**
     * With {@code ExceptionLocation.RouteInputErrorEventToSource}: after an
     * {@code InputReadErrorEvent} is routed to "vertex2", that vertex must be FAILED, "vertex1"
     * KILLED, and the diagnostics of "vertex2" must mention the location.
     */
    @Test(timeout = 5000)
    public void testEdgeManager_RouteInputErrorEventToSource() {
        setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
        this.dispatcher.getEventHandler().handle(new DAGEvent(this.dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), (List) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dagWithCustomEdge.getState());
        // Explicit downcasts: other tests in this file assign getVertex(..) to the Vertex interface,
        // so narrowing to VertexImpl must be spelled out.
        VertexImpl vertex = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        this.dispatcher.await();
        Task task = vertex2.getTask(0);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex2.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), 0)).getID()))})));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(StringUtils.join(vertex2.getDiagnostics(), ",").contains(ExceptionLocation.RouteInputErrorEventToSource.name()));
    }

    /**
     * Completing task 0 of "vertex1", "vertex2" and "vertex3" in turn must succeed each vertex,
     * finish the group DAG as SUCCEEDED with full progress, and leave the total commit counter at 2.
     */
    @Test(timeout = 5000)
    public void testGroupDAGCompletionWithCommitSuccess() {
        initDAG(this.groupDag);
        startDAG(this.groupDag);
        this.dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 3; vertexNumber++) {
            org.apache.tez.dag.app.dag.Vertex current = this.groupDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(current.getVertexId(), 0);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, current.getState());
            Assert.assertEquals(vertexNumber, this.groupDag.getSuccessfulVertices());
        }

        Assert.assertEquals(3L, this.groupDag.getSuccessfulVertices());
        Assert.assertTrue(this.groupDag.getCompletedTaskProgress() == 1.0f);
        Assert.assertEquals(DAGState.SUCCEEDED, this.groupDag.getState());
        Assert.assertEquals(2L, TotalCountingOutputCommitter.totalCommitCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" set to false: if "vertex1" is reported as
     * re-running before "vertex2" completes, no commit may happen until "vertex1" completes again;
     * afterwards the commit counter must be 1 and two vertices must be counted as successful.
     */
    @Test(timeout = 5000)
    public void testGroupDAGWithVertexReRunning() {
        this.groupDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(this.groupDag);
        startDAG(this.groupDag);
        this.dispatcher.await();

        org.apache.tez.dag.app.dag.Vertex rerunVertex = this.groupDag.getVertex("vertex1");
        org.apache.tez.dag.app.dag.Vertex otherVertex = this.groupDag.getVertex("vertex2");

        DAGEventVertexCompleted firstCompletion = new DAGEventVertexCompleted(rerunVertex.getVertexId(), VertexState.SUCCEEDED);
        this.dispatcher.getEventHandler().handle(firstCompletion);
        DAGEventVertexReRunning rerunNotice = new DAGEventVertexReRunning(rerunVertex.getVertexId());
        this.dispatcher.getEventHandler().handle(rerunNotice);
        DAGEventVertexCompleted otherCompletion = new DAGEventVertexCompleted(otherVertex.getVertexId(), VertexState.SUCCEEDED);
        this.dispatcher.getEventHandler().handle(otherCompletion);
        this.dispatcher.await();
        Assert.assertEquals(0L, TotalCountingOutputCommitter.totalCommitCounter);

        DAGEventVertexCompleted secondCompletion = new DAGEventVertexCompleted(rerunVertex.getVertexId(), VertexState.SUCCEEDED);
        this.dispatcher.getEventHandler().handle(secondCompletion);
        this.dispatcher.await();
        Assert.assertEquals(1L, TotalCountingOutputCommitter.totalCommitCounter);
        Assert.assertEquals(2L, this.groupDag.getSuccessfulVertices());
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" set to false: once "vertex1" and "vertex2"
     * have completed and one commit has been counted, a re-run notice for "vertex1" must fail the
     * DAG with {@code VERTEX_RERUN_AFTER_COMMIT}.
     */
    @Test(timeout = 5000)
    public void testGroupDAGWithVertexReRunningAfterCommit() {
        this.groupDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(this.groupDag);
        startDAG(this.groupDag);
        this.dispatcher.await();

        org.apache.tez.dag.app.dag.Vertex rerunVertex = this.groupDag.getVertex("vertex1");
        org.apache.tez.dag.app.dag.Vertex otherVertex = this.groupDag.getVertex("vertex2");

        DAGEventVertexCompleted firstCompletion = new DAGEventVertexCompleted(rerunVertex.getVertexId(), VertexState.SUCCEEDED);
        this.dispatcher.getEventHandler().handle(firstCompletion);
        DAGEventVertexCompleted otherCompletion = new DAGEventVertexCompleted(otherVertex.getVertexId(), VertexState.SUCCEEDED);
        this.dispatcher.getEventHandler().handle(otherCompletion);
        this.dispatcher.await();
        Assert.assertEquals(1L, TotalCountingOutputCommitter.totalCommitCounter);

        DAGEventVertexReRunning rerunNotice = new DAGEventVertexReRunning(rerunVertex.getVertexId());
        this.dispatcher.getEventHandler().handle(rerunNotice);
        this.dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, this.groupDag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, this.groupDag.getTerminationCause());
    }

    /**
     * Runs the MRR DAG to completion. After "vertex1" and "vertex2" have succeeded, no output
     * committer may have committed or aborted yet (init and setup counted once each); after
     * "vertex3" succeeds the DAG must be SUCCEEDED and every committer must have committed
     * exactly once without any abort.
     */
    @Test(timeout = 5000)
    public void testDAGCompletionWithCommitSuccess() {
        initDAG(this.mrrDag);
        this.dispatcher.await();
        startDAG(this.mrrDag);
        this.dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            org.apache.tez.dag.app.dag.Vertex current = this.mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(current.getVertexId(), 0);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, current.getState());
            Assert.assertEquals(vertexNumber, this.mrrDag.getSuccessfulVertices());
        }

        // Before the last vertex finishes: committers are initialised and set up, nothing committed.
        for (Object vertexEntry : this.mrrDag.vertices.values()) {
            org.apache.tez.dag.app.dag.Vertex inspected = (org.apache.tez.dag.app.dag.Vertex) vertexEntry;
            for (Object committerEntry : inspected.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) committerEntry);
                Assert.assertEquals(0L, committer.abortCounter);
                Assert.assertEquals(0L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }

        org.apache.tez.dag.app.dag.Vertex lastVertex = this.mrrDag.getVertex("vertex3");
        TezTaskID lastTaskId = TezTaskID.getInstance(lastVertex.getVertexId(), 0);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(lastTaskId, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, lastVertex.getState());
        Assert.assertEquals(3L, this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.SUCCEEDED, this.mrrDag.getState());

        // After DAG success: every committer has committed exactly once and never aborted.
        for (Object vertexEntry : this.mrrDag.vertices.values()) {
            org.apache.tez.dag.app.dag.Vertex inspected = (org.apache.tez.dag.app.dag.Vertex) vertexEntry;
            for (Object committerEntry : inspected.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) committerEntry);
                Assert.assertEquals(0L, committer.abortCounter);
                Assert.assertEquals(1L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }
    }

    /**
     * Registers an extra leaf output ("output3") on vertex3 whose committer is configured
     * through {@code CountingOutputCommitterConfig(true, false, false)}, drives the three
     * MRR vertices to SUCCEEDED, and expects the DAG to finish FAILED with termination
     * cause COMMIT_FAILURE and every output committer to have been aborted once.
     *
     * @throws IOException declared because the committer config is serialized to a payload
     */
    @Test(timeout = 5000)
    public void testDAGCompletionWithCommitFailure() throws IOException {
        initDAG(this.mrrDag);
        this.dispatcher.await();

        // Build the additional output descriptor and attach it before the DAG is started.
        org.apache.tez.dag.app.dag.Vertex outputOwner = this.mrrDag.getVertex("vertex3");
        TestVertexImpl.CountingOutputCommitter.CountingOutputCommitterConfig committerConfig =
                new TestVertexImpl.CountingOutputCommitter.CountingOutputCommitterConfig(true, false, false);
        ArrayList additionalOutputs = new ArrayList();
        additionalOutputs.add(
                DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setControllerDescriptor(
                                DAGProtos.TezEntityDescriptorProto.newBuilder()
                                        .setClassName(TestVertexImpl.CountingOutputCommitter.class.getName())
                                        .setTezUserPayload(
                                                DAGProtos.TezUserPayloadProto.newBuilder()
                                                        .setUserPayload(ByteString.copyFrom(committerConfig.toUserPayload()))
                                                        .build()))
                        .setName("output3")
                        .setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
                        .build());
        outputOwner.setAdditionalOutputs(additionalOutputs);

        startDAG(this.mrrDag);
        this.dispatcher.await();

        // Complete task 0 of vertex1 and vertex2; each vertex is expected to succeed in turn.
        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            org.apache.tez.dag.app.dag.Vertex current = this.mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTask = TezTaskID.getInstance(current.getVertexId(), 0);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTask, TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, current.getState());
            Assert.assertEquals(vertexNumber, this.mrrDag.getSuccessfulVertices());
        }

        // Before the last vertex finishes, committers are initialized and set up only.
        for (org.apache.tez.dag.app.dag.Vertex dagVertex : this.mrrDag.vertices.values()) {
            for (Object rawCommitter : dagVertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(0L, committer.abortCounter);
                Assert.assertEquals(0L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }

        org.apache.tez.dag.app.dag.Vertex finalVertex = this.mrrDag.getVertex("vertex3");
        TezTaskID finalTask = TezTaskID.getInstance(finalVertex.getVertexId(), 0);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(finalTask, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, finalVertex.getState());
        Assert.assertEquals(3L, this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.FAILED, this.mrrDag.getState());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, this.mrrDag.getTerminationCause());

        // After the failed DAG, every committer must have been aborted exactly once.
        for (org.apache.tez.dag.app.dag.Vertex dagVertex : this.mrrDag.vertices.values()) {
            for (Object rawCommitter : dagVertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(1L, committer.abortCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }
    }

    /**
     * Succeeds vertex1 and vertex2 of the MRR DAG, then delivers V_INTERNAL_ERROR to
     * vertex3 and checks that both the vertex and the DAG end in ERROR and that every
     * output committer was aborted once and never committed.
     */
    @Test(timeout = 5000)
    public void testDAGErrorAbortAllOutputs() {
        initDAG(this.mrrDag);
        this.dispatcher.await();
        startDAG(this.mrrDag);
        this.dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            org.apache.tez.dag.app.dag.Vertex current = this.mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTask = TezTaskID.getInstance(current.getVertexId(), 0);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTask, TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, current.getState());
            Assert.assertEquals(vertexNumber, this.mrrDag.getSuccessfulVertices());
        }

        // Nothing has been committed or aborted while the DAG is still in progress.
        for (org.apache.tez.dag.app.dag.Vertex dagVertex : this.mrrDag.vertices.values()) {
            for (Object rawCommitter : dagVertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(0L, committer.abortCounter);
                Assert.assertEquals(0L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }

        org.apache.tez.dag.app.dag.Vertex erroredVertex = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle(
                new VertexEvent(erroredVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, erroredVertex.getState());
        Assert.assertEquals(DAGState.ERROR, this.mrrDag.getState());

        // Once the DAG has errored out, all outputs are expected to be aborted.
        for (org.apache.tez.dag.app.dag.Vertex dagVertex : this.mrrDag.vertices.values()) {
            for (Object rawCommitter : dagVertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(1L, committer.abortCounter);
                Assert.assertEquals(0L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }
    }

    /**
     * Runs the MRR DAG with "tez.am.commit-all-outputs-on-dag-success" set to false.
     * vertex1 and vertex2 succeed and their committers are expected to show one commit
     * each; vertex3 then receives V_INTERNAL_ERROR. After the DAG reaches ERROR every
     * committer is expected to show one abort, with a commit count of 0 for vertex3 and
     * 1 for the other vertices.
     */
    @Test(timeout = 5000)
    public void testDAGErrorAbortNonSuccessfulOutputs() {
        this.mrrDag.getConf().setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(this.mrrDag);
        this.dispatcher.await();
        startDAG(this.mrrDag);
        this.dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            org.apache.tez.dag.app.dag.Vertex current = this.mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTask = TezTaskID.getInstance(current.getVertexId(), 0);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTask, TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, current.getState());
            Assert.assertEquals(vertexNumber, this.mrrDag.getSuccessfulVertices());
            // The vertex's own outputs are checked right after it succeeds.
            for (Object rawCommitter : current.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(0L, committer.abortCounter);
                Assert.assertEquals(1L, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }

        org.apache.tez.dag.app.dag.Vertex erroredVertex = this.mrrDag.getVertex("vertex3");
        this.dispatcher.getEventHandler().handle(
                new VertexEvent(erroredVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, erroredVertex.getState());
        this.dispatcher.await();
        Assert.assertEquals(DAGState.ERROR, this.mrrDag.getState());

        for (org.apache.tez.dag.app.dag.Vertex dagVertex : this.mrrDag.vertices.values()) {
            // Only the commit count differs between the errored vertex and the others.
            long expectedCommits = (dagVertex == erroredVertex) ? 0L : 1L;
            for (Object rawCommitter : dagVertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter committer =
                        (TestVertexImpl.CountingOutputCommitter) ((OutputCommitter) rawCommitter);
                Assert.assertEquals(1L, committer.abortCounter);
                Assert.assertEquals(expectedCommits, committer.commitCounter);
                Assert.assertEquals(1L, committer.initCounter);
                Assert.assertEquals(1L, committer.setupCounter);
            }
        }
    }

    /**
     * Completes both tasks of vertex 1, reschedules task 0, and completes it again,
     * checking after each step the vertex state together with the DAG's successful and
     * completed vertex counts (1/1, then 0/0 while re-running, then 1/1 again).
     */
    @Test(timeout = 5000)
    public void testVertexReRunning() {
        initDAG(this.dag);
        // The scheduler is swapped for a Mockito mock before the DAG is started.
        this.dag.dagScheduler = (DAGScheduler) Mockito.mock(DAGScheduler.class);
        startDAG(this.dag);
        this.dispatcher.await();

        TezVertexID vertexId = TezVertexID.getInstance(this.dagId, 1);
        org.apache.tez.dag.app.dag.Vertex target = this.dag.getVertex(vertexId);
        TezTaskID taskZero = TezTaskID.getInstance(vertexId, 0);
        TezTaskID taskOne = TezTaskID.getInstance(vertexId, 1);

        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(taskZero, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(taskOne, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, target.getState());
        Assert.assertEquals(1L, this.dag.getSuccessfulVertices());
        Assert.assertEquals(1L, this.dag.numCompletedVertices);

        // Rescheduling a task is expected to move the vertex back to RUNNING.
        this.dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(TezTaskID.getInstance(vertexId, 0)));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, target.getState());
        Assert.assertEquals(0L, this.dag.getSuccessfulVertices());
        Assert.assertEquals(0L, this.dag.numCompletedVertices);

        this.dispatcher.getEventHandler().handle(
                new VertexEventTaskCompleted(TezTaskID.getInstance(vertexId, 0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, target.getState());
        Assert.assertEquals(1L, this.dag.getSuccessfulVertices());
        Assert.assertEquals(1L, this.dag.numCompletedVertices);
    }

    /**
     * Starts the DAG, terminates it with DAG_KILL, and expects the DAG and the vertices
     * with ids 0 through 5 to be KILLED.
     *
     * <p>NOTE(review): this method carries no {@code @Test} annotation, so an
     * annotation-driven runner will not execute it; confirm whether that is intentional.
     */
    public void testKillStartedDAG() {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        DAGEventTerminateDag killEvent = new DAGEventTerminateDag(this.dagId, DAGTerminationCause.DAG_KILL, (String) null);
        this.dispatcher.getEventHandler().handle(killEvent);
        this.dispatcher.await();

        Assert.assertEquals(DAGState.KILLED, this.dag.getState());
        for (int vertexIndex = 0; vertexIndex < 6; vertexIndex++) {
            TezVertexID vertexId = TezVertexID.getInstance(this.dagId, vertexIndex);
            Assert.assertEquals(VertexState.KILLED, this.dag.getVertex(vertexId).getState());
        }
    }

    /** Runs the shared terminate-while-running scenario with cause DAG_KILL. */
    @Test(timeout = 5000)
    public void testKillRunningDAG() {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testTerminateRunningDAG(cause);
    }

    /** Runs the shared terminate-while-running scenario with cause SERVICE_PLUGIN_ERROR. */
    @Test(timeout = 5000)
    public void testServiceErrorRunningDAG() {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testTerminateRunningDAG(cause);
    }

    /**
     * Shared scenario: terminate a DAG while one vertex is still running.
     *
     * <p>Task 0 of vertex 1 and task 0 of vertex 0 are reported as SUCCEEDED directly on
     * the vertices; the test then expects vertex 0 to be SUCCEEDED and vertex 1 to still
     * be RUNNING. After a {@link DAGEventTerminateDag} with the given cause, the DAG and
     * vertex 1 are expected to be TERMINATING, vertex 0 to stay SUCCEEDED, and vertices
     * 2 through 5 to be KILLED.
     *
     * @param dAGTerminationCause the cause carried by the terminate event
     */
    @SuppressWarnings("unchecked")
    private void _testTerminateRunningDAG(DAGTerminationCause dAGTerminationCause) {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        // The vertices are held as Vertex so getState() is available; the cast to
        // EventHandler is needed only to deliver events to them directly.
        TezVertexID runningVertexId = TezVertexID.getInstance(this.dagId, 1);
        org.apache.tez.dag.app.dag.Vertex runningVertex = this.dag.getVertex(runningVertexId);
        ((EventHandler<VertexEvent>) runningVertex).handle(
                new VertexEventTaskCompleted(TezTaskID.getInstance(runningVertexId, 0), TaskState.SUCCEEDED));

        TezVertexID succeededVertexId = TezVertexID.getInstance(this.dagId, 0);
        org.apache.tez.dag.app.dag.Vertex succeededVertex = this.dag.getVertex(succeededVertexId);
        ((EventHandler<VertexEvent>) succeededVertex).handle(
                new VertexEventTaskCompleted(TezTaskID.getInstance(succeededVertexId, 0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, succeededVertex.getState());
        Assert.assertEquals(VertexState.RUNNING, runningVertex.getState());

        this.dispatcher.getEventHandler().handle(new DAGEventTerminateDag(this.dagId, dAGTerminationCause, (String) null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.TERMINATING, this.dag.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, succeededVertex.getState());
        Assert.assertEquals(VertexState.TERMINATING, runningVertex.getState());
        for (int i = 2; i < 6; i++) {
            Assert.assertEquals(VertexState.KILLED, this.dag.getVertex(TezVertexID.getInstance(this.dagId, i)).getState());
        }
        Assert.assertEquals(1L, this.dag.getSuccessfulVertices());
    }

    /**
     * Sends a DAGEventStartDag (with a null list argument) without first calling
     * initDAG/startDAG in this test, and expects the DAG to end up in ERROR.
     */
    @Test(timeout = 5000)
    public void testInvalidEvent() {
        DAGEventStartDag startEvent = new DAGEventStartDag(this.dagId, (List) null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();
        Assert.assertEquals(DAGState.ERROR, this.dag.getState());
    }

    /**
     * Currently ignored. Reports vertex 0 as SUCCEEDED six times and expects it to be
     * counted once with the DAG still RUNNING; then reports vertices 1 through 5 as
     * SUCCEEDED and expects the DAG to be SUCCEEDED with six successful vertices.
     */
    @Test(timeout = 5000)
    @Ignore
    public void testVertexSuccessfulCompletionUpdates() {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        // Same completion event for vertex 0, delivered six times.
        for (int repeat = 0; repeat < 6; repeat++) {
            TezVertexID firstVertexId = TezVertexID.getInstance(this.dagId, 0);
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(firstVertexId, VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());
        Assert.assertEquals(1L, this.dag.getSuccessfulVertices());

        // Remaining vertices, in id order.
        for (int vertexIndex = 1; vertexIndex <= 5; vertexIndex++) {
            TezVertexID vertexId = TezVertexID.getInstance(this.dagId, vertexIndex);
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(vertexId, VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals(DAGState.SUCCEEDED, this.dag.getState());
        Assert.assertEquals(6L, this.dag.getSuccessfulVertices());
    }

    /**
     * Marks every vertex except the last as SUCCEEDED, then calls
     * {@code getDAGStatus} with a 2000 argument and checks that the call took less than
     * 2500 ms by wall clock and reported the DAG as RUNNING.
     */
    @Test(timeout = 10000)
    public void testGetDAGStatusWithWait() throws TezException {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        for (int vertexIndex = 0; vertexIndex < this.dag.getVertices().size() - 1; vertexIndex++) {
            TezVertexID vertexId = TezVertexID.getInstance(this.dagId, vertexIndex);
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(vertexId, VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());
        Assert.assertEquals(5L, this.dag.getSuccessfulVertices());

        long startMillis = System.currentTimeMillis();
        DAGStatusBuilder status = this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        boolean withinBounds = elapsedMillis >= 0 && elapsedMillis < 2500;
        Assert.assertTrue(withinBounds);
        Assert.assertEquals(DAGStatus.State.RUNNING, status.getState());
    }

    /** Runs the shared blocking-status scenario with target state SUCCEEDED. */
    @Test(timeout = 20000)
    public void testGetDAGStatusReturnOnDagSucceeded() throws InterruptedException, TezException {
        final DAGStatus.State target = DAGStatus.State.SUCCEEDED;
        runTestGetDAGStatusReturnOnDagFinished(target);
    }

    /** Runs the shared blocking-status scenario with target state FAILED. */
    @Test(timeout = 20000)
    public void testGetDAGStatusReturnOnDagFailed() throws InterruptedException, TezException {
        final DAGStatus.State target = DAGStatus.State.FAILED;
        runTestGetDAGStatusReturnOnDagFinished(target);
    }

    /** Runs the shared blocking-status scenario with target state KILLED. */
    @Test(timeout = 20000)
    public void testGetDAGStatusReturnOnDagKilled() throws InterruptedException, TezException {
        final DAGStatus.State target = DAGStatus.State.KILLED;
        runTestGetDAGStatusReturnOnDagFinished(target);
    }

    /** Runs the shared blocking-status scenario with target state ERROR. */
    @Test(timeout = 20000)
    public void testGetDAGStatusReturnOnDagError() throws InterruptedException, TezException {
        final DAGStatus.State target = DAGStatus.State.ERROR;
        runTestGetDAGStatusReturnOnDagFinished(target);
    }

    /**
     * Shared scenario: a background {@code DagStatusCheckRunnable} is started while the
     * DAG is still RUNNING, the DAG is then driven to the requested final state, and the
     * status recorded by the runnable is checked.
     *
     * <p>All vertices except the last are first marked SUCCEEDED. Once the runnable
     * reports that it has started, the test sleeps two seconds and then sends the event
     * matching {@code state}: vertex 5 SUCCEEDED, vertex 5 FAILED, a DAG_KILL terminate
     * event, or (for ERROR) a second DAGEventStartDag. After the runnable reports that
     * it has ended, its recorded status must equal {@code state} and the difference
     * between its end and start timestamps must be below 3500.
     *
     * @param state the final DAG state to drive towards; only SUCCEEDED, FAILED, KILLED
     *              and ERROR are supported
     * @throws UnsupportedOperationException for any other state
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State state) throws TezException, InterruptedException {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();
        // Complete each vertex except the last one (distinct ids, as in testGetDAGStatusWithWait).
        for (int i = 0; i < this.dag.getVertices().size() - 1; i++) {
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, i), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());
        Assert.assertEquals(5L, this.dag.getSuccessfulVertices());
        Assert.assertEquals(DAGStatus.State.RUNNING, this.dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 10000L).getState());

        ReentrantLock lock = new ReentrantLock();
        Condition startCondition = lock.newCondition();
        Condition endCondition = lock.newCondition();
        DagStatusCheckRunnable dagStatusCheckRunnable = new DagStatusCheckRunnable(lock, startCondition, endCondition);
        Thread thread = new Thread(dagStatusCheckRunnable);
        thread.start();

        // Wait for the checker to start. The lock is released in finally so it is not
        // held across the sleep and the event dispatch below.
        lock.lock();
        try {
            while (!dagStatusCheckRunnable.started.get()) {
                startCondition.await();
            }
        } finally {
            lock.unlock();
        }

        Thread.sleep(2000L);
        if (state == DAGStatus.State.SUCCEEDED) {
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 5), VertexState.SUCCEEDED));
        } else if (state == DAGStatus.State.FAILED) {
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 5), VertexState.FAILED));
        } else if (state == DAGStatus.State.KILLED) {
            this.dispatcher.getEventHandler().handle(new DAGEventTerminateDag(this.dagId, DAGTerminationCause.DAG_KILL, (String) null));
        } else if (state == DAGStatus.State.ERROR) {
            this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagId, new LinkedList()));
        } else {
            throw new UnsupportedOperationException("Unsupported state for test: " + state);
        }
        this.dispatcher.await();

        // Wait for the checker to finish; exactly one unlock per lock, whatever the
        // number of loop iterations (including spurious wake-ups).
        lock.lock();
        try {
            while (!dagStatusCheckRunnable.ended.get()) {
                endCondition.await();
            }
        } finally {
            lock.unlock();
        }

        long j = dagStatusCheckRunnable.dagStatusEndTime - dagStatusCheckRunnable.dagStatusStartTime;
        Assert.assertNotNull(dagStatusCheckRunnable.dagStatus);
        Assert.assertTrue("Status: " + dagStatusCheckRunnable.dagStatus.getState() + ", Diff:" + j, j >= 0 && j < 3500);
        Assert.assertEquals(state, dagStatusCheckRunnable.dagStatus.getState());
        thread.join();
    }

    /**
     * Reports vertices 0 and 1 as SUCCEEDED and vertex 2 as FAILED, then expects the DAG
     * to be FAILED with two successful vertices and vertices 3 through 5 KILLED.
     */
    @Test(timeout = 5000)
    public void testVertexFailureHandling() {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        TezVertexID firstVertexId = TezVertexID.getInstance(this.dagId, 0);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(firstVertexId, VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        TezVertexID secondVertexId = TezVertexID.getInstance(this.dagId, 1);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(secondVertexId, VertexState.SUCCEEDED));
        TezVertexID failingVertexId = TezVertexID.getInstance(this.dagId, 2);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(failingVertexId, VertexState.FAILED));
        this.dispatcher.await();

        Assert.assertEquals(DAGState.FAILED, this.dag.getState());
        Assert.assertEquals(2L, this.dag.getSuccessfulVertices());
        for (int vertexIndex = 3; vertexIndex < 6; vertexIndex++) {
            TezVertexID vertexId = TezVertexID.getInstance(this.dagId, vertexIndex);
            Assert.assertEquals(VertexState.KILLED, this.dag.getVertex(vertexId).getState());
        }
    }

    /** Runs the shared DAG-terminate scenario with cause DAG_KILL. */
    @Test(timeout = 5000)
    public void testDAGKill() {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGTerminate(cause);
    }

    /** Runs the shared DAG-terminate scenario with cause SERVICE_PLUGIN_ERROR. */
    @Test(timeout = 5000)
    public void testDAGServiceError() {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGTerminate(cause);
    }

    /**
     * Shared scenario: vertices 0 and 1 are reported SUCCEEDED, then the DAG is
     * terminated with the given cause. The DAG must reach the cause's finished state
     * with that termination cause, two successful vertices, four KILLED vertices, every
     * vertex reporting DAG_TERMINATED, and exactly one DAG finish event recorded.
     *
     * @param dAGTerminationCause the cause carried by the terminate event
     */
    private void _testDAGTerminate(DAGTerminationCause dAGTerminationCause) {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        TezVertexID firstVertexId = TezVertexID.getInstance(this.dagId, 0);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(firstVertexId, VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        TezVertexID secondVertexId = TezVertexID.getInstance(this.dagId, 1);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(secondVertexId, VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventTerminateDag(this.dagId, dAGTerminationCause, (String) null));
        this.dispatcher.await();

        Assert.assertEquals(dAGTerminationCause.getFinishedState(), this.dag.getState());
        Assert.assertEquals(dAGTerminationCause, this.dag.getTerminationCause());
        Assert.assertEquals(2L, this.dag.getSuccessfulVertices());

        // Count the vertices that ended up KILLED.
        int killedVertices = 0;
        for (Object candidate : this.dag.getVertices().values()) {
            if (((org.apache.tez.dag.app.dag.Vertex) candidate).getState() == VertexState.KILLED) {
                killedVertices++;
            }
        }
        Assert.assertEquals(4L, killedVertices);

        for (Object candidate : this.dag.getVertices().values()) {
            Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED,
                    ((org.apache.tez.dag.app.dag.Vertex) candidate).getTerminationCause());
        }
        Assert.assertEquals(1L, this.dagFinishEventHandler.dagFinishEvents);
    }

    /**
     * Checks that the DAG still reaches FAILED (internal state) when a commit-completed
     * event is handled while {@code logJobHistoryUnsuccesfulEvent} is stubbed to throw.
     *
     * <p>The DAG under test is a Mockito spy with a spied state machine and one mocked
     * entry pre-registered in {@code commitFutures}; after all six vertices are reported
     * SUCCEEDED the test expects the DAG to be in COMMITTING before the event is sent.
     */
    @Test(timeout = 5000)
    public void testDAGHang() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // Replace the fixture DAG with a spy so individual methods can be stubbed below.
        this.dag = (DAGImpl) Mockito.spy(new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext));
        // NOTE(review): this local is never read in the method.
        DAGImpl dAGImpl = this.dag;
        // Hand the spy a spied state machine built from the DAG's own factory.
        Mockito.when(this.dag.getStateMachine()).thenReturn((StateMachineTez) Mockito.spy(new StateMachineTez(DAGImpl.stateMachineFactory.make(this.dag), this.dag)));
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.dag);
        ((AppContext) Mockito.doReturn(this.dag).when(this.appContext)).getCurrentDAG();
        // Register a mocked commit future under a mocked key before the DAG is initialized.
        DAGImpl.OutputKey outputKey = (DAGImpl.OutputKey) Mockito.mock(DAGImpl.OutputKey.class);
        this.dag.commitFutures.put(outputKey, (ListenableFuture) Mockito.mock(ListenableFuture.class));
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();
        // Report all six vertices as SUCCEEDED.
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 0), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 1), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 2), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 3), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 4), VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 5), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.COMMITTING, this.dag.getState());
        // Commit-completed event for the mocked key, built with `false` and a RuntimeException.
        DAGEventCommitCompleted dAGEventCommitCompleted = new DAGEventCommitCompleted(this.dagId, outputKey, false, new RuntimeException("test"));
        // Make the history-logging call throw for any arguments before the event is handled.
        ((DAGImpl) Mockito.doThrow(new RuntimeException("test")).when(this.dag)).logJobHistoryUnsuccesfulEvent((DAGState) Matchers.any(DAGState.class), (TezCounters) Matchers.any(TezCounters.class));
        // The event is delivered directly to the DAG rather than through the dispatcher.
        this.dag.handle(dAGEventCommitCompleted);
        this.dispatcher.await();
        Assert.assertTrue("DAG did not terminate!", this.dag.getInternalState() == DAGState.FAILED);
    }

    /** Runs the vertex-success-after-terminate scenario with cause DAG_KILL. */
    @Test(timeout = 5000)
    public void testDAGKillVertexSuccessAfterTerminated() {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGKillVertexSuccessAfterTerminated(cause);
    }

    /** Runs the vertex-success-after-terminate scenario with cause SERVICE_PLUGIN_ERROR. */
    @Test(timeout = 5000)
    public void testDAGServiceErrorVertexSuccessAfterTerminated() {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGKillVertexSuccessAfterTerminated(cause);
    }

    /**
     * Shared scenario: vertices 0 and 1 succeed, the DAG is terminated with the given
     * cause, and only afterwards are vertices 2 through 5 reported SUCCEEDED. The late
     * events must not change the outcome: four vertices KILLED, two successful, the
     * given termination cause on the DAG, DAG_TERMINATED on every vertex, and exactly
     * one DAG finish event recorded.
     *
     * @param dAGTerminationCause the cause carried by the terminate event
     */
    private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause dAGTerminationCause) {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();

        TezVertexID firstVertexId = TezVertexID.getInstance(this.dagId, 0);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(firstVertexId, VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());

        TezVertexID secondVertexId = TezVertexID.getInstance(this.dagId, 1);
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(secondVertexId, VertexState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new DAGEventTerminateDag(this.dagId, dAGTerminationCause, (String) null));
        this.dispatcher.await();
        Assert.assertEquals(dAGTerminationCause.getFinishedState(), this.dag.getState());

        // Late success reports for the remaining vertices, sent after termination.
        for (int vertexIndex = 2; vertexIndex < 6; vertexIndex++) {
            TezVertexID lateVertexId = TezVertexID.getInstance(this.dagId, vertexIndex);
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(lateVertexId, VertexState.SUCCEEDED));
        }
        this.dispatcher.await();

        int killedVertices = 0;
        for (Object candidate : this.dag.getVertices().values()) {
            if (((org.apache.tez.dag.app.dag.Vertex) candidate).getState() == VertexState.KILLED) {
                killedVertices++;
            }
        }
        Assert.assertEquals(4L, killedVertices);
        Assert.assertEquals(dAGTerminationCause, this.dag.getTerminationCause());
        Assert.assertEquals(2L, this.dag.getSuccessfulVertices());

        for (Object candidate : this.dag.getVertices().values()) {
            Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED,
                    ((org.apache.tez.dag.app.dag.Vertex) candidate).getTerminationCause());
        }
        Assert.assertEquals(1L, this.dagFinishEventHandler.dagFinishEvents);
    }

    /** Runs the terminate-with-one-vertex-pending scenario with cause DAG_KILL. */
    @Test(timeout = 5000)
    public void testDAGKillPending() {
        final DAGTerminationCause cause = DAGTerminationCause.DAG_KILL;
        _testDAGKillPending(cause);
    }

    /** Runs the terminate-with-one-vertex-pending scenario with cause SERVICE_PLUGIN_ERROR. */
    @Test(timeout = 5000)
    public void testDAGServiceErrorPending() {
        final DAGTerminationCause cause = DAGTerminationCause.SERVICE_PLUGIN_ERROR;
        _testDAGKillPending(cause);
    }

    /**
     * Shared scenario: vertices 0 through 4 are reported SUCCEEDED, the DAG is then
     * terminated with the given cause, and finally vertex 5 is reported KILLED. The DAG
     * must be in the cause's finished state both before and after the last report, with
     * five successful vertices, DAG_TERMINATED on vertex 5, and exactly one DAG finish
     * event recorded.
     *
     * @param dAGTerminationCause the cause carried by the terminate event
     */
    private void _testDAGKillPending(DAGTerminationCause dAGTerminationCause) {
        initDAG(this.dag);
        startDAG(this.dag);
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 0), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, this.dag.getState());
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 1), VertexState.SUCCEEDED));
        for (int i = 2; i < 5; i++) {
            this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, i), VertexState.SUCCEEDED));
        }
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle(new DAGEventTerminateDag(this.dagId, dAGTerminationCause, (String) null));
        this.dispatcher.await();
        Assert.assertEquals(dAGTerminationCause.getFinishedState(), this.dag.getState());
        // Vertex 5 reports its own completion only after the terminate event.
        this.dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(TezVertexID.getInstance(this.dagId, 5), VertexState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals(dAGTerminationCause.getFinishedState(), this.dag.getState());
        Assert.assertEquals(5L, this.dag.getSuccessfulVertices());
        // Expected value first, so a failure message labels the two values correctly.
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, this.dag.getVertex(TezVertexID.getInstance(this.dagId, 5)).getTerminationCause());
        Assert.assertEquals(1L, this.dagFinishEventHandler.dagFinishEvents);
    }

    /**
     * After initializing the DAG, reads "tez.am.task.max.failed.attempts" (default 4)
     * from the DAG configuration and from the configurations of vertex1 and vertex2,
     * expecting 3 for the DAG, 2 for vertex1 and 3 for vertex2.
     */
    @Test(timeout = 5000)
    public void testConfiguration() throws AMUserCodeException {
        initDAG(this.dag);

        final String maxFailedAttemptsKey = "tez.am.task.max.failed.attempts";
        Assert.assertEquals(3L, this.dag.getConf().getInt(maxFailedAttemptsKey, 4));

        org.apache.tez.dag.app.dag.Vertex firstVertex = this.dag.getVertex("vertex1");
        org.apache.tez.dag.app.dag.Vertex secondVertex = this.dag.getVertex("vertex2");
        Assert.assertEquals(2L, firstVertex.getConf().getInt(maxFailedAttemptsKey, 4));
        Assert.assertEquals(3L, secondVertex.getConf().getInt(maxFailedAttemptsKey, 4));
    }

    /**
     * Gives each of the three MRR vertices 50 distinct counters in group "g" (150 in
     * total, against the "tez.counters.max" value of 100 set in this class's static
     * initializer), completes the vertices, and expects the DAG to end FAILED with a
     * diagnostics entry containing "Counters limit exceeded".
     */
    @Test(timeout = 5000)
    public void testCounterLimits() {
        initDAG(this.mrrDag);
        this.dispatcher.await();
        startDAG(this.mrrDag);
        this.dispatcher.await();
        for (int i = 0; i < 3; i++) {
            // Downcast needed: setCounters is called on VertexImpl, while getVertex(String)
            // is assigned to the Vertex interface elsewhere in this class.
            VertexImpl vertex = (VertexImpl) this.mrrDag.getVertex("vertex" + (i + 1));
            TezCounters tezCounters = new TezCounters();
            for (int i2 = 0; i2 < 50; i2++) {
                // Counter names include the vertex index so they are distinct across vertices.
                tezCounters.findCounter("g", "c" + i + "_" + i2).increment(1L);
            }
            vertex.setCounters(tezCounters);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(TezTaskID.getInstance(vertex.getVertexId(), 0), TaskState.SUCCEEDED));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(i + 1, this.mrrDag.getSuccessfulVertices());
        }
        Assert.assertEquals(3L, this.mrrDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.FAILED, this.mrrDag.getState());
        Assert.assertTrue("Diagnostics should contain counter limits error message", StringUtils.join(this.mrrDag.getDiagnostics(), ",").contains("Counters limit exceeded"));
    }

    // Class initialization: reset the counter Limits, then install a configuration
    // that sets both "tez.counters.max" and "tez.counters.max.groups" to 100.
    static {
        Limits.reset();
        final int counterLimit = 100;
        Configuration limitsConf = new Configuration(false);
        limitsConf.setInt("tez.counters.max", counterLimit);
        limitsConf.setInt("tez.counters.max.groups", counterLimit);
        Limits.setConfiguration(limitsConf);
    }
}
