package org.apache.tez.dag.app;

import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/dag/app/TestMockDAGAppMaster.class */
public class TestMockDAGAppMaster {
    /** Logger for this test class. */
    private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
    /**
     * Base configuration; every test wraps it in a fresh {@link TezConfiguration} before
     * handing it to the mock client.
     * NOTE(review): not assigned in the visible part of this file - presumably initialised
     * in a static/{@code @BeforeClass} setup elsewhere in the class; confirm.
     */
    static Configuration defaultConf;
    /**
     * NOTE(review): not read or assigned in the visible part of this file - presumably the
     * local file system used by setup/teardown code elsewhere in the class; confirm.
     */
    static FileSystem localFs;

    /* loaded from: input_file:org/apache/tez/dag/app/TestMockDAGAppMaster$FailingOutputCommitter.class */
    /**
     * Output committer whose {@link #commitOutput()} can be made to throw on demand.
     * The decision is read from the user payload in {@link #initialize()} using
     * {@link FailingOutputCommitterConfig}; setup and abort are no-ops.
     */
    public static class FailingOutputCommitter extends OutputCommitter {
        /** When true, {@link #commitOutput()} throws instead of completing. */
        boolean failOnCommit;

        /**
         * Payload codec for {@link FailingOutputCommitter}. The flag is carried as an int
         * serialised with Guava's {@code Ints}: zero means "do not fail", any other value
         * means "fail on commit".
         */
        public static class FailingOutputCommitterConfig {
            boolean failOnCommit;

            /** Creates a config that does not request a commit failure. */
            public FailingOutputCommitterConfig() {
                this(false);
            }

            /**
             * @param z whether the committer should fail when committing
             */
            public FailingOutputCommitterConfig(boolean z) {
                this.failOnCommit = z;
            }

            /**
             * @return the flag encoded as the int 1 (fail) or 0 (do not fail)
             */
            public byte[] toUserPayload() {
                int encodedFlag = this.failOnCommit ? 1 : 0;
                return Ints.toByteArray(encodedFlag);
            }

            /**
             * Restores the flag from a payload produced by {@link #toUserPayload()}.
             *
             * @param bArr the encoded int; any non-zero value enables the failure
             */
            public void fromUserPayload(byte[] bArr) {
                this.failOnCommit = Ints.fromByteArray(bArr) != 0;
            }
        }

        public FailingOutputCommitter(OutputCommitterContext outputCommitterContext) {
            super(outputCommitterContext);
            this.failOnCommit = false;
        }

        /** Decodes the user payload from the context and stores the resulting flag. */
        public void initialize() throws Exception {
            byte[] payloadBytes = getContext().getUserPayload().deepCopyAsArray();
            FailingOutputCommitterConfig decoded = new FailingOutputCommitterConfig();
            decoded.fromUserPayload(payloadBytes);
            this.failOnCommit = decoded.failOnCommit;
        }

        /** Nothing to set up. */
        public void setupOutput() throws Exception {
        }

        /**
         * @throws Exception always, when the payload asked for a commit failure; the
         *         message includes the output name from the context
         */
        public void commitOutput() throws Exception {
            if (!this.failOnCommit) {
                return;
            }
            throw new Exception("fail output committer:" + getContext().getOutputName());
        }

        /** Nothing to abort. */
        public void abortOutput(VertexStatus.State state) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestMockDAGAppMaster$LegacyEdgeTestEdgeManager.class */
    /**
     * Minimal custom edge manager used by the routing tests. Every routing call maps the
     * supplied task index to the single physical input index 0, and every count query
     * answers 1.
     */
    public static class LegacyEdgeTestEdgeManager extends EdgeManagerPlugin {
        /** Read-only list holding the single input index (0) used for every route. */
        List<Integer> destinationInputIndices;

        public LegacyEdgeTestEdgeManager(EdgeManagerPluginContext edgeManagerPluginContext) {
            super(edgeManagerPluginContext);
            List<Integer> onlyIndexZero = Collections.singletonList(0);
            this.destinationInputIndices = Collections.unmodifiableList(onlyIndexZero);
        }

        /** No initialisation required. */
        public void initialize() throws Exception {
        }

        /** @return always 1 */
        public int getNumDestinationTaskPhysicalInputs(int i) throws Exception {
            return 1;
        }

        /** @return always 1 */
        public int getNumSourceTaskPhysicalOutputs(int i) throws Exception {
            return 1;
        }

        /** Records a route from the first int argument to the fixed input index list. */
        public void routeDataMovementEventToDestination(DataMovementEvent dataMovementEvent, int i, int i2, Map<Integer, List<Integer>> map) {
            map.put(i, this.destinationInputIndices);
        }

        /** Records a route from the given index to the fixed input index list. */
        public void routeInputSourceTaskFailedEventToDestination(int i, Map<Integer, List<Integer>> map) {
            map.put(i, this.destinationInputIndices);
        }

        /** @return the first int argument, unchanged */
        public int routeInputErrorEventToSource(InputReadErrorEvent inputReadErrorEvent, int i, int i2) {
            return i;
        }

        /** @return always 1 */
        public int getNumDestinationConsumerTasks(int i) {
            return 1;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestMockDAGAppMaster$TestEventsDelegate.class */
    /**
     * Events delegate that adds one output event per output of the task: a
     * {@link DataMovementEvent} when the output has exactly one physical edge, otherwise a
     * {@link CompositeDataMovementEvent} spanning all of its physical edges.
     */
    static class TestEventsDelegate implements MockDAGAppMaster.EventsDelegate {
        @Override // org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate
        public void getEvents(TaskSpec taskSpec, List<TezEvent> list, long j) {
            for (OutputSpec output : taskSpec.getOutputs()) {
                int edgeCount = output.getPhysicalEdgeCount();
                // Both event flavours carry the same producer metadata.
                EventMetaData producerInfo = new EventMetaData(
                        EventMetaData.EventProducerConsumerType.OUTPUT,
                        taskSpec.getVertexName(),
                        output.getDestinationVertexName(),
                        taskSpec.getTaskAttemptID());
                if (edgeCount == 1) {
                    list.add(new TezEvent(DataMovementEvent.create(0, 0, 0, (ByteBuffer) null), producerInfo, j));
                } else {
                    list.add(new TezEvent(CompositeDataMovementEvent.create(0, edgeCount, (ByteBuffer) null), producerInfo, j));
                }
            }
        }
    }

    /**
     * Checks that local files registered on the DAG ("LR1") and on a vertex ("LR2") both
     * show up in the local resources of a launched container, and that the DAG then
     * runs to SUCCEEDED.
     */
    @Test(timeout = 5000)
    public void testLocalResourceSetup() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();
        try {
            MockDAGAppMaster.MockContainerLauncher mockLauncher = tezClient.getLocalClient().getMockApp().getContainerLauncher();
            // Scheduling is switched off while the launch context is inspected and
            // switched back on afterwards.
            mockLauncher.startScheduling(false);

            Map<String, LocalResource> dagResources = Maps.newHashMap();
            dagResources.put("LR1", LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1L, 1L));
            Map<String, LocalResource> vertexResources = Maps.newHashMap();
            vertexResources.put("LR2", LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1L, 1L));

            DAG dag = DAG.create("testLocalResourceSetup").addTaskLocalFiles(dagResources);
            dag.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(vertexResources));

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();

            // Any launched container will do; inspect the first one.
            Map<String, LocalResource> launchedResources = mockLauncher.getContainers().values().iterator().next().launchContext.getLocalResources();
            Assert.assertTrue(launchedResources.containsKey("LR1"));
            Assert.assertTrue(launchedResources.containsKey("LR2"));

            mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Preempts the only launched container of a single-task DAG and checks that the DAG
     * still reaches SUCCEEDED while attempt 0 of the task ends up KILLED or FAILED.
     */
    @Test(timeout = 5000)
    public void testInternalPreemption() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();
        try {
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);

            DAG dag = DAG.create("testInternalPreemption");
            Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
            dag.addVertex(vA);

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();
            MockDAGAppMaster.MockContainerLauncher.ContainerData containerData = mockLauncher.getContainers().values().iterator().next();
            // Explicit downcast: the test works against the implementation class.
            DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();

            // Preempt the container before scheduling is switched back on.
            mockApp.getTaskSchedulerManager().preemptContainer(0, containerData.cId);
            mockLauncher.startScheduling(true);

            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());

            // Attempt 0 of task 0 of vertex 0 is the one whose container was preempted.
            TezTaskAttemptID preemptedAttemptId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(dagImpl.getID(), 0), 0), 0);
            TaskAttempt preemptedAttempt = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(preemptedAttemptId);
            TaskAttemptState finalState = preemptedAttempt.getState();
            Assert.assertTrue("Unexpected state for preempted attempt: " + finalState,
                    finalState.equals(TaskAttemptState.KILLED) || finalState.equals(TaskAttemptState.FAILED));
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Runs a DAG where vertex A feeds B (BROADCAST), C (SCATTER_GATHER) and D (ONE_TO_ONE),
     * with {@link TestEventsDelegate} producing the output events, and checks the events
     * delivered to attempt 0 of task 1 in each downstream vertex.
     */
    @Test(timeout = 5000)
    public void testBasicEvents() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();
        try {
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);
            mockApp.eventsDelegate = new TestEventsDelegate();

            DAG dag = DAG.create("testBasicEvents");
            Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
            Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
            Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
            Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 2);
            dag.addVertex(vA).addVertex(vB).addVertex(vC).addVertex(vD)
                    .addEdge(Edge.create(vA, vB, EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vA, vC, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vA, vD, EdgeProperty.create(EdgeProperty.DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();
            // Explicit downcast: the test works against the implementation class.
            DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
            mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());

            // B (BROADCAST): two events from A, both with source index 0, targeting
            // inputs 0 and 1 in either order.
            VertexImpl vImplB = (VertexImpl) dagImpl.getVertex(vB.getName());
            List<TezEvent> eventsB = vImplB.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(vImplB.getTask(1).getTaskId(), 0), 0, 0, 1000).getEvents();
            Assert.assertEquals(2L, eventsB.size());
            // The payloads are read as DataMovementEvent to reach source/target indices.
            DataMovementEvent firstB = (DataMovementEvent) eventsB.get(0).getEvent();
            DataMovementEvent secondB = (DataMovementEvent) eventsB.get(1).getEvent();
            Assert.assertEquals(vA.getName(), eventsB.get(0).getDestinationInfo().getEdgeVertexName());
            Assert.assertEquals(0L, firstB.getSourceIndex());
            Assert.assertEquals(vA.getName(), eventsB.get(1).getDestinationInfo().getEdgeVertexName());
            Assert.assertEquals(0L, secondB.getSourceIndex());
            int targetIndex = firstB.getTargetIndex();
            int targetIndex2 = secondB.getTargetIndex();
            Assert.assertTrue("t1: " + targetIndex + " t2: " + targetIndex2, (targetIndex == 0 && targetIndex2 == 1) || (targetIndex == 1 && targetIndex2 == 0));

            // C (SCATTER_GATHER): two events from A, both with source index 1, targeting
            // inputs 0 and 1 in either order.
            VertexImpl vImplC = (VertexImpl) dagImpl.getVertex(vC.getName());
            List<TezEvent> eventsC = vImplC.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(vImplC.getTask(1).getTaskId(), 0), 0, 0, 1000).getEvents();
            Assert.assertEquals(2L, eventsC.size());
            DataMovementEvent firstC = (DataMovementEvent) eventsC.get(0).getEvent();
            DataMovementEvent secondC = (DataMovementEvent) eventsC.get(1).getEvent();
            Assert.assertEquals(vA.getName(), eventsC.get(0).getDestinationInfo().getEdgeVertexName());
            Assert.assertEquals(1L, firstC.getSourceIndex());
            Assert.assertEquals(vA.getName(), eventsC.get(1).getDestinationInfo().getEdgeVertexName());
            Assert.assertEquals(1L, secondC.getSourceIndex());
            int targetIndex3 = firstC.getTargetIndex();
            int targetIndex4 = secondC.getTargetIndex();
            Assert.assertTrue("t1: " + targetIndex3 + " t2: " + targetIndex4, (targetIndex3 == 0 && targetIndex4 == 1) || (targetIndex3 == 1 && targetIndex4 == 0));

            // D (ONE_TO_ONE): a single event from A with source and target index 0.
            VertexImpl vImplD = (VertexImpl) dagImpl.getVertex(vD.getName());
            List<TezEvent> eventsD = vImplD.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(vImplD.getTask(1).getTaskId(), 0), 0, 0, 1000).getEvents();
            Assert.assertEquals(1L, eventsD.size());
            DataMovementEvent onlyD = (DataMovementEvent) eventsD.get(0).getEvent();
            Assert.assertEquals(vA.getName(), eventsD.get(0).getDestinationInfo().getEdgeVertexName());
            Assert.assertEquals(0L, onlyD.getTargetIndex());
            Assert.assertEquals(0L, onlyD.getSourceIndex());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Runs a DAG mixing SCATTER_GATHER edges with edges managed by
     * {@link LegacyEdgeTestEdgeManager} and checks how many events the destination tasks
     * themselves report for attempt 0:
     * C (SCATTER_GATHER from A and B) reports 0, D (SCATTER_GATHER from A, legacy edge from
     * B) reports 1, and E (legacy edge from B only) reports 1.
     */
    @Test(timeout = 100000)
    public void testMixedEdgeRouting() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();
        try {
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);
            mockApp.eventsDelegate = new TestEventsDelegate();

            DAG dag = DAG.create("testMixedEdgeRouting");
            Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
            Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
            Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 1);
            Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 1);
            Vertex vE = Vertex.create("E", ProcessorDescriptor.create("Proc.class"), 1);
            dag.addVertex(vA).addVertex(vB).addVertex(vC).addVertex(vD).addVertex(vE)
                    .addEdge(Edge.create(vA, vC, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vB, vC, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vA, vD, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vB, vD, EdgeProperty.create(EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()), EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
                    .addEdge(Edge.create(vB, vE, EdgeProperty.create(EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()), EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();
            // Explicit downcast: the test works against the implementation class.
            DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
            mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());

            // C: only SCATTER_GATHER inputs; the task itself reports no events.
            TaskImpl taskC = (TaskImpl) dagImpl.getVertex(vC.getName()).getTask(0);
            Assert.assertEquals(0L, taskC.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(taskC.getTaskId(), 0), 0, 1000).size());

            // D: one SCATTER_GATHER input and one legacy-managed input; one event.
            TaskImpl taskD = (TaskImpl) dagImpl.getVertex(vD.getName()).getTask(0);
            Assert.assertEquals(1L, taskD.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(taskD.getTaskId(), 0), 0, 1000).size());

            // E: a single legacy-managed input; one event.
            TaskImpl taskE = (TaskImpl) dagImpl.getVertex(vE.getName()).getTask(0);
            Assert.assertEquals(1L, taskE.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(taskE.getTaskId(), 0), 0, 1000).size());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Runs a 20-task vertex configured with "tez.am.vertex.max-task-concurrency" = 5 and
     * uses a container delegate to track launches minus stops; the test fails if that
     * running count ever exceeds the configured limit.
     */
    @Test(timeout = 100000)
    public void testConcurrencyLimit() throws Exception {
        // Single source of truth for the limit set on the vertex and checked below.
        final int concurrencyLimit = 5;
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false, 20, 1000);
        tezClient.start();
        try {
            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);

            final AtomicInteger runningContainers = new AtomicInteger(0);
            final AtomicBoolean limitExceeded = new AtomicBoolean(false);
            mockApp.containerDelegate = new MockDAGAppMaster.ContainerDelegate() {
                @Override // org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate
                public void stop(ContainerStopRequest containerStopRequest) {
                    runningContainers.decrementAndGet();
                }

                @Override // org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate
                public void launch(ContainerLaunchRequest containerLaunchRequest) {
                    int nowRunning = runningContainers.incrementAndGet();
                    if (nowRunning > concurrencyLimit) {
                        // Latch the violation; it is asserted after the DAG finishes.
                        limitExceeded.set(true);
                    }
                    System.out.println("Launched: " + nowRunning);
                }
            };

            DAG dag = DAG.create("testConcurrencyLimit");
            dag.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf("tez.am.vertex.max-task-concurrency", String.valueOf(concurrencyLimit)));

            mockLauncher.startScheduling(true);
            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());
            Assert.assertFalse(limitExceeded.get());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Checks min/max aggregation of per-task counters at vertex level.
     *
     * <p>The counters delegate hands out strictly increasing values from one shared
     * sequence: for each task first the "Proc" counter, then one counter per output, then
     * one per input. The asserted values (A: Proc 1..19, A->B 2..20; B: Proc 21, B<-A 22)
     * correspond to the ten tasks of A being served before the single task of B.
     */
    @Test
    public void testCountersAggregation() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false);
        tezClient.start();
        try {
            DAG dag = DAG.create("testCountersAggregation");
            Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10);
            Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
            dag.addVertex(vA).addVertex(vB).addEdge(Edge.create(vA, vB, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));

            // Serialise a template counter set once; each task deserialises its own copy.
            TezCounters templateCounters = new TezCounters();
            templateCounters.findCounter(new String("Global"), new String("Global")).increment(1L);
            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            templateCounters.write(new DataOutputStream(serialized));
            final byte[] payload = serialized.toByteArray();

            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);
            mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate() {
                // Shared sequence; every counter handed out gets the next value.
                int counterValue = 0;

                @Override // org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate
                public TezCounters getCounters(TaskSpec taskSpec) {
                    String vertexName = taskSpec.getVertexName();
                    TezCounters taskCounters = new TezCounters();
                    DataInputByteBuffer in = new DataInputByteBuffer();
                    in.reset(ByteBuffer.wrap(payload));
                    try {
                        taskCounters.readFields(in);
                    } catch (IOException e) {
                        Assert.fail(e.getMessage());
                    }
                    taskCounters.findCounter(vertexName, "Proc").setValue(++this.counterValue);
                    for (OutputSpec output : taskSpec.getOutputs()) {
                        taskCounters.findCounter(vertexName, output.getDestinationVertexName()).setValue(++this.counterValue);
                    }
                    for (InputSpec input : taskSpec.getInputs()) {
                        taskCounters.findCounter(vertexName, input.getSourceVertexName()).setValue(++this.counterValue);
                    }
                    return taskCounters;
                }
            };
            mockApp.doSleep = false;

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();
            // Explicit downcast: the test works against the implementation class.
            DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
            mockLauncher.startScheduling(true);
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());

            // Result intentionally unused; the call is kept from the original test.
            dagImpl.getAllCounters();

            VertexImpl vImplA = (VertexImpl) dagImpl.getVertex("A");
            VertexImpl vImplB = (VertexImpl) dagImpl.getVertex("B");
            TezCounters countersA = vImplA.getAllCounters();
            TezCounters countersB = vImplB.getAllCounters();

            Assert.assertEquals(19L, countersA.findCounter("A", "Proc").getMax());
            Assert.assertEquals(1L, countersA.findCounter("A", "Proc").getMin());
            Assert.assertEquals(20L, countersA.findCounter("A", "B").getMax());
            Assert.assertEquals(2L, countersA.findCounter("A", "B").getMin());

            Assert.assertEquals(21L, countersB.findCounter("B", "Proc").getMin());
            Assert.assertEquals(21L, countersB.findCounter("B", "Proc").getMax());
            Assert.assertEquals(22L, countersB.findCounter("B", "A").getMin());
            Assert.assertEquals(22L, countersB.findCounter("B", "A").getMax());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Checks DAG-level counter aggregation and counter-name interning.
     *
     * <p>Each task reports the deserialised "Global" counter (value 1) plus a "Proc"
     * counter and one counter per output and input, each incremented once. With ten tasks
     * in A and one in B the DAG totals are A/Proc = 10, B/Proc = 1, A/B = 10, B/A = 1 and
     * Global/Global = 11. On Linux the AM CPU counter must also be positive.
     */
    @Test(timeout = 10000)
    public void testBasicCounters() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false);
        tezClient.start();
        try {
            DAG dag = DAG.create("testBasicCounters");
            Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10);
            Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
            dag.addVertex(vA).addVertex(vB).addEdge(Edge.create(vA, vB, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));

            // new String(...) is deliberate: distinct String instances go in, and the
            // identity checks at the end verify the names come out as one shared instance.
            TezCounters templateCounters = new TezCounters();
            templateCounters.findCounter(new String("Global"), new String("Global")).increment(1L);
            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            templateCounters.write(new DataOutputStream(serialized));
            final byte[] payload = serialized.toByteArray();

            MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
            MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(false);
            mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate() {
                @Override // org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate
                public TezCounters getCounters(TaskSpec taskSpec) {
                    String vertexName = taskSpec.getVertexName();
                    TezCounters taskCounters = new TezCounters();
                    DataInputByteBuffer in = new DataInputByteBuffer();
                    in.reset(ByteBuffer.wrap(payload));
                    try {
                        taskCounters.readFields(in);
                    } catch (IOException e) {
                        Assert.fail(e.getMessage());
                    }
                    taskCounters.findCounter(vertexName, "Proc").increment(1L);
                    for (OutputSpec output : taskSpec.getOutputs()) {
                        taskCounters.findCounter(vertexName, output.getDestinationVertexName()).increment(1L);
                    }
                    for (InputSpec input : taskSpec.getInputs()) {
                        taskCounters.findCounter(vertexName, input.getSourceVertexName()).increment(1L);
                    }
                    return taskCounters;
                }
            };
            mockApp.doSleep = false;

            DAGClient dagClient = tezClient.submitDAG(dag);
            mockLauncher.waitTillContainersLaunched();
            // Explicit downcast: the test works against the implementation class.
            DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
            mockLauncher.startScheduling(true);
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());

            TezCounters dagCounters = dagImpl.getAllCounters();
            if (SystemUtils.IS_OS_LINUX) {
                Assert.assertTrue(dagCounters.findCounter(DAGCounter.AM_CPU_MILLISECONDS).getValue() > 0);
            }
            Assert.assertEquals(10L, dagCounters.findCounter("A", "Proc").getValue());
            Assert.assertEquals(1L, dagCounters.findCounter("B", "Proc").getValue());
            Assert.assertEquals(10L, dagCounters.findCounter("A", "B").getValue());
            Assert.assertEquals(1L, dagCounters.findCounter("B", "A").getValue());
            Assert.assertEquals(11L, dagCounters.findCounter("Global", "Global").getValue());

            VertexImpl vImplA = (VertexImpl) dagImpl.getVertex("A");
            VertexImpl vImplB = (VertexImpl) dagImpl.getVertex("B");
            TezCounters countersA = vImplA.getAllCounters();
            TezCounters countersB = vImplB.getAllCounters();

            // Identity (not equals) is the point here: both vertices must expose the very
            // same String objects for the counter and group names.
            Assert.assertSame("String counter name objects dont match despite interning.",
                    countersA.findCounter("Global", "Global").getName(),
                    countersB.findCounter("Global", "Global").getName());
            Assert.assertSame("String group name objects dont match despite interning.",
                    countersA.getGroup("Global").getName(),
                    countersB.getGroup("Global").getName());
        } finally {
            // Stop the client even when an assertion above fails.
            tezClient.stop();
        }
    }

    /**
     * Submits a two-vertex DAG (vertex "A" with 3 tasks feeding vertex "B" with 2 tasks) in which
     * every task reports a data size of 1 and 1 processed item for each of its named inputs and
     * outputs, then asserts that the statistics exposed by each vertex equal that vertex's task
     * count.
     */
    @Test(timeout = 10000)
    public void testBasicStatistics() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false);
        tezClient.start();

        // DAG shape: A (data source "In") --scatter-gather--> B (data sink "Out").
        DAG dag = DAG.create("testBasisStatistics");
        Vertex vertexA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 3);
        Vertex vertexB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
        vertexA.addDataSource("In", DataSourceDescriptor.create(InputDescriptor.create("In"), (InputInitializerDescriptor) null, (Credentials) null));
        vertexB.addDataSink("Out", DataSinkDescriptor.create(OutputDescriptor.create("Out"), (OutputCommitterDescriptor) null, (Credentials) null));
        EdgeProperty scatterGather = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("Out"),
                InputDescriptor.create("In"));
        dag.addVertex(vertexA)
                .addVertex(vertexB)
                .addEdge(Edge.create(vertexA, vertexB, scatterGather));

        // A single IOStatistics instance (size 1, items 1) is registered under every IO name.
        IOStatistics unitStats = new IOStatistics();
        unitStats.setDataSize(1L);
        unitStats.setItemsProcessed(1L);
        TaskStatistics statsForA = new TaskStatistics();
        statsForA.addIO("B", unitStats);
        statsForA.addIO("In", unitStats);
        TaskStatistics statsForB = new TaskStatistics();
        statsForB.addIO("A", unitStats);
        statsForB.addIO("Out", unitStats);

        // Serialize both payloads once; the delegate below deserializes a fresh copy per call.
        ByteArrayOutputStream bufferForA = new ByteArrayOutputStream();
        statsForA.write(new DataOutputStream(bufferForA));
        final byte[] serializedForA = bufferForA.toByteArray();
        ByteArrayOutputStream bufferForB = new ByteArrayOutputStream();
        statsForB.write(new DataOutputStream(bufferForB));
        final byte[] serializedForB = bufferForB.toByteArray();

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher launcher = mockApp.getContainerLauncher();
        launcher.startScheduling(false);
        mockApp.statsDelegate = new MockDAGAppMaster.StatisticsDelegate() {
            @Override
            public TaskStatistics getStatistics(TaskSpec taskSpec) {
                // Tasks of vertex "B" get the B payload; every other task gets the A payload.
                byte[] payload = taskSpec.getVertexName().equals("B") ? serializedForB : serializedForA;
                DataInputByteBuffer input = new DataInputByteBuffer();
                input.reset(new ByteBuffer[]{ByteBuffer.wrap(payload)});
                TaskStatistics decoded = new TaskStatistics();
                try {
                    decoded.readFields(input);
                } catch (IOException ioe) {
                    Assert.fail(ioe.getMessage());
                }
                return decoded;
            }
        };
        mockApp.doSleep = false;

        DAGClient dagClient = tezClient.submitDAG(dag);
        launcher.waitTillContainersLaunched();
        DAGImpl runningDag = mockApp.getContext().getCurrentDAG();
        launcher.startScheduling(true);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());

        for (org.apache.tez.dag.app.dag.Vertex dagVertex : runningDag.getVertices().values()) {
            VertexStatistics vertexStats = dagVertex.getStatistics();
            if (!dagVertex.getName().equals("A")) {
                // Vertex B: 2 tasks, input "A", output "Out".
                Assert.assertEquals(2L, vertexStats.getInputStatistics("A").getDataSize());
                Assert.assertEquals(2L, vertexStats.getOutputStatistics("Out").getDataSize());
                Assert.assertEquals(2L, vertexStats.getInputStatistics("A").getItemsProcessed());
                Assert.assertEquals(2L, vertexStats.getOutputStatistics("Out").getItemsProcessed());
                continue;
            }
            // Vertex A: 3 tasks, output "B", input "In".
            Assert.assertEquals(3L, vertexStats.getOutputStatistics("B").getDataSize());
            Assert.assertEquals(3L, vertexStats.getInputStatistics("In").getDataSize());
            Assert.assertEquals(3L, vertexStats.getOutputStatistics("B").getItemsProcessed());
            Assert.assertEquals(3L, vertexStats.getInputStatistics("In").getItemsProcessed());
        }
        tezClient.stop();
    }

    /**
     * Requests a garbage collection and prints the JVM's used, free, total and max heap sizes, in
     * megabytes, to standard output.
     *
     * @param str label appended to the header line
     * @param mockDAGAppMaster not read by this method
     */
    private void checkMemory(String str, MockDAGAppMaster mockDAGAppMaster) {
        final long bytesPerMb = 1048576L;
        final Runtime jvm = Runtime.getRuntime();
        System.out.println("##### Heap utilization statistics [MB] for " + str);
        jvm.gc();
        // Each figure is sampled immediately before it is printed.
        long usedMb = (jvm.totalMemory() - jvm.freeMemory()) / bytesPerMb;
        System.out.println("##### Used Memory:" + usedMb);
        long freeMb = jvm.freeMemory() / bytesPerMb;
        System.out.println("##### Free Memory:" + freeMb);
        long totalMb = jvm.totalMemory() / bytesPerMb;
        System.out.println("##### Total Memory:" + totalMb);
        long maxMb = jvm.maxMemory() / bytesPerMb;
        System.out.println("##### Max Memory:" + maxMb);
    }

    /**
     * Ignored by default. Runs a single-vertex DAG with 10000 tasks whose counters delegate
     * returns six counters (each incremented 15 times) per task, then prints heap usage via
     * {@code checkMemory}.
     */
    @Test(timeout = 60000)
    @Ignore
    public void testBasicCounterMemory() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false);
        tezClient.start();

        DAG dag = DAG.create("testBasicCounterMemory");
        Vertex onlyVertex = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10000);
        dag.addVertex(onlyVertex);

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher launcher = mockApp.getContainerLauncher();
        launcher.startScheduling(false);
        mockApp.countersDelegate = new MockDAGAppMaster.CountersDelegate() {
            @Override
            public TezCounters getCounters(TaskSpec taskSpec) {
                TezCounters counters = new TezCounters();
                for (int prefix = 0; prefix < 6; prefix++) {
                    // Group and counter names depend only on the outer index, so the 15 inner
                    // iterations all increment the same counter.
                    for (int repeat = 0; repeat < 15; repeat++) {
                        counters.findCounter(prefix + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", prefix + "abcdefghijklmnopqrstuvwxyz").increment(1L);
                    }
                }
                return counters;
            }
        };
        mockApp.doSleep = false;

        DAGClient dagClient = tezClient.submitDAG(dag);
        launcher.waitTillContainersLaunched();
        DAGImpl runningDag = mockApp.getContext().getCurrentDAG();
        launcher.startScheduling(true);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());
        Assert.assertNotNull(runningDag.getAllCounters());
        checkMemory(dag.getName(), mockApp);
        tezClient.stop();
    }

    /**
     * Ignored by default. Enables {@code tez.am.use.concurrent-dispatcher}, runs a single-vertex
     * DAG with 50000 tasks and asserts that it succeeds.
     */
    @Test(timeout = 60000)
    @Ignore
    public void testTaskEventsProcessingSpeed() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.am.use.concurrent-dispatcher", true);
        MockTezClient tezClient = new MockTezClient("testMockAM", conf, true, null, null, null, null, false, false, 30, 1000);
        tezClient.start();

        DAG dag = DAG.create("testTaskEventsProcessingSpeed");
        Vertex onlyVertex = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 50000);
        dag.addVertex(onlyVertex);

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        mockApp.doSleep = false;

        DAGClient dagClient = tezClient.submitDAG(dag);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());
        tezClient.stop();
    }

    /**
     * Ignored by default. Runs a single vertex with 10000 tasks and ten data sources, each task
     * reporting a data size of 1 and 1 processed item per source; asserts that the first source
     * aggregates to 10000 for both figures and then prints heap usage via {@code checkMemory}.
     */
    @Test(timeout = 60000)
    @Ignore
    public void testBasicStatisticsMemory() throws Exception {
        Logger.getRootLogger().setLevel(Level.WARN);
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false);
        tezClient.start();

        IOStatistics unitStats = new IOStatistics();
        unitStats.setDataSize(1L);
        unitStats.setItemsProcessed(1L);
        TaskStatistics perTaskStats = new TaskStatistics();

        final String vertexName = "abcdefghijklmnopqrstuvwxyz";
        DAG dag = DAG.create("testBasicStatisticsMemory");
        Vertex vertex = Vertex.create(vertexName, ProcessorDescriptor.create("Proc.class"), 10000);
        // Register sources "0abc…z" through "9abc…z" on both the vertex and the per-task payload.
        for (int index = 0; index < 10; index++) {
            String sourceName = index + vertexName;
            vertex.addDataSource(sourceName, DataSourceDescriptor.create(InputDescriptor.create(sourceName), (InputInitializerDescriptor) null, (Credentials) null));
            perTaskStats.addIO(sourceName, unitStats);
        }
        dag.addVertex(vertex);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        perTaskStats.write(new DataOutputStream(buffer));
        final byte[] serializedStats = buffer.toByteArray();

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher launcher = mockApp.getContainerLauncher();
        launcher.startScheduling(false);
        mockApp.statsDelegate = new MockDAGAppMaster.StatisticsDelegate() {
            @Override
            public TaskStatistics getStatistics(TaskSpec taskSpec) {
                // Every task receives a freshly deserialized copy of the same payload.
                DataInputByteBuffer input = new DataInputByteBuffer();
                input.reset(new ByteBuffer[]{ByteBuffer.wrap(serializedStats)});
                TaskStatistics decoded = new TaskStatistics();
                try {
                    decoded.readFields(input);
                } catch (IOException ioe) {
                    Assert.fail(ioe.getMessage());
                }
                return decoded;
            }
        };
        mockApp.doSleep = false;

        DAGClient dagClient = tezClient.submitDAG(dag);
        launcher.waitTillContainersLaunched();
        DAGImpl runningDag = mockApp.getContext().getCurrentDAG();
        launcher.startScheduling(true);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.waitForCompletion().getState());
        Assert.assertEquals(10000, runningDag.getVertex(vertexName).getStatistics().getInputStatistics("0abcdefghijklmnopqrstuvwxyz").getDataSize());
        Assert.assertEquals(10000, runningDag.getVertex(vertexName).getStatistics().getInputStatistics("0abcdefghijklmnopqrstuvwxyz").getItemsProcessed());
        checkMemory(dag.getName(), mockApp);
        tezClient.stop();
    }

    /**
     * Submits the same {@link DAG} instance through two successive {@code MockTezClient}
     * sessions built from the same configuration, and asserts that it succeeds both times.
     * The DAG carries one DAG-level local resource ("LR1") and one vertex-level local
     * resource ("LR2").
     */
    @Test(timeout = 10000)
    public void testMultipleSubmissions() throws Exception {
        // Typed maps instead of raw HashMap so the put/addTaskLocalFiles calls are checked.
        Map<String, LocalResource> dagLocalFiles = Maps.newHashMap();
        dagLocalFiles.put("LR1", LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1L, 1L));
        Map<String, LocalResource> vertexLocalFiles = Maps.newHashMap();
        vertexLocalFiles.put("LR2", LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1L, 1L));

        DAG dag = DAG.create("test").addTaskLocalFiles(dagLocalFiles);
        dag.addVertex(Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(vertexLocalFiles));

        TezConfiguration tezConfiguration = new TezConfiguration(defaultConf);

        // First session.
        MockTezClient firstClient = new MockTezClient("testMockAM", tezConfiguration, true, null, null, null, null);
        firstClient.start();
        DAGClient firstRun = firstClient.submitDAG(dag);
        firstRun.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, firstRun.getDAGStatus((Set) null).getState());
        firstClient.stop();

        // Second session: a new client, the very same DAG object.
        MockTezClient secondClient = new MockTezClient("testMockAM", tezConfiguration, true, null, null, null, null);
        secondClient.start();
        DAGClient secondRun = secondClient.submitDAG(dag);
        secondRun.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, secondRun.getDAGStatus((Set) null).getState());
        secondClient.stop();
    }

    /**
     * Sends a {@code DAGAppMasterEventSchedulingServiceError} to the mock AM while a DAG's
     * containers are launched but scheduling is held back, waits until the shutdown handler
     * reports that shutdown was invoked, and asserts that the current DAG is still RUNNING.
     */
    @Test(timeout = 10000)
    public void testSchedulerErrorHandling() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher launcher = mockApp.getContainerLauncher();
        launcher.startScheduling(false);

        DAG dag = DAG.create("testSchedulerErrorHandling");
        Vertex onlyVertex = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        dag.addVertex(onlyVertex);
        tezClient.submitDAG(dag);
        launcher.waitTillContainersLaunched();

        String mockErrorTrace = StringUtils.stringifyException(new RuntimeException("Mock error"));
        mockApp.handle(new DAGAppMasterEventSchedulingServiceError(mockErrorTrace));

        // Poll every 100 ms; the JUnit timeout bounds this loop.
        for (;;) {
            if (mockApp.getShutdownHandler().wasShutdownInvoked()) {
                break;
            }
            Thread.sleep(100L);
        }
        Assert.assertEquals(DAGState.RUNNING, mockApp.getContext().getCurrentDAG().getState());
    }

    /**
     * Verifies that {@code MockTezClient.start()} fails with a root cause message of
     * "FailInit" and that the mock AM service then stops.
     *
     * <p>NOTE(review): the trailing {@code true, false} constructor flags presumably request a
     * failure during AM init; confirm against {@code MockTezClient}.
     */
    @Test(timeout = 10000)
    public void testInitFailed() throws Exception {
        MockTezClient mockTezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, new AtomicBoolean(false), true, false);
        try {
            mockTezClient.start();
            // Without this the test would pass vacuously when start() succeeds. AssertionError
            // is not an Exception, so the catch block below does not swallow it.
            Assert.fail("Expected start() to fail with FailInit");
        } catch (Exception e) {
            LOG.info("Caught expected failure from start()", e);
            Assert.assertEquals("FailInit", e.getCause().getCause().getMessage());
            // Blocks until the AM service stops; the JUnit timeout fails the test otherwise.
            mockTezClient.getLocalClient().getMockApp().waitForServiceToStop(Integer.MAX_VALUE);
        }
    }

    /**
     * Verifies that {@code MockTezClient.start()} fails with a root cause message of
     * "FailStart" and that the mock AM service then stops.
     *
     * <p>NOTE(review): the trailing {@code false, true} constructor flags presumably request a
     * failure during AM start; confirm against {@code MockTezClient}.
     */
    @Test(timeout = 10000)
    public void testStartFailed() {
        MockTezClient mockTezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, new AtomicBoolean(false), false, true);
        try {
            mockTezClient.start();
            // Without this the test would pass vacuously when start() succeeds. AssertionError
            // is not an Exception, so the catch block below does not swallow it.
            Assert.fail("Expected start() to fail with FailStart");
        } catch (Exception e) {
            LOG.info("Caught expected failure from start()", e);
            Assert.assertEquals("FailStart", e.getCause().getCause().getMessage());
            // Blocks until the AM service stops; the JUnit timeout fails the test otherwise.
            mockTezClient.getLocalClient().getMockApp().waitForServiceToStop(Integer.MAX_VALUE);
        }
    }

    /**
     * Builds an {@link OutputCommitterDescriptor} for {@code FailingOutputCommitter} whose user
     * payload is a serialized {@code FailingOutputCommitterConfig} constructed from {@code z}.
     *
     * @param z value passed to the {@code FailingOutputCommitterConfig} constructor
     * @return the configured descriptor
     */
    private OutputCommitterDescriptor createOutputCommitterDesc(boolean z) {
        byte[] configBytes = new FailingOutputCommitter.FailingOutputCommitterConfig(z).toUserPayload();
        OutputCommitterDescriptor descriptor = OutputCommitterDescriptor.create(FailingOutputCommitter.class.getName());
        descriptor.setUserPayload(UserPayload.create(ByteBuffer.wrap(configBytes)));
        return descriptor;
    }

    /**
     * Builds a three-vertex DAG: single-task vertices "v1" and "v2" form the vertex group "uv12",
     * which feeds single-task vertex "v3" through a scatter-gather group input edge. The group
     * has data sink "uv12Out" and "v3" has data sink "v3Out", each with its own output committer
     * descriptor.
     *
     * @param str name of the DAG
     * @param z flag passed to {@code createOutputCommitterDesc} for the "uv12Out" committer
     * @param z2 flag passed to {@code createOutputCommitterDesc} for the "v3Out" committer
     * @return the assembled DAG
     */
    private DAG createDAG(String str, boolean z, boolean z2) {
        DAG dag = DAG.create(str);
        Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Proc"), 1);
        Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Proc"), 1);
        Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Proc"), 1);

        VertexGroup uv12 = dag.createVertexGroup("uv12", new Vertex[]{v1, v2});
        uv12.addDataSink("uv12Out", DataSinkDescriptor.create(OutputDescriptor.create("dummy output"), createOutputCommitterDesc(z), (Credentials) null));
        v3.addDataSink("v3Out", DataSinkDescriptor.create(OutputDescriptor.create("dummy output"), createOutputCommitterDesc(z2), (Credentials) null));

        EdgeProperty scatterGather = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("dummy output class"),
                InputDescriptor.create("dummy input class"));
        dag.addVertex(v1)
                .addVertex(v2)
                .addVertex(v3)
                .addEdge(GroupInputEdge.create(uv12, v3, scatterGather, InputDescriptor.create("merge.class")));
        return dag;
    }

    /**
     * Runs four DAGs built by {@code createDAG} in one session with the default configuration:
     * no committer failing, only the "uv12Out" committer failing, only the "v3Out" committer
     * failing, and both failing. Asserts that the first DAG succeeds, that each of the others
     * fails with the matching committer message in the DAG diagnostics, and that vertices
     * v1, v2 and v3 are nevertheless reported as SUCCEEDED.
     */
    @Test(timeout = 60000)
    public void testCommitOutputOnDAGSuccess() throws Exception {
        final String[] vertexNames = {"v1", "v2", "v3"};
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();

        // Scenario 1: both commits succeed.
        DAGClient bothSucceed = tezClient.submitDAG(createDAG("testDAGBothCommitsSucceed", false, false));
        bothSucceed.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, bothSucceed.getDAGStatus((Set) null).getState());

        // Scenario 2: the vertex-group commit fails.
        DAGClient groupFails = tezClient.submitDAG(createDAG("testDAGVertexGroupCommitFail", true, false));
        groupFails.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.FAILED, groupFails.getDAGStatus((Set) null).getState());
        LOG.info(groupFails.getDAGStatus((Set) null).getDiagnostics());
        String groupFailDiagnostics = org.apache.commons.lang.StringUtils.join(groupFails.getDAGStatus((Set) null).getDiagnostics(), ",");
        Assert.assertTrue(groupFailDiagnostics.contains("fail output committer:uv12Out"));
        for (String vertexName : vertexNames) {
            Assert.assertEquals(VertexStatus.State.SUCCEEDED, groupFails.getVertexStatus(vertexName, (Set) null).getState());
        }

        // Scenario 3: the v3 commit fails.
        DAGClient vertexFails = tezClient.submitDAG(createDAG("testDAGVertexCommitFail", false, true));
        vertexFails.waitForCompletion();
        LOG.info(vertexFails.getDAGStatus((Set) null).getDiagnostics());
        Assert.assertEquals(DAGStatus.State.FAILED, vertexFails.getDAGStatus((Set) null).getState());
        String vertexFailDiagnostics = org.apache.commons.lang.StringUtils.join(vertexFails.getDAGStatus((Set) null).getDiagnostics(), ",");
        Assert.assertTrue(vertexFailDiagnostics.contains("fail output committer:v3Out"));
        for (String vertexName : vertexNames) {
            Assert.assertEquals(VertexStatus.State.SUCCEEDED, vertexFails.getVertexStatus(vertexName, (Set) null).getState());
        }

        // Scenario 4: both commits fail; either failure message is accepted.
        DAGClient bothFail = tezClient.submitDAG(createDAG("testDAGBothCommitsFail", true, true));
        bothFail.waitForCompletion();
        LOG.info(bothFail.getDAGStatus((Set) null).getDiagnostics());
        Assert.assertEquals(DAGStatus.State.FAILED, bothFail.getDAGStatus((Set) null).getState());
        String bothFailDiagnostics = org.apache.commons.lang.StringUtils.join(bothFail.getDAGStatus((Set) null).getDiagnostics(), ",");
        boolean mentionsEitherCommitter = bothFailDiagnostics.contains("fail output committer:uv12Out")
                || bothFailDiagnostics.contains("fail output committer:v3Out");
        Assert.assertTrue(mentionsEitherCommitter);
        for (String vertexName : vertexNames) {
            Assert.assertEquals(VertexStatus.State.SUCCEEDED, bothFail.getVertexStatus(vertexName, (Set) null).getState());
        }

        tezClient.stop();
    }

    /**
     * Same four scenarios as {@code testCommitOutputOnDAGSuccess}, but with
     * {@code tez.am.commit-all-outputs-on-dag-success} set to {@code false}. In this mode the
     * test accepts v3 ending up KILLED when the vertex-group commit fails, and expects v3 itself
     * to be FAILED (with the committer message in its own diagnostics) when only its commit
     * fails.
     */
    @Test(timeout = 60000)
    public void testCommitOutputOnVertexSuccess() throws Exception {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        MockTezClient tezClient = new MockTezClient("testMockAM", conf, true, null, null, null, null);
        tezClient.start();

        // Scenario 1: both commits succeed.
        DAGClient bothSucceed = tezClient.submitDAG(createDAG("testDAGBothCommitsSucceed", false, false));
        bothSucceed.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, bothSucceed.getDAGStatus((Set) null).getState());

        // Scenario 2: the vertex-group commit fails; v3 may be SUCCEEDED or KILLED.
        DAGClient groupFails = tezClient.submitDAG(createDAG("testDAGVertexGroupCommitFail", true, false));
        groupFails.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.FAILED, groupFails.getDAGStatus((Set) null).getState());
        LOG.info(groupFails.getDAGStatus((Set) null).getDiagnostics());
        String groupFailDiagnostics = org.apache.commons.lang.StringUtils.join(groupFails.getDAGStatus((Set) null).getDiagnostics(), ",");
        Assert.assertTrue(groupFailDiagnostics.contains("fail output committer:uv12Out"));
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, groupFails.getVertexStatus("v1", (Set) null).getState());
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, groupFails.getVertexStatus("v2", (Set) null).getState());
        VertexStatus.State v3AfterGroupFail = groupFails.getVertexStatus("v3", (Set) null).getState();
        if (!v3AfterGroupFail.equals(VertexStatus.State.SUCCEEDED)) {
            Assert.assertEquals(VertexStatus.State.KILLED, v3AfterGroupFail);
        } else {
            LOG.info("v3 is succeeded");
        }

        // Scenario 3: the v3 commit fails; v3 must be FAILED.
        DAGClient vertexFails = tezClient.submitDAG(createDAG("testDAGVertexCommitFail", false, true));
        vertexFails.waitForCompletion();
        LOG.info(vertexFails.getDAGStatus((Set) null).getDiagnostics());
        Assert.assertEquals(DAGStatus.State.FAILED, vertexFails.getDAGStatus((Set) null).getState());
        String vertexFailDiagnostics = org.apache.commons.lang.StringUtils.join(vertexFails.getDAGStatus((Set) null).getDiagnostics(), ",");
        Assert.assertTrue(vertexFailDiagnostics.contains("Commit failed"));
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, vertexFails.getVertexStatus("v1", (Set) null).getState());
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, vertexFails.getVertexStatus("v2", (Set) null).getState());
        Assert.assertEquals(VertexStatus.State.FAILED, vertexFails.getVertexStatus("v3", (Set) null).getState());
        String v3Diagnostics = org.apache.commons.lang.StringUtils.join(vertexFails.getVertexStatus("v3", (Set) null).getDiagnostics(), ",");
        Assert.assertTrue(v3Diagnostics.contains("fail output committer:v3Out"));

        // Scenario 4: both commits fail; v3 may be FAILED or KILLED.
        DAGClient bothFail = tezClient.submitDAG(createDAG("testDAGBothCommitsFail", true, true));
        bothFail.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.FAILED, bothFail.getDAGStatus((Set) null).getState());
        LOG.info(bothFail.getDAGStatus((Set) null).getDiagnostics());
        Assert.assertEquals(DAGStatus.State.FAILED, bothFail.getDAGStatus((Set) null).getState());
        String bothFailDiagnostics = org.apache.commons.lang.StringUtils.join(bothFail.getDAGStatus((Set) null).getDiagnostics(), ",");
        boolean mentionsEitherCommitter = bothFailDiagnostics.contains("fail output committer:uv12Out")
                || bothFailDiagnostics.contains("fail output committer:v3Out");
        Assert.assertTrue(mentionsEitherCommitter);
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, bothFail.getVertexStatus("v1", (Set) null).getState());
        Assert.assertEquals(VertexStatus.State.SUCCEEDED, bothFail.getVertexStatus("v2", (Set) null).getState());
        VertexStatus.State v3AfterBothFail = bothFail.getVertexStatus("v3", (Set) null).getState();
        if (!v3AfterBothFail.equals(VertexStatus.State.FAILED)) {
            Assert.assertEquals(VertexStatus.State.KILLED, v3AfterBothFail);
        } else {
            LOG.info("v3 is failed");
            String v3FailDiagnostics = org.apache.commons.lang.StringUtils.join(bothFail.getVertexStatus("v3", (Set) null).getDiagnostics(), ",");
            Assert.assertTrue(v3FailDiagnostics.contains("fail output committer:v3Out"));
        }

        tezClient.stop();
    }

    /**
     * Sets {@code recoveryFatalError} on the mock AM, runs a single-vertex DAG to completion and
     * waits for shutdown to be invoked. Asserts that the DAG is SUCCEEDED, the AM state is FAILED
     * and the AM diagnostics carry the recovery fatal-error message.
     */
    @Test(timeout = 5000)
    public void testDAGFinishedRecoveryError() throws Exception {
        MockTezClient tezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null);
        tezClient.start();

        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        mockApp.recoveryFatalError = true;
        mockApp.getContainerLauncher().startScheduling(true);

        DAG dag = DAG.create("test");
        Vertex onlyVertex = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        dag.addVertex(onlyVertex);
        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();

        // Poll every 100 ms; the JUnit timeout bounds this loop.
        for (;;) {
            if (mockApp.getShutdownHandler().wasShutdownInvoked()) {
                break;
            }
            Thread.sleep(100L);
        }

        Assert.assertEquals(DAGState.SUCCEEDED, mockApp.getContext().getCurrentDAG().getState());
        Assert.assertEquals(DAGAppMasterState.FAILED, mockApp.getState());
        String amDiagnostics = org.apache.commons.lang.StringUtils.join(mockApp.getDiagnostics(), ",");
        Assert.assertTrue(amDiagnostics.contains("Recovery had a fatal error, shutting down session after DAG completion"));
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal(defaultConf);
            defaultConf.set("tez.staging-dir", "target/" + TestMockDAGAppMaster.class.getName() + "-tmpDir");
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
