/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.CallableEventDispatcher;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestCommit {
    // Logger used by the wait helpers and plan builders in this test class.
    private static final Log LOG = LogFactory.getLog(TestCommit.class);
    // Id of the DAG under test; assigned in setupDAG.
    private TezDAGID dagId;
    // Static, hence shared by every test in this class; tests set the flags they
    // need on it before calling setupDAG.
    private static Configuration conf = new Configuration();
    // Event dispatcher created, wired and started in setupDAG; drained and stopped in teardown.
    private DrainDispatcher dispatcher;
    // Empty credentials handed to the DAGImpl constructor in setupDAG.
    private Credentials fsTokens;
    // Mockito mock of the application context; stubbed in setupDAG.
    private AppContext appContext;
    // ACL manager returned by the mocked context's getAMACLManager().
    private ACLManager aclManager;
    // Application attempt id built in setupDAG.
    private ApplicationAttemptId appAttemptId;
    // The DAG under test; created in setupDAG.
    private DAGImpl dag;
    // Handlers registered with the dispatcher in setupDAG, one per event-type enum.
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    // NOTE(review): not assigned anywhere in the visible part of this file, yet passed
    // to the DAGImpl constructor in setupDAG -- presumably null there; confirm.
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    // NOTE(review): same as above -- no visible assignment before it is passed to DAGImpl.
    private TaskHeartbeatHandler thh;
    // Clock handed to the DAGImpl constructor.
    private Clock clock = new SystemClock();
    // Handler registered for DAGAppMasterEventType events in setupDAG.
    private DAGFinishEventHandler dagFinishEventHandler;
    // History handler returned by the mocked context; tests call its verify* methods.
    private MockHistoryEventHandler historyEventHandler;
    // Handler registered for TaskAttemptEventType events in setupDAG.
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    // Cached daemon thread pool created in setupDAG ...
    private ExecutorService rawExecutor;
    // ... and its listening decorator, returned by the mocked context's getExecService()
    // and shut down in teardown.
    private ListeningExecutorService execService;

    /**
     * Builds the fixture for one test: a mocked {@link AppContext}, a started
     * {@link DrainDispatcher} with one handler per event-type enum, and a new
     * {@link DAGImpl} created from the supplied plan.
     *
     * <p>Container reuse is switched off on the shared configuration first. The
     * statement order is kept deliberately: the mocked context must already be
     * stubbed with the values the DAG constructor and the history handler are
     * given, and the dispatcher is only initialised and started once every
     * handler has been registered.
     *
     * @param dagPlan plan from which the DAG under test is constructed
     */
    public void setupDAG(DAGProtos.DAGPlan dagPlan) {
        conf.setBoolean("tez.am.container.reuse.enabled", false);

        // Identity of the application attempt and of the DAG under test.
        ApplicationId applicationId = ApplicationId.newInstance(100L, 1);
        appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
        dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
        Assert.assertNotNull(dagId);

        dispatcher = new DrainDispatcher();
        fsTokens = new Credentials();

        // Mocked application context and the shared executor it exposes.
        appContext = Mockito.mock(AppContext.class);
        Mockito.when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
        rawExecutor = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("App Shared Pool - #%d")
                        .build());
        execService = MoreExecutors.listeningDecorator(rawExecutor);
        Mockito.doReturn(execService).when(appContext).getExecService();

        historyEventHandler = new MockHistoryEventHandler(appContext);
        aclManager = new ACLManager("amUser");
        Mockito.doReturn(conf).when(appContext).getAMConf();
        Mockito.doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
        Mockito.doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
        Mockito.doReturn(dagId).when(appContext).getCurrentDAGID();
        Mockito.doReturn(historyEventHandler).when(appContext).getHistoryHandler();
        Mockito.doReturn(aclManager).when(appContext).getAMACLManager();

        // The DAG itself, then the context stubs that depend on it.
        dag = new DAGImpl(
                dagId,
                conf,
                dagPlan,
                dispatcher.getEventHandler(),
                taskCommunicatorManagerInterface,
                fsTokens,
                clock,
                "user",
                thh,
                appContext);
        Mockito.doReturn(dag).when(appContext).getCurrentDAG();
        Mockito.doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
        ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192, 10));
        Mockito.doReturn(clusterInfo).when(appContext).getClusterInfo();

        // One handler per event-type enum.
        dispatcher.register(CallableEventType.class, (EventHandler)new CallableEventDispatcher());
        taskEventDispatcher = new TaskEventDispatcher();
        dispatcher.register(TaskEventType.class, (EventHandler)taskEventDispatcher);
        taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        dispatcher.register(TaskAttemptEventType.class, (EventHandler)taskAttemptEventDispatcher);
        vertexEventDispatcher = new VertexEventDispatcher();
        dispatcher.register(VertexEventType.class, (EventHandler)vertexEventDispatcher);
        dagEventDispatcher = new DagEventDispatcher();
        dispatcher.register(DAGEventType.class, (EventHandler)dagEventDispatcher);
        dagFinishEventHandler = new DAGFinishEventHandler();
        dispatcher.register(DAGAppMasterEventType.class, (EventHandler)dagFinishEventHandler);

        dispatcher.init(conf);
        dispatcher.start();
    }

    /**
     * Releases the per-test fixture created by {@code setupDAG}.
     *
     * <p>The dispatcher is drained and stopped first. The shared executor is shut
     * down in a {@code finally} block so that its threads are released even when
     * draining or stopping the dispatcher throws; otherwise a failing teardown
     * would leave the pool of one test alive while the following tests run.
     * Both fields are null-checked because a test may fail before
     * {@code setupDAG} has assigned them.
     */
    @After
    public void teardown() {
        try {
            if (this.dispatcher != null) {
                this.dispatcher.await();
                this.dispatcher.stop();
            }
        }
        finally {
            if (this.execService != null) {
                this.execService.shutdownNow();
            }
        }
    }

    /**
     * Polls the given DAG every 100 ms until it reports the requested state.
     *
     * <p>The loop has no deadline of its own; the visible callers are test
     * methods declared with a JUnit {@code timeout}. If the polling thread is
     * interrupted, the interrupt flag is restored and the wait is aborted with an
     * unchecked exception instead of silently continuing to poll, so an
     * interrupted test thread terminates rather than spinning on forever.
     *
     * @param dag   DAG whose state is polled
     * @param state state to wait for
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitUntil(DAGImpl dag, DAGState state) {
        while (dag.getState() != state) {
            LOG.info((Object)("Wait for dag go to state:" + state));
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread, then stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for dag to reach state " + state, e);
            }
        }
    }

    /**
     * Polls the given vertex every 100 ms until it reports the requested state.
     *
     * <p>The loop has no deadline of its own; the visible callers are test
     * methods declared with a JUnit {@code timeout}. If the polling thread is
     * interrupted, the interrupt flag is restored and the wait is aborted with an
     * unchecked exception instead of silently continuing to poll, so an
     * interrupted test thread terminates rather than spinning on forever.
     *
     * @param vertex vertex whose state is polled
     * @param state  state to wait for
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitUntil(VertexImpl vertex, VertexState state) {
        while (vertex.getState() != state) {
            LOG.info((Object)("Wait for vertex " + vertex.getLogIdentifier() + " go to state:" + state));
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread, then stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for vertex " + vertex.getLogIdentifier() + " to reach state " + state, e);
            }
        }
    }

    /**
     * Polls every 200 ms until the vertex's {@code commitFutures} map no longer
     * contains an entry for the given output name.
     *
     * <p>If the polling thread is interrupted, the interrupt flag is restored and
     * the wait is aborted with an unchecked exception instead of silently
     * continuing to poll, so an interrupted test thread terminates rather than
     * spinning on forever.
     *
     * @param vertex     vertex whose pending commits are inspected
     * @param outputName key looked up in {@code vertex.commitFutures}
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitForCommitCompleted(VertexImpl vertex, String outputName) {
        while (vertex.commitFutures.containsKey(outputName)) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread, then stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for vertex commit " + outputName + " to complete", e);
            }
            LOG.info((Object)("Wait for vertex commit " + outputName + " to complete"));
        }
    }

    /**
     * Polls every 200 ms until the DAG's {@code commitFutures} map no longer
     * contains an entry for the given output key.
     *
     * <p>If the polling thread is interrupted, the interrupt flag is restored and
     * the wait is aborted with an unchecked exception instead of silently
     * continuing to poll, so an interrupted test thread terminates rather than
     * spinning on forever.
     *
     * @param vertex    the DAG whose pending commits are inspected (despite its
     *                  name, this parameter is a {@link DAGImpl}, not a vertex)
     * @param outputKey key looked up in {@code commitFutures}
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitForCommitCompleted(DAGImpl vertex, DAGImpl.OutputKey outputKey) {
        while (vertex.commitFutures.containsKey(outputKey)) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread, then stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for dag commit " + outputKey + " to complete", e);
            }
            LOG.info((Object)("Wait for dag commit " + outputKey + " to complete"));
        }
    }

    /**
     * Builds a three-vertex plan: {@code vertex1} and {@code vertex2} form the
     * vertex group {@code uv12}, which feeds {@code vertex3} over a
     * scatter-gather group input edge. The group carries the data sink
     * {@code v12Out}; {@code vertex3} carries the data sink {@code v3Out}.
     *
     * <p>Each sink gets a {@code CountingOutputCommitter} whose config is built
     * from the negated success flag and a constant {@code true} (the meaning of
     * the two config arguments is defined by {@code CountingOutputCommitterConfig},
     * which is not shown here).
     *
     * @param vertexGroupCommitSucceeded success flag for the {@code v12Out} committer
     * @param v3CommitSucceeded          success flag for the {@code v3Out} committer
     * @param dagName                    suffix appended to {@code "DAG-"} to name the DAG
     * @return the plan produced by {@code DAG.createDag}
     */
    private DAGProtos.DAGPlan createDAGPlan(boolean vertexGroupCommitSucceeded, boolean v3CommitSucceeded, String dagName) throws Exception {
        LOG.info("Setting up group dag plan");

        // Three single-task vertices sharing the same minimal resource.
        int taskCount = 1;
        Resource taskResource = Resource.newInstance(1, 1);
        org.apache.tez.dag.api.Vertex first = org.apache.tez.dag.api.Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex second = org.apache.tez.dag.api.Vertex.create("vertex2", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex third = org.apache.tez.dag.api.Vertex.create("vertex3", ProcessorDescriptor.create("Processor"), taskCount, taskResource);

        DAG plan = DAG.create("DAG-" + dagName);
        String groupName = "uv12";

        // One committer descriptor per data sink.
        String committerClass = CountingOutputCommitter.class.getName();
        byte[] groupPayload = new CountingOutputCommitter.CountingOutputCommitterConfig(!vertexGroupCommitSucceeded, true).toUserPayload();
        OutputCommitterDescriptor groupCommitter = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(groupPayload)));
        byte[] v3Payload = new CountingOutputCommitter.CountingOutputCommitterConfig(!v3CommitSucceeded, true).toUserPayload();
        OutputCommitterDescriptor v3Committer = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(v3Payload)));

        VertexGroup group = plan.createVertexGroup(groupName, new org.apache.tez.dag.api.Vertex[]{first, second});
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        group.addDataSink("v12Out", DataSinkDescriptor.create(sinkOutput, groupCommitter, null));
        third.addDataSink("v3Out", DataSinkDescriptor.create(sinkOutput, v3Committer, null));

        // Group -> vertex3 edge.
        EdgeProperty edgeProperty = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("dummy output class"),
                InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge = GroupInputEdge.create(group, third, edgeProperty, InputDescriptor.create("merge.class"));

        plan.addVertex(first);
        plan.addVertex(second);
        plan.addVertex(third);
        plan.addEdge(groupEdge);
        return plan.createDag(conf, null, null, null, true);
    }

    /**
     * Builds the same three-vertex topology as {@code createDAGPlan}
     * ({@code vertex1} + {@code vertex2} grouped as {@code uv12}, feeding
     * {@code vertex3}), but the vertex group carries two data sinks,
     * {@code v12Out1} and {@code v12Out2}, in addition to {@code v3Out} on
     * {@code vertex3}.
     *
     * <p>Each sink gets a {@code CountingOutputCommitter} whose config is built
     * from the negated success flag and a constant {@code true} (the meaning of
     * the two config arguments is defined by {@code CountingOutputCommitterConfig},
     * which is not shown here).
     *
     * @param vertexGroupCommitSucceeded1 success flag for the {@code v12Out1} committer
     * @param vertexGroupCommitSucceeded2 success flag for the {@code v12Out2} committer
     * @param v3CommitSucceeded           success flag for the {@code v3Out} committer
     * @param dagName                     suffix appended to {@code "DAG-"} to name the DAG
     * @return the plan produced by {@code DAG.createDag}
     */
    private DAGProtos.DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean vertexGroupCommitSucceeded1, boolean vertexGroupCommitSucceeded2, boolean v3CommitSucceeded, String dagName) throws Exception {
        LOG.info("Setting up group dag plan");

        // Three single-task vertices sharing the same minimal resource.
        int taskCount = 1;
        Resource taskResource = Resource.newInstance(1, 1);
        org.apache.tez.dag.api.Vertex first = org.apache.tez.dag.api.Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex second = org.apache.tez.dag.api.Vertex.create("vertex2", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex third = org.apache.tez.dag.api.Vertex.create("vertex3", ProcessorDescriptor.create("Processor"), taskCount, taskResource);

        DAG plan = DAG.create("DAG-" + dagName);
        String groupName = "uv12";

        // One committer descriptor per data sink.
        String committerClass = CountingOutputCommitter.class.getName();
        byte[] groupPayload1 = new CountingOutputCommitter.CountingOutputCommitterConfig(!vertexGroupCommitSucceeded1, true).toUserPayload();
        OutputCommitterDescriptor groupCommitter1 = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(groupPayload1)));
        byte[] groupPayload2 = new CountingOutputCommitter.CountingOutputCommitterConfig(!vertexGroupCommitSucceeded2, true).toUserPayload();
        OutputCommitterDescriptor groupCommitter2 = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(groupPayload2)));
        byte[] v3Payload = new CountingOutputCommitter.CountingOutputCommitterConfig(!v3CommitSucceeded, true).toUserPayload();
        OutputCommitterDescriptor v3Committer = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(v3Payload)));

        VertexGroup group = plan.createVertexGroup(groupName, new org.apache.tez.dag.api.Vertex[]{first, second});
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        group.addDataSink("v12Out1", DataSinkDescriptor.create(sinkOutput, groupCommitter1, null));
        group.addDataSink("v12Out2", DataSinkDescriptor.create(sinkOutput, groupCommitter2, null));
        third.addDataSink("v3Out", DataSinkDescriptor.create(sinkOutput, v3Committer, null));

        // Group -> vertex3 edge.
        EdgeProperty edgeProperty = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("dummy output class"),
                InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge = GroupInputEdge.create(group, third, edgeProperty, InputDescriptor.create("merge.class"));

        plan.addVertex(first);
        plan.addVertex(second);
        plan.addVertex(third);
        plan.addEdge(groupEdge);
        return plan.createDag(conf, null, null, null, true);
    }

    /**
     * Convenience overload of the four-argument
     * {@code createDAGPlan_SingleVertexWith2Committer} that never installs the
     * custom vertex manager plugin.
     *
     * @param commit1Succeed success flag for the {@code v1Out_1} committer
     * @param commit2Succeed success flag for the {@code v1Out_2} committer
     * @param dagName        suffix appended to {@code "DAG-"} to name the DAG
     * @return the plan built by the four-argument overload
     */
    private DAGProtos.DAGPlan createDAGPlan_SingleVertexWith2Committer(boolean commit1Succeed, boolean commit2Succeed, String dagName) throws IOException {
        final boolean useCustomVertexManager = false;
        return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, useCustomVertexManager, dagName);
    }

    /**
     * Builds a plan with the single vertex {@code vertex1} carrying two data
     * sinks, {@code v1Out_1} and {@code v1Out_2}, each with its own
     * {@code CountingOutputCommitter}.
     *
     * <p>Each committer config is built from the negated success flag and a
     * constant {@code true} (the meaning of the two config arguments is defined
     * by {@code CountingOutputCommitterConfig}, which is not shown here).
     *
     * @param commit1Succeed success flag for the {@code v1Out_1} committer
     * @param commit2Succeed success flag for the {@code v1Out_2} committer
     * @param customVM       when {@code true}, {@code FailOnVMEventReceivedlVertexManager}
     *                       is set as the vertex manager plugin of {@code vertex1}
     * @param dagName        suffix appended to {@code "DAG-"} to name the DAG
     * @return the plan produced by {@code DAG.createDag}
     */
    private DAGProtos.DAGPlan createDAGPlan_SingleVertexWith2Committer(boolean commit1Succeed, boolean commit2Succeed, boolean customVM, String dagName) throws IOException {
        LOG.info("Setting up group dag plan");

        int taskCount = 1;
        Resource taskResource = Resource.newInstance(1, 1);
        org.apache.tez.dag.api.Vertex vertex = org.apache.tez.dag.api.Vertex.create("vertex1", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        if (customVM) {
            String pluginClass = FailOnVMEventReceivedlVertexManager.class.getName();
            vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(pluginClass));
        }

        // One committer descriptor per data sink.
        String committerClass = CountingOutputCommitter.class.getName();
        byte[] firstPayload = new CountingOutputCommitter.CountingOutputCommitterConfig(!commit1Succeed, true).toUserPayload();
        OutputCommitterDescriptor firstCommitter = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(firstPayload)));
        byte[] secondPayload = new CountingOutputCommitter.CountingOutputCommitterConfig(!commit2Succeed, true).toUserPayload();
        OutputCommitterDescriptor secondCommitter = (OutputCommitterDescriptor)OutputCommitterDescriptor.create(committerClass)
                .setUserPayload(UserPayload.create(ByteBuffer.wrap(secondPayload)));

        DAG plan = DAG.create("DAG-" + dagName);
        plan.addVertex(vertex);

        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        vertex.addDataSink("v1Out_1", DataSinkDescriptor.create(sinkOutput, firstCommitter, null));
        vertex.addDataSink("v1Out_2", DataSinkDescriptor.create(sinkOutput, secondCommitter, null));
        return plan.createDag(conf, null, null, null, true);
    }

    /**
     * Hands a {@code DAG_INIT} event directly to the given DAG and asserts that
     * it is in state {@code INITED} when {@code handle} returns.
     *
     * @param dag DAG to initialise
     */
    private void initDAG(DAGImpl dag) {
        DAGEvent initEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
        dag.handle(initEvent);
        Assert.assertEquals(DAGState.INITED, dag.getState());
    }

    /**
     * Sends a start event for the given DAG through the dispatcher, waits for
     * the dispatcher to drain, and asserts that the DAG is then {@code RUNNING}.
     *
     * @param impl DAG to start
     */
    private void startDAG(DAGImpl impl) {
        DAGEventStartDag startEvent = new DAGEventStartDag(impl.getID(), null);
        dispatcher.getEventHandler().handle((Event)startEvent);
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, impl.getState());
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} set to {@code true},
     * completing the only task must take {@code vertex1} straight to
     * {@code SUCCEEDED}: no commit futures are pending and neither committer has
     * been asked to commit or abort.
     */
    @Test(timeout=5000L)
    public void testVertexCommit_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(true, true, "testVertexCommit_OnDAGSuccess");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));

        // The vertex finishes without entering a commit phase.
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertNull(vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());

        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        // Both committers were initialised and set up, nothing more.
        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(0L, (long)firstCommitter.commitCounter);
        Assert.assertEquals(0L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(0L, (long)secondCommitter.commitCounter);
        Assert.assertEquals(0L, (long)secondCommitter.abortCounter);
    }

    /**
     * With {@code tez.am.commit-all-outputs-on-dag-success} set to {@code false},
     * {@code vertex1} must stay in {@code COMMITTING} until both committers have
     * been unblocked, and only then reach {@code SUCCEEDED} with exactly one
     * commit and no abort per committer.
     */
    @Test(timeout=5000L)
    public void testVertexCommit_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(true, true, "testVertexCommit_OnVertexSuccess");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Finishing the first commit alone must not complete the vertex.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        firstCommitter.unblockCommit();
        waitForCommitCompleted(vertex, "v1Out_1");
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Finishing the second commit does.
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        secondCommitter.unblockCommit();
        waitUntil(vertex, VertexState.SUCCEEDED);
        Assert.assertNull(vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(1L, (long)firstCommitter.commitCounter);
        Assert.assertEquals(0L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(1L, (long)secondCommitter.commitCounter);
        Assert.assertEquals(0L, (long)secondCommitter.abortCounter);
    }

    /**
     * Plan built with {@code commit1Succeed = false}: once the first committer is
     * unblocked, {@code vertex1} must end up {@code FAILED} with cause
     * {@code COMMIT_FAILURE}, and both committers must have been aborted once.
     * The second committer's commit counter is deliberately not asserted.
     */
    @Test(timeout=5000L)
    public void testVertexCommitFail1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(false, true, "testVertexCommitFail1_OnVertexSuccess");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Let the first commit run; the vertex is expected to fail as a result.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        firstCommitter.unblockCommit();
        waitUntil(vertex, VertexState.FAILED);
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(1L, (long)firstCommitter.commitCounter);
        Assert.assertEquals(1L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(1L, (long)secondCommitter.abortCounter);
    }

    /**
     * Plan built with {@code commit2Succeed = false}: the first commit completes
     * while the vertex stays {@code COMMITTING}; once the second committer is
     * unblocked, {@code vertex1} must end up {@code FAILED} with cause
     * {@code COMMIT_FAILURE}, and both committers must show one commit and one
     * abort.
     */
    @Test(timeout=5000L)
    public void testVertexCommitFail2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(true, false, "testVertexCommitFail2_OnVertexSuccess");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // First commit finishes; the vertex is still committing.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        firstCommitter.unblockCommit();
        waitForCommitCompleted(vertex, "v1Out_1");
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Second commit runs; the vertex is expected to fail as a result.
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        secondCommitter.unblockCommit();
        waitUntil(vertex, VertexState.FAILED);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());

        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(1L, (long)firstCommitter.commitCounter);
        Assert.assertEquals(1L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(1L, (long)secondCommitter.commitCounter);
        Assert.assertEquals(1L, (long)secondCommitter.abortCounter);
    }

    /**
     * Terminating the DAG with {@code DAG_KILL} while {@code vertex1} is
     * {@code COMMITTING} must leave the vertex {@code KILLED} (cause
     * {@code DAG_TERMINATED}) and the DAG {@code KILLED} (cause {@code DAG_KILL}),
     * with no pending commit futures and one abort per committer.
     */
    @Test(timeout=5000L)
    public void testVertexKilledWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(true, true, "testVertexKilledWhileCommitting");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Kill the DAG while the commits are still pending.
        dag.handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, null));
        dispatcher.await();

        Assert.assertEquals(VertexState.KILLED, vertex.getState());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, vertex.getTerminationCause());
        Assert.assertEquals(DAGState.KILLED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(1L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(1L, (long)secondCommitter.abortCounter);
    }

    /**
     * A task-reschedule event arriving while {@code vertex1} is
     * {@code COMMITTING} must fail the vertex with cause
     * {@code VERTEX_RERUN_IN_COMMITTING} and the DAG with cause
     * {@code VERTEX_FAILURE}, with no pending commit futures and one abort per
     * committer.
     */
    @Test(timeout=5000L)
    public void testVertexRescheduleWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        DAGProtos.DAGPlan plan = createDAGPlan_SingleVertexWith2Committer(true, true, "testVertexRescheduleWhileCommitting");
        setupDAG(plan);
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex = (VertexImpl)dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Report a rescheduled task while the commits are still pending.
        TezTaskID rescheduledTask = TezTaskID.getInstance(vertex.getVertexId(), 2);
        vertex.handle(new VertexEventTaskReschedule(rescheduledTask));
        dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, vertex.getState());
        Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause());
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        CountingOutputCommitter firstCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter)vertex.getOutputCommitter("v1Out_2");
        Assert.assertEquals(1L, (long)firstCommitter.initCounter);
        Assert.assertEquals(1L, (long)firstCommitter.setupCounter);
        Assert.assertEquals(1L, (long)firstCommitter.abortCounter);
        Assert.assertEquals(1L, (long)secondCommitter.initCounter);
        Assert.assertEquals(1L, (long)secondCommitter.setupCounter);
        Assert.assertEquals(1L, (long)secondCommitter.abortCounter);
    }

    /**
     * Checks the outcome of routing a {@code VertexManagerEvent} to a vertex while it is
     * in the COMMITTING state.
     *
     * <p>The test runs with {@code tez.am.commit-all-outputs-on-dag-success} set to
     * {@code false}. After the event is routed it waits for the DAG to reach FAILED and
     * expects the vertex to be FAILED with cause AM_USERCODE_FAILURE, the DAG termination
     * cause to be VERTEX_FAILURE, and both output committers to have been initialised,
     * set up and aborted exactly once.
     *
     * <p>NOTE(review): the third {@code true} passed to
     * {@code createDAGPlan_SingleVertexWith2Committer} presumably installs a vertex
     * manager that fails on the routed event - confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testVertexRouteEventErrorWhileCommitting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true, "testVertexRouteEventErrorWhileCommitting"));
        initDAG(dag);
        startDAG(dag);

        // Finish the vertex's task; the vertex is expected to start committing.
        VertexImpl vertex = (VertexImpl) dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Build a vertex-manager event that claims to originate from attempt 0 of task 0.
        VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0]));
        TezTaskID sourceTask = TezTaskID.getInstance(vertex.getVertexId(), 0);
        TezTaskAttemptID sourceAttempt = TezTaskAttemptID.getInstance(sourceTask, 0);
        EventMetaData sourceInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", null, sourceAttempt);
        TezEvent routedEvent = new TezEvent(vmEvent, sourceInfo);

        // Route it to the committing vertex and wait for the DAG to fail.
        vertex.handle(new VertexEventRouteEvent(vertex.getVertexId(), Collections.singletonList(routedEvent)));
        waitUntil(dag, DAGState.FAILED);

        // Vertex-level and DAG-level outcome.
        Assert.assertEquals(VertexState.FAILED, vertex.getState());
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertex.getTerminationCause());
        Assert.assertTrue(vertex.commitFutures.isEmpty());
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause());

        // Recorded history events.
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        // Both committers: one init, one setup, one abort.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_2");
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {firstCommitter, secondCommitter}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.abortCounter);
        }
    }

    /**
     * Checks the outcome of a {@code V_INTERNAL_ERROR} event that reaches a vertex while
     * it is in the COMMITTING state.
     *
     * <p>The test runs with {@code tez.am.commit-all-outputs-on-dag-success} set to
     * {@code false}. It expects both the vertex and the DAG to end in ERROR with cause
     * INTERNAL_ERROR, and both output committers to have been initialised and set up
     * exactly once. Abort counts are not checked here.
     */
    @Test(timeout=5000L)
    public void testVertexInternalErrorWhileCommiting() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, "testVertexInternalErrorWhileCommiting"));
        initDAG(dag);
        startDAG(dag);

        // Finish the vertex's task; the vertex is expected to start committing.
        VertexImpl vertex = (VertexImpl) dag.getVertex("vertex1");
        vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

        // Inject an internal error while the commit is still in flight.
        VertexEvent internalError = new VertexEvent(vertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR);
        vertex.handle(internalError);
        dispatcher.await();

        // Vertex-level and DAG-level outcome.
        Assert.assertEquals(VertexState.ERROR, vertex.getState());
        Assert.assertEquals(VertexTerminationCause.INTERNAL_ERROR, vertex.getTerminationCause());
        Assert.assertEquals(DAGState.ERROR, dag.getState());
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());

        // Recorded history events.
        historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

        // Both committers: one init and one setup.
        CountingOutputCommitter firstCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_1");
        CountingOutputCommitter secondCommitter = (CountingOutputCommitter) vertex.getOutputCommitter("v1Out_2");
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {firstCommitter, secondCommitter}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
        }
    }

    /**
     * Checks a fully successful commit when all outputs are committed at DAG success
     * ({@code tez.am.commit-all-outputs-on-dag-success} set to {@code true}).
     *
     * <p>After all three vertices report their task as succeeded, the DAG is expected to
     * enter COMMITTING. The "v12Out" committer is unblocked first; the test then waits
     * for that commit to complete and checks that the DAG is still COMMITTING before
     * the "v3Out" committer is unblocked. Finally the DAG must reach SUCCEEDED with no
     * termination cause, no pending commit futures, and each committer initialised,
     * set up and committed once and never aborted.
     *
     * <p>NOTE(review): the two {@code true} flags passed to {@code createDAGPlan}
     * presumably make both commits succeed - confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testDAGCommitSucceeded_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
        VertexImpl[] vertices = {v1, v2, v3};

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : vertices) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        waitUntil(dag, DAGState.COMMITTING);

        // Release the vertex-group commit first.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1.getOutputCommitter("v12Out");
        v12OutputCommitter.unblockCommit();
        // Waiting for COMMITTING again would not wait for this commit to finish. Wait for
        // it explicitly and then check the DAG is still COMMITTING because "v3Out" is
        // still blocked; this matches the sibling *_OnDAGSuccess failure test.
        waitForCommitCompleted(dag, new DAGImpl.OutputKey("v12Out", "uv12", true));
        Assert.assertEquals(DAGState.COMMITTING, dag.getState());

        // Release the remaining commit; the DAG can now finish.
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3.getOutputCommitter("v3Out");
        v3OutputCommitter.unblockCommit();
        waitUntil(dag, DAGState.SUCCEEDED);
        Assert.assertTrue(dag.commitFutures.isEmpty());
        Assert.assertNull(dag.getTerminationCause());

        // No vertex-group commit events are expected under any of these names.
        for (String groupName : new String[] {"uv12", "v1", "v3"}) {
            historyEventHandler.verifyVertexGroupCommitStartedEvent(groupName, 0);
            historyEventHandler.verifyVertexGroupCommitFinishedEvent(groupName, 0);
        }
        // No per-vertex commit events; every vertex finishes exactly once.
        for (VertexImpl vertex : vertices) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Each committer: one init, one setup, one commit, no abort.
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {v12OutputCommitter, v3OutputCommitter}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.commitCounter);
            Assert.assertEquals(0L, committer.abortCounter);
        }
    }

    /**
     * Checks a DAG-level commit failure that happens after the "v12Out" commit has already
     * completed, with {@code tez.am.commit-all-outputs-on-dag-success} set to {@code true}.
     *
     * <p>The test unblocks "v12Out", waits for that commit to complete, checks the DAG is
     * still COMMITTING, then unblocks "v3Out" and waits for the DAG to reach FAILED.
     * All vertices must remain SUCCEEDED, the DAG termination cause must be COMMIT_FAILURE,
     * and both committers must show one init, setup, commit and abort each.
     *
     * <p>NOTE(review): the {@code (true, false)} flags passed to {@code createDAGPlan}
     * presumably make the vertex-group commit succeed and the vertex3 commit fail -
     * confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail1_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, false, "testDAGCommitFail1_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
        VertexImpl[] allVertices = {vertex1, vertex2, vertex3};

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : allVertices) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        waitUntil(dag, DAGState.COMMITTING);

        // First commit goes through; the DAG keeps committing.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        waitForCommitCompleted(dag, new DAGImpl.OutputKey("v12Out", "uv12", true));
        Assert.assertEquals(DAGState.COMMITTING, dag.getState());

        // Second commit is released and the DAG is expected to fail.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        vertex3Committer.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        for (VertexImpl vertex : allVertices) {
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        }
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // No vertex-group commit events are expected under either name.
        for (String groupName : new String[] {"uv12", "v1"}) {
            historyEventHandler.verifyVertexGroupCommitStartedEvent(groupName, 0);
            historyEventHandler.verifyVertexGroupCommitFinishedEvent(groupName, 0);
        }
        // No per-vertex commit events; every vertex finishes exactly once.
        for (VertexImpl vertex : allVertices) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Each committer: one init, one setup, one commit, one abort.
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {groupCommitter, vertex3Committer}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.commitCounter);
            Assert.assertEquals(1L, committer.abortCounter);
        }
    }

    /**
     * Checks a DAG-level commit failure triggered as soon as the "v12Out" commit is
     * released, with {@code tez.am.commit-all-outputs-on-dag-success} set to {@code true}.
     *
     * <p>Only "v12Out" is unblocked; the test then waits for the DAG to reach FAILED.
     * All vertices must remain SUCCEEDED, the DAG termination cause must be COMMIT_FAILURE,
     * "v12Out" must show one init, setup, commit and abort, and "v3Out" one init, setup
     * and abort. The commit counter of "v3Out" is not checked.
     *
     * <p>NOTE(review): the {@code (false, true)} flags passed to {@code createDAGPlan}
     * presumably make the vertex-group commit fail - confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail2_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(false, true, "testDAGCommitFail2_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
        VertexImpl[] allVertices = {vertex1, vertex2, vertex3};

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : allVertices) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        waitUntil(dag, DAGState.COMMITTING);

        // Release only the vertex-group commit; "v3Out" stays blocked.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        groupCommitter.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        // NOTE(review): these two checks are repeated verbatim in the loop further down;
        // the repetition looks redundant but is kept as in the original.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);

        for (VertexImpl vertex : allVertices) {
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        }
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // No vertex-group commit events are expected under either name.
        for (String groupName : new String[] {"uv12", "v1"}) {
            historyEventHandler.verifyVertexGroupCommitStartedEvent(groupName, 0);
            historyEventHandler.verifyVertexGroupCommitFinishedEvent(groupName, 0);
        }
        // No per-vertex commit events; every vertex finishes exactly once.
        for (VertexImpl vertex : allVertices) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // "v12Out": one init, setup, commit and abort.
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.commitCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        // "v3Out": one init, setup and abort.
        Assert.assertEquals(1L, vertex3Committer.initCounter);
        Assert.assertEquals(1L, vertex3Committer.setupCounter);
        Assert.assertEquals(1L, vertex3Committer.abortCounter);
    }

    /**
     * Checks a fully successful commit with
     * {@code tez.am.commit-all-outputs-on-dag-success} set to {@code false}, where the
     * vertex3 commit is released before the "v12Out" commit.
     *
     * <p>After the tasks complete, vertex1 and vertex2 are expected to be SUCCEEDED,
     * vertex3 COMMITTING and the DAG RUNNING. Unblocking "v3Out" must bring vertex3 to
     * SUCCEEDED and the DAG to COMMITTING; unblocking "v12Out" must then bring the DAG to
     * SUCCEEDED. Each committer must show one init, setup and commit and no abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitSucceeded1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded1_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2, vertex3}) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex3 commit first.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        vertex3Committer.unblockCommit();
        waitUntil(vertex3, VertexState.SUCCEEDED);
        waitUntil(dag, DAGState.COMMITTING);

        // Then release the vertex-group commit; the DAG can now finish.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        waitUntil(dag, DAGState.SUCCEEDED);
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Vertex-group commit events: one start/finish pair for "uv12", none for "v1".
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);

        // Per-vertex events: only vertex3 records a commit start; every vertex finishes once.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2}) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);

        // No DAG-level commit start event; the DAG finishes once.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Each committer: one init, one setup, one commit, no abort.
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {groupCommitter, vertex3Committer}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.commitCounter);
            Assert.assertEquals(0L, committer.abortCounter);
        }
    }

    /**
     * Checks a fully successful commit with
     * {@code tez.am.commit-all-outputs-on-dag-success} set to {@code false}, where the
     * "v12Out" commit is released before the vertex3 commit.
     *
     * <p>After "v12Out" is unblocked the test sleeps, drains the dispatcher and expects
     * the DAG to still be RUNNING. Unblocking "v3Out" must then bring the DAG to
     * SUCCEEDED. Each committer must show one init, setup and commit and no abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitSucceeded2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded2_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2, vertex3}) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex-group commit first and give it time to be processed.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        Thread.sleep(500L);
        dispatcher.await();
        // vertex3's commit has not been released yet, so the DAG must still be running.
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex3 commit; the DAG can now finish.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        vertex3Committer.unblockCommit();
        waitUntil(dag, DAGState.SUCCEEDED);
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Vertex-group commit events: one start/finish pair for "uv12", none for "v1".
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);

        // Per-vertex events: only vertex3 records a commit start; every vertex finishes once.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2}) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);

        // No DAG-level commit start event; the DAG finishes once.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Each committer: one init, one setup, one commit, no abort.
        for (CountingOutputCommitter committer : new CountingOutputCommitter[] {groupCommitter, vertex3Committer}) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.commitCounter);
            Assert.assertEquals(0L, committer.abortCounter);
        }
    }

    /**
     * Checks a fully successful commit with
     * {@code tez.am.commit-all-outputs-on-dag-success} set to {@code false}, using a plan
     * built by {@code createDAGPlanWith2VertexGroupOutputs} that exposes two vertex-group
     * committers, "v12Out1" and "v12Out2".
     *
     * <p>Both group committers are unblocked first, with the DAG expected to remain
     * RUNNING; unblocking "v3Out" must then bring the DAG to SUCCEEDED. All three
     * committers must show one init, setup and commit and no abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true, "testDAGCommitSucceeded3_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2, vertex3}) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release both vertex-group commits; vertex3's commit is still blocked.
        CountingOutputCommitter firstGroupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out1");
        firstGroupCommitter.unblockCommit();
        CountingOutputCommitter secondGroupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out2");
        secondGroupCommitter.unblockCommit();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex3 commit; the DAG can now finish.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        vertex3Committer.unblockCommit();
        waitUntil(dag, DAGState.SUCCEEDED);
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Vertex-group commit events: one start/finish pair for "uv12", none for "v1".
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);

        // Per-vertex events: only vertex3 records a commit start; every vertex finishes once.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2}) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);

        // No DAG-level commit start event; the DAG finishes once.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Every committer: one init, one setup, one commit, no abort.
        CountingOutputCommitter[] committers = {firstGroupCommitter, secondGroupCommitter, vertex3Committer};
        for (CountingOutputCommitter committer : committers) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(1L, committer.commitCounter);
            Assert.assertEquals(0L, committer.abortCounter);
        }
    }

    /**
     * Checks a DAG failure triggered by releasing the "v12Out" commit while vertex3 is
     * still COMMITTING, with {@code tez.am.commit-all-outputs-on-dag-success} set to
     * {@code false}.
     *
     * <p>After "v12Out" is unblocked the test waits for the DAG to reach FAILED. vertex3
     * must then be KILLED with cause OTHER_VERTEX_FAILURE, the DAG termination cause must
     * be COMMIT_FAILURE, "v12Out" must show one init, setup, commit and abort, and
     * "v3Out" one init, setup and abort.
     *
     * <p>NOTE(review): the {@code (false, true)} flags passed to {@code createDAGPlan}
     * presumably make the vertex-group commit fail - confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail1_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(false, true, "testDAGCommitFail1_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2, vertex3}) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex-group commit and wait for the DAG to fail.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        // vertex3 was still committing and is expected to be killed as collateral.
        Assert.assertEquals(VertexState.KILLED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // The group commit was started but never recorded as finished.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);

        // Per-vertex events: only vertex3 records a commit start; every vertex finishes once.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2}) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);

        // No DAG-level commit start event; the DAG finishes once.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // "v12Out": one init, setup, commit and abort.
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.commitCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        // "v3Out": one init, setup and abort.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, vertex3Committer.initCounter);
        Assert.assertEquals(1L, vertex3Committer.setupCounter);
        Assert.assertEquals(1L, vertex3Committer.abortCounter);
    }

    /**
     * Checks a DAG failure triggered by releasing the vertex3 commit while the "v12Out"
     * commit is still blocked, with {@code tez.am.commit-all-outputs-on-dag-success} set
     * to {@code false}.
     *
     * <p>After "v3Out" is unblocked the test waits for the DAG to reach FAILED. vertex3
     * must then be FAILED with cause COMMIT_FAILURE, the DAG termination cause must be
     * VERTEX_FAILURE, "v12Out" must show one init, setup and abort, and "v3Out" one
     * init, setup, commit and abort.
     *
     * <p>NOTE(review): the {@code (true, false)} flags passed to {@code createDAGPlan}
     * presumably make the vertex3 commit fail - confirm against that helper.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail2_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, false, "testDAGCommitFail2_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2, vertex3}) {
            vertex.handle(new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the vertex3 commit and wait for the DAG to fail.
        CountingOutputCommitter vertex3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        vertex3Committer.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        Assert.assertEquals(VertexState.FAILED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // The group commit was started but never recorded as finished.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);

        // Per-vertex events: only vertex3 records a commit start; every vertex finishes once.
        for (VertexImpl vertex : new VertexImpl[] {vertex1, vertex2}) {
            historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);

        // No DAG-level commit start event; the DAG finishes once.
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // "v12Out": one init, setup and abort.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        // "v3Out": one init, setup, commit and abort.
        Assert.assertEquals(1L, vertex3Committer.initCounter);
        Assert.assertEquals(1L, vertex3Committer.setupCounter);
        Assert.assertEquals(1L, vertex3Committer.commitCounter);
        Assert.assertEquals(1L, vertex3Committer.abortCounter);
    }

    /**
     * Runs the DAG with "tez.am.commit-all-outputs-on-dag-success" disabled, lets the
     * "v12Out" group commit run to completion and then releases the "v3Out" commit.
     * Expects vertex3 to end FAILED with COMMIT_FAILURE, the DAG to end FAILED with
     * VERTEX_FAILURE, and each committer to record one init, setup, commit and abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail3_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the flags look like (group commit succeeds, v3 commit succeeds) - confirm in createDAGPlan.
        setupDAG(createDAGPlan(true, false, "testDAGCommitFail3_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release the group commit and wait for it before releasing the vertex3 commit.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        waitForCommitCompleted(dag, new DAGImpl.OutputKey("v12Out", "uv12", true));
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        // Terminal states and causes.
        Assert.assertEquals(VertexState.FAILED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.commitCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * Runs the DAG with "tez.am.commit-all-outputs-on-dag-success" disabled, lets the
     * "v3Out" commit finish (vertex3 reaches SUCCEEDED) and then releases the "v12Out"
     * group commit. Expects the DAG to end FAILED with COMMIT_FAILURE, no group-commit
     * finished event, and each committer to record one init, setup, commit and abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitFail4_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        // NOTE(review): the flags look like (group commit succeeds, v3 commit succeeds) - confirm in createDAGPlan.
        setupDAG(createDAGPlan(false, true, "testDAGCommitFail4_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Let the vertex3 commit finish first.
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitForCommitCompleted(dag, new DAGImpl.OutputKey("v3Out", "vertex3", true));
        waitUntil(vertex3, VertexState.SUCCEEDED);
        Assert.assertTrue(vertex3.commitFutures.isEmpty());

        // Then release the group commit and wait for the DAG to fail.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        groupCommitter.unblockCommit();
        waitUntil(dag, DAGState.FAILED);
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.commitCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" enabled, waits for the DAG to reach
     * COMMITTING, injects an INTERNAL_ERROR DAG event and expects the DAG to reach ERROR
     * with termination cause INTERNAL_ERROR. Only the init and setup counters of the
     * committers are checked here; abort counters are not asserted by this test.
     */
    @Test(timeout=5000L)
    public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true, "testDAGInternalErrorWhileCommiting_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded, then wait for the DAG commit phase.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Inject the internal error while the DAG is committing.
        dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
        waitUntil(dag, DAGState.ERROR);
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
    }

    /** Runs the terminate-while-committing (commit on DAG success) scenario with DAG_KILL. */
    @Test(timeout=5000L)
    public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception {
        _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
    }

    /** Runs the terminate-while-committing (commit on DAG success) scenario with SERVICE_PLUGIN_ERROR. */
    @Test(timeout=5000L)
    public void testServiceErrorWhileCommitting1_OnDAGSuccess() throws Exception {
        _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    /**
     * Shared scenario: with "tez.am.commit-all-outputs-on-dag-success" enabled, waits for
     * the DAG to reach COMMITTING, sends a terminate event carrying {@code terminationCause}
     * and expects the DAG to reach that cause's finished state while all three vertices
     * stay SUCCEEDED and both committers record one init, setup and abort.
     *
     * @param terminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true, "_testDAGTerminatedWhileCommitting1_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded, then wait for the DAG commit phase.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Terminate the DAG while its commits are still blocked.
        dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
        waitUntil(dag, terminationCause.getFinishedState());

        // Terminal states and causes.
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex3.getState());
        Assert.assertEquals(terminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /** Runs the terminate-while-committing (commit on vertex success) scenario with DAG_KILL. */
    @Test(timeout=5000L)
    public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception {
        _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
    }

    /** Runs the terminate-while-committing (commit on vertex success) scenario with SERVICE_PLUGIN_ERROR. */
    @Test(timeout=5000L)
    public void testServiceErrorWhileCommitting1_OnVertexSuccess() throws Exception {
        _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    /**
     * Shared scenario: with "tez.am.commit-all-outputs-on-dag-success" disabled, releases
     * the "v3Out" commit, waits for the DAG to reach COMMITTING, then sends a terminate
     * event carrying {@code terminationCause}. Expects the DAG to reach that cause's
     * finished state with all vertices SUCCEEDED, the group commit started but not
     * finished, the "v12Out" committer aborted once, and the "v3Out" committer both
     * committed and aborted once.
     *
     * @param terminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true, "_testDAGTerminatedWhileCommitting1_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Release only the vertex3 commit, then terminate once the DAG is COMMITTING.
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        v3Committer.unblockCommit();
        waitUntil(dag, DAGState.COMMITTING);
        dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
        waitUntil(dag, terminationCause.getFinishedState());

        // Terminal states and causes.
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex3.getState());
        Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
        Assert.assertEquals(terminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.commitCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /** Runs the terminate-while-running (commit on vertex success) scenario with DAG_KILL. */
    @Test(timeout=5000L)
    public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception {
        _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
    }

    /** Runs the terminate-while-running (commit on vertex success) scenario with SERVICE_PLUGIN_ERROR. */
    @Test(timeout=5000L)
    public void testServiceErrorWhileRunning_OnVertexSuccess() throws Exception {
        _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    /**
     * Shared scenario: with "tez.am.commit-all-outputs-on-dag-success" disabled, sends a
     * terminate event carrying {@code terminationCause} while the DAG is still RUNNING and
     * vertex3 is COMMITTING (no commit is unblocked). Expects vertex3 to end KILLED with
     * DAG_TERMINATED, the DAG to reach the cause's finished state, and both committers to
     * record one init, setup and abort. The group-commit started event is not verified here.
     *
     * @param terminationCause cause placed in the terminate event and expected on the DAG afterwards
     */
    private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true, "_testDAGKilledWhileRunning_OnVertexSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        Assert.assertEquals(VertexState.COMMITTING, vertex3.getState());
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // Terminate the DAG while it is still RUNNING.
        dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
        waitUntil(dag, terminationCause.getFinishedState());

        // Terminal states and causes.
        Assert.assertEquals(VertexState.SUCCEEDED, vertex1.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertex2.getState());
        Assert.assertEquals(VertexState.KILLED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());
        Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
        Assert.assertEquals(terminationCause, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" enabled, waits for the DAG to reach
     * COMMITTING and then reschedules a task (id 1) on vertex1. Expects the DAG and vertex1
     * to pass through TERMINATING and, after the rescheduled task is reported KILLED, to end
     * FAILED with VERTEX_RERUN_IN_COMMITTING; two vertex-finished events are expected for
     * vertex1 and both committers are expected to record one init, setup and abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true, "testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded, then wait for the DAG commit phase.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Reschedule a task on vertex1 while the DAG is committing, then report it killed.
        TezTaskID rescheduledTaskId = TezTaskID.getInstance((TezVertexID) vertex1.getVertexId(), 1);
        vertex1.handle(new VertexEventTaskReschedule(rescheduledTaskId));
        waitUntil(dag, DAGState.TERMINATING);
        waitUntil(vertex1, VertexState.TERMINATING);
        vertex1.handle(new VertexEventTaskCompleted(rescheduledTaskId, TaskState.KILLED));
        waitUntil(dag, DAGState.FAILED);

        // Terminal states and causes.
        Assert.assertEquals(VertexState.FAILED, vertex1.getState());
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, vertex1.getTerminationCause());
        Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 2);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" enabled, waits for the DAG to reach
     * COMMITTING, injects an INTERNAL_ERROR DAG event and expects the DAG to reach ERROR
     * with termination cause INTERNAL_ERROR. Unlike
     * {@code testDAGInternalErrorWhileCommiting_OnDAGSuccess}, this test also expects each
     * committer to have been aborted exactly once.
     */
    @Test(timeout=5000L)
    public void testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        setupDAG(createDAGPlan(true, true, "testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess"));
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded, then wait for the DAG commit phase.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        waitUntil(dag, DAGState.COMMITTING);

        // Inject the internal error while the DAG is committing.
        dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
        waitUntil(dag, DAGState.ERROR);
        Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Committer invocation counts.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" disabled and the history handler's
     * {@code failVertexGroupCommitFinishedEvent} flag set, releases the "v12Out" group
     * commit. Expects the DAG to end FAILED with RECOVERY_FAILURE, no group-commit
     * finished event to be recorded, vertex3 to end KILLED with OTHER_VERTEX_FAILURE,
     * the "v12Out" committer to have committed and aborted once, and the "v3Out"
     * committer to have been aborted once.
     */
    @Test(timeout=5000L)
    public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        setupDAG(createDAGPlan(true, true, "testVertexGroupCommitFinishedEventFail_OnVertexSuccess"));
        // The flag is set before the DAG is initialized and started.
        historyEventHandler.failVertexGroupCommitFinishedEvent = true;
        initDAG(dag);
        startDAG(dag);
        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");

        // Report task 0 of every vertex as succeeded.
        vertex1.handle(new VertexEventTaskCompleted(vertex1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex2.handle(new VertexEventTaskCompleted(vertex2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        vertex3.handle(new VertexEventTaskCompleted(vertex3.getTask(0).getTaskID(), TaskState.SUCCEEDED));

        // Release only the group commit; the vertex3 commit stays blocked.
        CountingOutputCommitter groupCommitter = (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3Committer = (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
        groupCommitter.unblockCommit();
        waitUntil(dag, DAGState.FAILED);

        // Recorded history events.
        historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
        historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex1.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex1.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex2.getVertexId(), 0);
        historyEventHandler.verifyVertexFinishedEvent(vertex2.getVertexId(), 1);
        historyEventHandler.verifyVertexCommitStartedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyVertexFinishedEvent(vertex3.getVertexId(), 1);
        historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
        historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);

        // Terminal states and causes.
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
        Assert.assertTrue(dag.commitFutures.isEmpty());
        Assert.assertEquals(VertexState.KILLED, vertex3.getState());
        Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, vertex3.getTerminationCause());
        Assert.assertTrue(vertex3.commitFutures.isEmpty());

        // Committer invocation counts.
        Assert.assertEquals(1L, groupCommitter.initCounter);
        Assert.assertEquals(1L, groupCommitter.setupCounter);
        Assert.assertEquals(1L, groupCommitter.commitCounter);
        Assert.assertEquals(1L, groupCommitter.abortCounter);
        Assert.assertEquals(1L, v3Committer.initCounter);
        Assert.assertEquals(1L, v3Committer.setupCounter);
        Assert.assertEquals(1L, v3Committer.abortCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" enabled and the history handler's
     * {@code failDAGCommitStartedEvent} flag set, completes task 0 of every vertex and
     * expects the DAG to end FAILED with RECOVERY_FAILURE. No commit-started event of any
     * kind is expected to be recorded, and both committers are expected to record one
     * init, one setup, zero commits and one abort.
     */
    @Test(timeout=5000L)
    public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        this.setupDAG(this.createDAGPlan(true, true, "testDAGCommitStartedEventFail_OnDAGSuccess"));
        // The flag is set before the DAG is initialized and started.
        this.historyEventHandler.failDAGCommitStartedEvent = true;
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        VertexImpl v1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl)this.dag.getVertex("vertex3");
        v1.handle((VertexEvent)new VertexEventTaskCompleted(v1.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        v2.handle((VertexEvent)new VertexEventTaskCompleted(v2.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        v3.handle((VertexEvent)new VertexEventTaskCompleted(v3.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        this.waitUntil(this.dag, DAGState.FAILED);
        Assert.assertEquals((Object)DAGTerminationCause.RECOVERY_FAILURE, (Object)this.dag.getTerminationCause());
        Assert.assertTrue((boolean)this.dag.commitFutures.isEmpty());

        // Recorded history events: nothing commit-related was started or finished.
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
        this.historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
        this.historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 0);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Committer invocation counts for the "v12Out" group output.
        CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter)v1.getOutputCommitter("v12Out");
        CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter)v3.getOutputCommitter("v3Out");
        Assert.assertEquals((long)1L, (long)v12OutputCommitter.initCounter);
        Assert.assertEquals((long)1L, (long)v12OutputCommitter.setupCounter);
        Assert.assertEquals((long)0L, (long)v12OutputCommitter.commitCounter);
        Assert.assertEquals((long)1L, (long)v12OutputCommitter.abortCounter);

        // Committer invocation counts for the "v3Out" output. The commit check must target
        // the v3 committer; it previously re-checked the v12 committer by mistake.
        Assert.assertEquals((long)1L, (long)v3OutputCommitter.initCounter);
        Assert.assertEquals((long)1L, (long)v3OutputCommitter.setupCounter);
        Assert.assertEquals((long)0L, (long)v3OutputCommitter.commitCounter);
        Assert.assertEquals((long)1L, (long)v3OutputCommitter.abortCounter);
    }

    /**
     * Runs the shared commit-cancellation scenario with
     * {@code DAGTerminationCause.DAG_KILL} as the termination cause.
     * The test is bounded by a 5 second timeout.
     */
    @Test(timeout=5000L)
    public void testCommitCanceled_OnDAGSuccess() throws Exception {
        this._testCommitCanceled_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
    }

    /**
     * Runs the shared commit-cancellation scenario with
     * {@code DAGTerminationCause.SERVICE_PLUGIN_ERROR} as the termination cause.
     * The test is bounded by a 5 second timeout.
     */
    @Test(timeout=5000L)
    public void testCommitCanceled_OnDAGSuccess2() throws Exception {
        this._testCommitCanceled_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
    }

    /**
     * Shared scenario: the DAG reaches COMMITTING with commit-on-DAG-success enabled,
     * is then terminated with the given cause, and must end in that cause's finished
     * state with no commit having run and every output aborted exactly once.
     *
     * <p>The commit tasks are submitted to a {@code ControlledThreadPoolExecutor} whose
     * start flag is never raised by this method, so the two commit futures are still
     * pending when the terminate event is delivered.
     *
     * @param terminationCause cause passed to the terminate event; also supplies the
     *                         expected final DAG state via {@code getFinishedState()}
     * @throws Exception if any setup, wait, or verification step fails
     */
    private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        this.setupDAG(this.createDAGPlan(true, true, "testDAGCommitStartedEventFail_OnDAGSuccess"));

        // Route commits through the gated executor so that they stay pending.
        this.rawExecutor = new ControlledThreadPoolExecutor(1);
        this.execService = MoreExecutors.listeningDecorator((ExecutorService)this.rawExecutor);
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.appContext)).getExecService();

        this.initDAG(this.dag);
        this.startDAG(this.dag);

        VertexImpl vertexOne = (VertexImpl)this.dag.getVertex("vertex1");
        VertexImpl vertexTwo = (VertexImpl)this.dag.getVertex("vertex2");
        VertexImpl vertexThree = (VertexImpl)this.dag.getVertex("vertex3");
        VertexImpl[] allVertices = new VertexImpl[]{vertexOne, vertexTwo, vertexThree};

        // Complete the single task of every vertex, in vertex order.
        for (VertexImpl vertex : allVertices) {
            vertex.handle((VertexEvent)new VertexEventTaskCompleted(vertex.getTask(0).getTaskID(), TaskState.SUCCEEDED));
        }

        this.waitUntil(this.dag, DAGState.COMMITTING);
        Assert.assertEquals(2L, this.dag.commitFutures.size());

        // Terminate while both commits are still pending.
        this.dag.handle((DAGEvent)new DAGEventTerminateDag(this.dag.getID(), terminationCause, null));
        this.waitUntil(this.dag, terminationCause.getFinishedState());
        Assert.assertEquals(terminationCause, this.dag.getTerminationCause());
        Assert.assertTrue(this.dag.commitFutures.isEmpty());

        // History: no group or vertex commit events, one finished event per vertex,
        // and exactly one DAG commit-started plus one DAG finished event.
        this.historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
        this.historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
        for (VertexImpl vertex : allVertices) {
            this.historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 0);
            this.historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);
        }
        this.historyEventHandler.verifyDAGCommitStartedEvent(this.dag.getID(), 1);
        this.historyEventHandler.verifyDAGFinishedEvent(this.dag.getID(), 1);

        // Committers: initialized and set up once, never committed, aborted once.
        CountingOutputCommitter[] committers = new CountingOutputCommitter[]{
            (CountingOutputCommitter)vertexOne.getOutputCommitter("v12Out"),
            (CountingOutputCommitter)vertexThree.getOutputCommitter("v3Out")
        };
        for (CountingOutputCommitter committer : committers) {
            Assert.assertEquals(1L, committer.initCounter);
            Assert.assertEquals(1L, committer.setupCounter);
            Assert.assertEquals(0L, committer.commitCounter);
            Assert.assertEquals(1L, committer.abortCounter);
        }
    }

    private static class MockHistoryEventHandler
    extends HistoryEventHandler {
        public boolean failVertexGroupCommitFinishedEvent = false;
        public boolean failDAGCommitStartedEvent = false;
        public Queue<HistoryEvent> historyEvents = new ConcurrentLinkedQueue<HistoryEvent>();

        public MockHistoryEventHandler(AppContext context) {
            super(context);
        }

        public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
            if (event.getHistoryEvent().getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED && this.failVertexGroupCommitFinishedEvent) {
                throw new IOException("fail VertexGroupCommitFinishedEvent");
            }
            if (event.getHistoryEvent().getEventType() == HistoryEventType.DAG_COMMIT_STARTED && this.failDAGCommitStartedEvent) {
                throw new IOException("fail DAGCommitStartedEvent");
            }
            this.historyEvents.add(event.getHistoryEvent());
        }

        public void verifyVertexGroupCommitStartedEvent(String groupName, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                VertexGroupCommitStartedEvent startedEvent;
                if (event.getEventType() != HistoryEventType.VERTEX_GROUP_COMMIT_STARTED || !(startedEvent = (VertexGroupCommitStartedEvent)event).getVertexGroupName().equals(groupName)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }

        public void verifyVertexGroupCommitFinishedEvent(String groupName, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                VertexGroupCommitFinishedEvent finishedEvent;
                if (event.getEventType() != HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED || !(finishedEvent = (VertexGroupCommitFinishedEvent)event).getVertexGroupName().equals(groupName)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }

        public void verifyVertexCommitStartedEvent(TezVertexID vertexId, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                VertexCommitStartedEvent startedEvent;
                if (event.getEventType() != HistoryEventType.VERTEX_COMMIT_STARTED || !(startedEvent = (VertexCommitStartedEvent)event).getVertexID().equals((Object)vertexId)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }

        public void verifyVertexFinishedEvent(TezVertexID vertexId, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                VertexFinishedEvent finishedEvent;
                if (event.getEventType() != HistoryEventType.VERTEX_FINISHED || !(finishedEvent = (VertexFinishedEvent)event).getVertexID().equals((Object)vertexId)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }

        public void verifyDAGCommitStartedEvent(TezDAGID dagId, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                DAGCommitStartedEvent startedEvent;
                if (event.getEventType() != HistoryEventType.DAG_COMMIT_STARTED || !(startedEvent = (DAGCommitStartedEvent)event).getDagID().equals((Object)dagId)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }

        public void verifyDAGFinishedEvent(TezDAGID dagId, int expectedTimes) {
            int actualTimes = 0;
            for (HistoryEvent event : this.historyEvents) {
                DAGFinishedEvent startedEvent;
                if (event.getEventType() != HistoryEventType.DAG_FINISHED || !(startedEvent = (DAGFinishedEvent)event).getDAGID().equals((Object)dagId)) continue;
                ++actualTimes;
            }
            Assert.assertEquals((long)expectedTimes, (long)actualTimes);
        }
    }

    /**
     * Forwards each task event to the task it addresses, found through the
     * enclosing test's DAG by vertex id and then task id.
     */
    private class TaskEventDispatcher
    implements EventHandler<TaskEvent> {
        private TaskEventDispatcher() {
        }

        /**
         * Looks up the target task and hands it the event.
         *
         * @param event task event carrying the vertex id and task id of its target
         */
        public void handle(TaskEvent event) {
            Task target = TestCommit.this.dag.getVertex(event.getVertexID()).getTask(event.getTaskID());
            ((EventHandler)target).handle((Event)event);
        }
    }

    /**
     * Event handler for task attempt events that discards every event it receives.
     */
    private class TaskAttemptEventDispatcher
    implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        /**
         * Intentionally does nothing with the event.
         *
         * @param event task attempt event; ignored
         */
        public void handle(TaskAttemptEvent event) {
        }
    }

    /**
     * Forwards each vertex event to the vertex it addresses, found through the
     * enclosing test's DAG by vertex id.
     */
    private class VertexEventDispatcher
    implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        /**
         * Looks up the target vertex and hands it the event.
         *
         * @param event vertex event carrying the id of its target vertex
         */
        public void handle(VertexEvent event) {
            ((EventHandler)TestCommit.this.dag.getVertex(event.getVertexID())).handle((Event)event);
        }
    }

    /**
     * Forwards every DAG event to the enclosing test's DAG instance.
     */
    private class DagEventDispatcher
    implements EventHandler<DAGEvent> {
        private DagEventDispatcher() {
        }

        /**
         * Passes the event straight to {@code TestCommit.this.dag}.
         *
         * @param event DAG event to deliver
         */
        public void handle(DAGEvent event) {
            TestCommit.this.dag.handle(event);
        }
    }

    /**
     * Event handler for DAG-finished app master events that discards every event it receives.
     */
    private class DAGFinishEventHandler
    implements EventHandler<DAGAppMasterEventDAGFinished> {
        private DAGFinishEventHandler() {
        }

        /**
         * Intentionally does nothing with the event.
         *
         * @param event DAG-finished event; ignored
         */
        public void handle(DAGAppMasterEventDAGFinished event) {
        }
    }

    /**
     * Output committer for tests that counts how often each lifecycle callback runs.
     * An optional user payload (see {@link CountingOutputCommitterConfig}) can make
     * {@link #commitOutput()} throw after counting, or hold it in a polling loop until
     * {@link #unblockCommit()} is called.
     */
    public static class CountingOutputCommitter
    extends OutputCommitter {
        /** Number of {@link #initialize()} calls. */
        public volatile int initCounter = 0;
        /** Number of {@link #setupOutput()} calls. */
        public volatile int setupCounter = 0;
        /** Number of {@link #commitOutput()} calls that got past the blocking loop. */
        public volatile int commitCounter = 0;
        /** Number of {@code abortOutput} calls. */
        public volatile int abortCounter = 0;
        private boolean throwError = false;
        private volatile boolean blockCommit;

        public CountingOutputCommitter(OutputCommitterContext context) {
            super(context);
        }

        /**
         * Reads the optional configuration from the context's user payload, then
         * counts the call.
         *
         * @throws IOException if the payload cannot be decoded
         */
        public void initialize() throws IOException {
            UserPayload payload = this.getContext().getUserPayload();
            if (payload != null && payload.hasPayload()) {
                CountingOutputCommitterConfig settings = new CountingOutputCommitterConfig(payload);
                this.throwError = settings.throwError;
                this.blockCommit = settings.blockCommit;
            }
            this.initCounter++;
        }

        /** Counts the call; performs no other work. */
        public void setupOutput() throws IOException {
            this.setupCounter++;
        }

        /**
         * Polls every 100 ms while commit is blocked, then counts the call and
         * optionally throws.
         *
         * @throws IOException      wrapping the InterruptedException if the thread is
         *                          interrupted while waiting; the counter is not touched then
         * @throws RuntimeException after counting, when configured to throw
         */
        public void commitOutput() throws IOException {
            try {
                while (this.blockCommit) {
                    Thread.sleep(100L);
                    LOG.info((Object)("committing output:" + this.getContext().getOutputName()));
                }
            }
            catch (InterruptedException interrupted) {
                throw new IOException(interrupted);
            }
            this.commitCounter++;
            if (this.throwError) {
                throw new RuntimeException("I can throwz exceptions in commit");
            }
        }

        /** Releases a commit that is waiting in {@link #commitOutput()}. */
        public void unblockCommit() {
            this.blockCommit = false;
        }

        /**
         * Counts the call; the final state is not inspected.
         *
         * @param finalState final vertex state reported by the caller; unused
         */
        public void abortOutput(VertexStatus.State finalState) throws IOException {
            this.abortCounter++;
        }

        /**
         * Two-boolean configuration for {@link CountingOutputCommitter}, serialized
         * as {@code throwError} followed by {@code blockCommit}.
         */
        public static class CountingOutputCommitterConfig
        implements Writable {
            boolean throwError;
            boolean blockCommit;

            /** Creates a configuration with both flags off. */
            public CountingOutputCommitterConfig() {
            }

            public CountingOutputCommitterConfig(boolean throwError, boolean blockCommit) {
                this.throwError = throwError;
                this.blockCommit = blockCommit;
            }

            /**
             * Decodes the configuration from a user payload.
             *
             * @param payload payload whose buffer holds the two serialized flags
             * @throws IOException if the flags cannot be read from the buffer
             */
            public CountingOutputCommitterConfig(UserPayload payload) throws IOException {
                DataInputByteBuffer buffer = new DataInputByteBuffer();
                buffer.reset(new ByteBuffer[]{payload.getPayload()});
                this.readFields(buffer);
            }

            /** Writes {@code throwError} and then {@code blockCommit}. */
            public void write(DataOutput out) throws IOException {
                out.writeBoolean(this.throwError);
                out.writeBoolean(this.blockCommit);
            }

            /** Reads the flags in the same order that {@link #write(DataOutput)} emits them. */
            public void readFields(DataInput in) throws IOException {
                this.throwError = in.readBoolean();
                this.blockCommit = in.readBoolean();
            }

            /**
             * Serializes this configuration to a byte array.
             *
             * @return the bytes produced by {@link #write(DataOutput)}
             * @throws IOException if writing fails
             */
            public byte[] toUserPayload() throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                this.write(new DataOutputStream(bytes));
                return bytes.toByteArray();
            }
        }
    }

    /**
     * Vertex manager that always fails when it receives a vertex manager event.
     */
    public static class FailOnVMEventReceivedlVertexManager
    extends ImmediateStartVertexManager {
        public FailOnVMEventReceivedlVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        /**
         * Delegates to the superclass first and then unconditionally throws.
         *
         * @param vmEvent the received vertex manager event
         * @throws RuntimeException always, with message {@code "fail vm"}
         */
        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
            super.onVertexManagerEventReceived(vmEvent);
            throw new RuntimeException("fail vm");
        }
    }

    /**
     * Thread pool whose workers hold every task in {@link #beforeExecute} until
     * {@link #startFlag} is set, letting a test decide when (or whether) submitted
     * work actually starts.
     */
    private static class ControlledThreadPoolExecutor
    extends ThreadPoolExecutor {
        /**
         * Gate for task execution; tasks do not start while this is {@code false}.
         * It is written by the controlling thread and polled by pool worker threads,
         * so it must be {@code volatile}: without it a worker is not guaranteed to
         * ever observe the write and could spin in {@link #beforeExecute} forever.
         */
        public volatile boolean startFlag = false;

        /**
         * Creates a fixed-size pool backed by an unbounded queue.
         *
         * @param poolSize used as both the core and the maximum pool size
         */
        public ControlledThreadPoolExecutor(int poolSize) {
            this(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        /** Passes all arguments through unchanged to {@link ThreadPoolExecutor}. */
        public ControlledThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Blocks the worker thread, polling every 100 ms, until {@link #startFlag}
         * is {@code true}, then continues with the superclass hook.
         *
         * @param t the thread that will run the task
         * @param r the task about to be executed
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            while (!this.startFlag) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    // Keep waiting on purpose: leaving the loop here would start the
                    // task before the gate has been opened.
                    e.printStackTrace();
                }
            }
            super.beforeExecute(t, r);
        }
    }
}

