package org.apache.tez.history;

import com.google.common.collect.Sets;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.examples.WordCount;
import org.apache.tez.history.parser.ATSFileParser;
import org.apache.tez.history.parser.SimpleHistoryParser;
import org.apache.tez.history.parser.datamodel.BaseInfo;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.EdgeInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VersionInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.tests.MiniTezClusterWithTimeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/history/TestHistoryParser.class */
public class TestHistoryParser {
    // Mini HDFS cluster shared by every test; built in setupCluster().
    private static MiniDFSCluster miniDFSCluster;
    // Mini Tez cluster (with timeline server); built in setupTezCluster().
    private static MiniTezClusterWithTimeline miniTezCluster;
    // Names of the data source, data sink and the two vertices of the test DAG.
    private static final String INPUT = "Input";
    private static final String OUTPUT = "Output";
    private static final String TOKENIZER = "Tokenizer";
    private static final String SUMMATION = "Summation";
    // Directory (on the test file system) and base file name used for simple history logs.
    private static final String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
    private static final String HISTORY_TXT = "history.txt";
    // File system of the mini HDFS cluster; assigned in setupCluster().
    private static FileSystem fs;
    // Timeline web-app address read from the mini Tez cluster config in setupTezCluster().
    private static String yarnTimelineAddress;
    // Location of the generated sample input consumed by the WordCount DAG.
    private static Path inputLoc = new Path("/tmp/sample.txt");
    // Shared configuration; replaced with a fresh instance in setupCluster().
    private static Configuration conf = new Configuration();
    // Local working directories; both are deleted in shutdownCluster().
    private static String TEST_ROOT_DIR = "target/" + TestHistoryParser.class.getName() + "-tmpDir";
    private static String TEZ_BASE_DIR = "target/" + TestHistoryParser.class.getName() + "-tez";
    // Local directory the history data is downloaded into before parsing.
    private static String DOWNLOAD_DIR = TEST_ROOT_DIR + "/download";

    /* loaded from: input_file:org/apache/tez/history/TestHistoryParser$FailProcessor.class */
    /**
     * Processor whose {@link #run()} always throws; used by the failed-job test
     * in place of the summation processor.
     */
    public static class FailProcessor extends SimpleMRProcessor {
        public FailProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        /** Unconditionally fails with a fixed message that the test later looks for in the DAG diagnostics. */
        public void run() throws Exception {
            final String reason = "Failing this processor for some reason";
            throw new Exception(reason);
        }
    }

    /**
     * Starts a single-datanode mini HDFS cluster, points the shared configuration
     * at it, and then brings up the Tez cluster.
     */
    @BeforeClass
    public static void setupCluster() throws Exception {
        conf = new Configuration();
        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
        conf.setBoolean("dfs.namenode.edits.noeditlogchannelflush", false);
        conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);

        MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true);
        miniDFSCluster = dfsBuilder.build();
        fs = miniDFSCluster.getFileSystem();

        String defaultFsUri = fs.getUri().toString();
        conf.set("fs.defaultFS", defaultFsUri);

        setupTezCluster();
    }

    /**
     * Stops both mini clusters (when they were created) and then removes the
     * local working directories. Directory removal is attempted whether or not
     * the shutdown succeeded; any shutdown failure still propagates.
     */
    @AfterClass
    public static void shutdownCluster() {
        try {
            if (miniDFSCluster != null) {
                miniDFSCluster.shutdown();
            }
            if (miniTezCluster != null) {
                miniTezCluster.stop();
            }
        } finally {
            try {
                FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
                FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
            } catch (IOException ignored) {
                // Best-effort cleanup of test directories; failure to delete is not a test failure.
            }
        }
    }

    /**
     * Configures and starts the mini Tez cluster on top of the shared
     * configuration, writes the sample input file, and records the timeline
     * web-app address reported by the cluster configuration.
     */
    public static void setupTezCluster() throws Exception {
        final String atsLoggerClass = ATSHistoryLoggingService.class.getName();

        // Shuffle settings.
        conf.setInt("tez.runtime.shuffle.connect.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.read.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.fetch.failures.limit", 2);

        // Counter / history-logging settings.
        conf.setBoolean("tez.task.generate.counters.per.io", true);
        conf.setBoolean("tez.allow.disabled.timeline-domains", true);
        conf.setBoolean("yarn.timeline-service.enabled", true);
        conf.set("tez.history.logging.service.class", atsLoggerClass);
        conf.set("tez.simple.history.logging.dir", SIMPLE_HISTORY_DIR);

        miniTezCluster = new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
        miniTezCluster.init(conf);
        miniTezCluster.start();

        createSampleFile(inputLoc);

        // NOTE(review): this TezConfiguration is populated but not read again in this method.
        TezConfiguration tezConfiguration = new TezConfiguration(miniTezCluster.getConfig());
        String timelineWebapp = miniTezCluster.getConfig().get("yarn.timeline-service.webapp.address");
        tezConfiguration.setBoolean("yarn.timeline-service.enabled", true);
        tezConfiguration.set("yarn.timeline-service.webapp.address", timelineWebapp);
        tezConfiguration.setBoolean("tez.allow.disabled.timeline-domains", true);
        tezConfiguration.set("tez.history.logging.service.class", atsLoggerClass);

        yarnTimelineAddress = miniTezCluster.getConfig().get("yarn.timeline-service.webapp.address");
    }

    /**
     * Runs WordCount once with ATS logging and once with simple history logging,
     * parses both histories, verifies each, and compares the two results.
     */
    @Test
    public void testParserWithSuccessfulJob() throws Exception {
        final String tokenizerClass = WordCount.TokenProcessor.class.getName();
        final String summationClass = WordCount.SumProcessor.class.getName();

        // Run with ATS logging and import the DAG data through the import tool.
        String atsDagId = runWordCount(tokenizerClass, summationClass, "WordCount", true);
        String[] importArgs = {
            "--dagId=" + atsDagId,
            "--downloadDir=" + DOWNLOAD_DIR,
            "--yarnTimelineAddress=" + yarnTimelineAddress
        };
        Assert.assertTrue(ATSImportTool.process(importArgs) == 0);

        DagInfo atsDag = getDagInfo(atsDagId);
        verifyDagInfo(atsDag, true);
        verifyJobSpecificInfo(atsDag);
        checkConfig(atsDag);

        // Run again with simple history logging; wait before reading the history file.
        String simpleDagId = runWordCount(tokenizerClass, summationClass, "WordCount", false);
        Thread.sleep(10000L);

        DagInfo simpleDag = getDagInfoFromSimpleHistory(simpleDagId);
        verifyDagInfo(simpleDag, false);
        verifyJobSpecificInfo(simpleDag);

        isDAGEqual(atsDag, simpleDag);
    }

    /**
     * Copies the simple-history file of the DAG's first application attempt to
     * the download directory and parses it.
     *
     * @param str DAG id in string form
     * @return parsed DAG data whose id has been asserted equal to {@code str}
     */
    private DagInfo getDagInfoFromSimpleHistory(String str) throws TezException, IOException {
        String fileSystemRoot = conf.get("fs.defaultFS");
        ApplicationAttemptId firstAttempt = ApplicationAttemptId.newInstance(TezDAGID.fromString(str).getApplicationId(), 1);
        Path remoteHistory = new Path(fileSystemRoot + SIMPLE_HISTORY_DIR + HISTORY_TXT + "." + firstAttempt);

        FileSystem remoteFs = remoteHistory.getFileSystem(conf);
        remoteFs.copyToLocalFile(remoteHistory, new Path(DOWNLOAD_DIR, HISTORY_TXT));

        File localHistory = new File(DOWNLOAD_DIR, HISTORY_TXT);
        SimpleHistoryParser parser = new SimpleHistoryParser(Arrays.asList(localHistory));
        DagInfo parsed = parser.getDAGData(str);
        Assert.assertTrue(parsed.getDagId().equals(str));
        return parsed;
    }

    /**
     * Asserts that the DAG carries a non-empty application config containing a
     * positive {@code dfs.replication} value.
     */
    private void checkConfig(DagInfo dagInfo) {
        Assert.assertTrue("DagInfo is " + dagInfo, dagInfo != null);

        int configEntries = dagInfo.getAppConfig().size();
        Assert.assertTrue("DagInfo config size=" + configEntries, configEntries > 0);

        String failureMessage = "DagInfo config=" + dagInfo.getAppConfig();
        String replication = (String) dagInfo.getAppConfig().get("dfs.replication");
        Assert.assertTrue(failureMessage, Integer.parseInt(replication) > 0);
    }

    /**
     * Verifies the details expected of a successful two-vertex WordCount run:
     * DAG name and shape, the single scatter-gather edge, per-vertex, per-task
     * and per-attempt timings, and that the last data event seen by the
     * non-tokenizer attempts names the tokenizer attempt that finished last.
     */
    private void verifyJobSpecificInfo(DagInfo dagInfo) {
        final String tokenProcessor = WordCount.TokenProcessor.class.getName();
        final String sumProcessor = WordCount.SumProcessor.class.getName();

        // DAG-level shape.
        Assert.assertTrue(dagInfo.getNumVertices() == 2);
        Assert.assertTrue(dagInfo.getName().equals("WordCount"));
        Assert.assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(tokenProcessor));
        Assert.assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName().equals(sumProcessor));
        Assert.assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime());

        // The single Tokenizer -> Summation edge.
        Assert.assertTrue(dagInfo.getEdges().size() == 1);
        EdgeInfo onlyEdge = (EdgeInfo) dagInfo.getEdges().iterator().next();
        Assert.assertTrue(onlyEdge.getDataMovementType().equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
        Assert.assertTrue(onlyEdge.getSourceVertex().getVertexName().equals(TOKENIZER));
        Assert.assertTrue(onlyEdge.getDestinationVertex().getVertexName().equals(SUMMATION));
        Assert.assertTrue(onlyEdge.getInputVertexName().equals(TOKENIZER));
        Assert.assertTrue(onlyEdge.getOutputVertexName().equals(SUMMATION));
        Assert.assertTrue(onlyEdge.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
        Assert.assertTrue(onlyEdge.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));

        Assert.assertTrue(dagInfo.getVertices().size() == 2);

        String lastTokenizerAttemptId = null;   // successful attempt of the latest-finishing tokenizer task
        String lastDataEventAttemptId = null;   // attempt id named by the consumers' last data events
        for (VertexInfo vertex : dagInfo.getVertices()) {
            Assert.assertTrue(vertex.getKilledTasksCount() == 0);
            Assert.assertTrue(vertex.getInitRequestedTime() > 0);
            Assert.assertTrue(vertex.getInitTime() > 0);
            Assert.assertTrue(vertex.getStartRequestedTime() > 0);
            Assert.assertTrue(vertex.getStartTime() > 0);
            Assert.assertTrue(vertex.getFinishTime() > 0);
            Assert.assertTrue(vertex.getFinishTime() > vertex.getStartTime());

            long latestFinish = 0;
            for (TaskInfo task : vertex.getTasks()) {
                Assert.assertTrue(task.getNumberOfTaskAttempts() == 1);
                Assert.assertTrue(task.getMaxTaskAttemptDuration() >= 0);
                Assert.assertTrue(task.getMinTaskAttemptDuration() >= 0);
                Assert.assertTrue(task.getAvgTaskAttemptDuration() >= 0.0f);
                Assert.assertTrue(task.getLastTaskAttemptToFinish() != null);
                Assert.assertTrue(task.getContainersMapping().size() > 0);
                Assert.assertTrue(task.getSuccessfulTaskAttempts().size() > 0);
                Assert.assertTrue(task.getFailedTaskAttempts().size() == 0);
                Assert.assertTrue(task.getKilledTaskAttempts().size() == 0);
                Assert.assertTrue(task.getFinishTime() > task.getStartTime());

                if (vertex.getVertexName().equals(TOKENIZER)) {
                    // Track the tokenizer task that finished last.
                    if (latestFinish < task.getFinishTime()) {
                        latestFinish = task.getFinishTime();
                        lastTokenizerAttemptId = task.getSuccessfulAttemptId();
                    }
                } else {
                    // Every consumer attempt must report the same source attempt in its first last-data event.
                    for (TaskAttemptInfo consumer : task.getTaskAttempts()) {
                        TaskAttemptInfo.DataDependencyEvent event = (TaskAttemptInfo.DataDependencyEvent) consumer.getLastDataEvents().get(0);
                        Assert.assertTrue(event.getTimestamp() > 0);
                        if (lastDataEventAttemptId != null) {
                            Assert.assertTrue(lastDataEventAttemptId.equals(event.getTaskAttemptId()));
                        } else {
                            lastDataEventAttemptId = event.getTaskAttemptId();
                        }
                    }
                }

                for (TaskAttemptInfo attempt : task.getTaskAttempts()) {
                    Assert.assertTrue(attempt.getCreationTime() > 0);
                    Assert.assertTrue(attempt.getAllocationTime() > 0);
                    Assert.assertTrue(attempt.getStartTime() > 0);
                    Assert.assertTrue(attempt.getFinishTime() > attempt.getStartTime());
                }
            }

            Assert.assertTrue(vertex.getLastTaskToFinish() != null);

            // Tokenizer has only an outgoing edge; the other vertex has only an incoming one.
            final boolean isTokenizer = vertex.getVertexName().equals(TOKENIZER);
            final int expectedInputs = isTokenizer ? 0 : 1;
            final int expectedOutputs = isTokenizer ? 1 : 0;
            Assert.assertTrue(vertex.getInputEdges().size() == expectedInputs);
            Assert.assertTrue(vertex.getOutputEdges().size() == expectedOutputs);
            Assert.assertTrue(vertex.getOutputVertices().size() == expectedOutputs);
            Assert.assertTrue(vertex.getInputVertices().size() == expectedInputs);
        }

        Assert.assertTrue(lastTokenizerAttemptId.equals(lastDataEventAttemptId));
    }

    /**
     * Runs a successful WordCount and then invokes the import tool with an
     * {@code --atsAddress} argument, expecting a {@link ParseException}.
     */
    @Test
    public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
        final String tokenizerClass = WordCount.TokenProcessor.class.getName();
        final String summationClass = WordCount.SumProcessor.class.getName();
        try {
            String dagId = runWordCount(tokenizerClass, summationClass, "WordCount-With-WrongATS-URL", true);
            String[] badArgs = {
                "--dagId=" + dagId,
                "--downloadDir=" + DOWNLOAD_DIR,
                "--atsAddress=http://atsHost:8188"
            };
            ATSImportTool.process(badArgs);
            Assert.fail("Should have failed with processException");
        } catch (ParseException expected) {
            // Expected outcome: the import tool rejects these arguments.
        }
    }

    /**
     * Runs WordCount with {@link FailProcessor} as the summation processor and
     * verifies the parsed history: failed vertex/tasks/attempts, DAG status,
     * selected counters, the causal chain between attempts, and the diagnostics.
     */
    @Test
    public void testParserWithFailedJob() throws Exception {
        String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class.getName(), "WordCount-With-Exception", true);
        String[] importArgs = {
            "--dagId=" + dagId,
            "--downloadDir=" + DOWNLOAD_DIR,
            "--yarnTimelineAddress=" + yarnTimelineAddress
        };
        Assert.assertTrue(ATSImportTool.process(importArgs) == 0);

        DagInfo dagInfo = getDagInfo(dagId);
        checkConfig(dagInfo);
        verifyDagInfo(dagInfo, true);

        // The summation vertex fails with one failed task and four failed attempts.
        VertexInfo summation = dagInfo.getVertex(SUMMATION);
        Assert.assertTrue(summation.getFailedTasks().size() == 1);
        TaskInfo failedTask = (TaskInfo) summation.getFailedTasks().get(0);
        Assert.assertTrue(failedTask.getFailedTaskAttempts().size() == 4);
        Assert.assertTrue(summation.getStatus().equals(VertexState.FAILED.toString()));

        // DAG-level view of failed and successful vertices.
        Assert.assertTrue(dagInfo.getFailedVertices().size() == 1);
        VertexInfo failedVertex = (VertexInfo) dagInfo.getFailedVertices().get(0);
        Assert.assertTrue(failedVertex.getVertexName().equals(SUMMATION));
        Assert.assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
        VertexInfo successfulVertex = (VertexInfo) dagInfo.getSuccessfullVertices().get(0);
        Assert.assertTrue(successfulVertex.getVertexName().equals(TOKENIZER));
        Assert.assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));

        // DAG counters.
        verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4L);
        verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1L);
        verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5L);

        // Task counters for the tokenizer's input and its output towards the summation vertex.
        final String tokenizerInput = "TaskCounter_Tokenizer_INPUT_Input";
        final String tokenizerOutput = "TaskCounter_Tokenizer_OUTPUT_Summation";
        verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()), tokenizerInput, 10L);
        verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()), tokenizerOutput, 0L);
        verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()), tokenizerOutput, 20L);
        verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()), tokenizerOutput, 20L);

        // Each attempt after the first must name its predecessor as creation cause,
        // and that predecessor must have a termination cause.
        for (TaskInfo task : summation.getTasks()) {
            TaskAttemptInfo previous = null;
            for (TaskAttemptInfo current : task.getTaskAttempts()) {
                if (previous != null) {
                    Assert.assertTrue(previous.getTaskAttemptId().equals(current.getCreationCausalTA()));
                    Assert.assertTrue(previous.getTerminationCause() != null);
                }
                previous = current;
            }
        }

        Assert.assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
    }

    /**
     * Asserts that two parsed DAGs are both present and agree on status,
     * edges and vertices.
     */
    private void isDAGEqual(DagInfo dagInfo, DagInfo dagInfo2) {
        Assert.assertNotNull(dagInfo);
        Assert.assertNotNull(dagInfo2);

        String firstStatus = dagInfo.getStatus();
        String secondStatus = dagInfo2.getStatus();
        Assert.assertEquals(firstStatus, secondStatus);

        // Structural comparison is delegated to the collection overloads.
        isEdgeEqual(dagInfo.getEdges(), dagInfo2.getEdges());
        isVertexEqual(dagInfo.getVertices(), dagInfo2.getVertices());
    }

    /**
     * Asserts that two vertices agree on name, processor class, task counts,
     * status, edges, number of neighbouring vertices and tasks.
     *
     * <p>Uses value-reporting assertions so a failure shows both sides, and
     * checks the task count once rather than twice.
     */
    private void isVertexEqual(VertexInfo vertexInfo, VertexInfo vertexInfo2) {
        Assert.assertNotNull(vertexInfo);
        Assert.assertNotNull(vertexInfo2);

        Assert.assertEquals(vertexInfo.getVertexName(), vertexInfo2.getVertexName());
        Assert.assertEquals(vertexInfo.getProcessorClassName(), vertexInfo2.getProcessorClassName());
        Assert.assertEquals(vertexInfo.getNumTasks(), vertexInfo2.getNumTasks());
        Assert.assertEquals(vertexInfo.getCompletedTasksCount(), vertexInfo2.getCompletedTasksCount());
        Assert.assertEquals(vertexInfo.getStatus(), vertexInfo2.getStatus());

        isEdgeEqual(vertexInfo.getInputEdges(), vertexInfo2.getInputEdges());
        isEdgeEqual(vertexInfo.getOutputEdges(), vertexInfo2.getOutputEdges());

        // Only the number of neighbouring vertices is compared, not their contents.
        Assert.assertEquals(vertexInfo.getInputVertices().size(), vertexInfo2.getInputVertices().size());
        Assert.assertEquals(vertexInfo.getOutputVertices().size(), vertexInfo2.getOutputVertices().size());

        isTaskEqual(vertexInfo.getTasks(), vertexInfo2.getTasks());
    }

    /**
     * Asserts that two vertex lists have the same size and that vertices at the
     * same position are equal.
     */
    private void isVertexEqual(List<VertexInfo> list, List<VertexInfo> list2) {
        Assert.assertTrue("Vertices sizes should be the same", list.size() == list2.size());
        Iterator<VertexInfo> others = list2.iterator();
        for (VertexInfo vertex : list) {
            Assert.assertTrue(others.hasNext());
            isVertexEqual(vertex, others.next());
        }
    }

    /**
     * Asserts that two edges are both present and have identical string
     * representations.
     *
     * <p>The previous version compared {@code edgeInfo.toString()} with itself,
     * so the second edge was never actually checked.
     */
    private void isEdgeEqual(EdgeInfo edgeInfo, EdgeInfo edgeInfo2) {
        Assert.assertNotNull(edgeInfo);
        Assert.assertNotNull(edgeInfo2);
        Assert.assertEquals(edgeInfo.toString(), edgeInfo2.toString());
    }

    /**
     * Asserts that two edge collections have the same size and that edges at
     * the same iteration position are equal.
     *
     * <p>The size check now compares the two collections with each other; it
     * previously compared the first collection's size with itself.
     */
    private void isEdgeEqual(Collection<EdgeInfo> collection, Collection<EdgeInfo> collection2) {
        Assert.assertEquals("sizes should be the same", collection.size(), collection2.size());
        Iterator<EdgeInfo> it = collection.iterator();
        Iterator<EdgeInfo> it2 = collection2.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(it2.hasNext());
            isEdgeEqual(it.next(), it2.next());
        }
    }

    /**
     * Asserts that two task collections have the same size and that tasks at
     * the same iteration position are equal.
     *
     * <p>The size check now compares the two collections with each other; it
     * previously compared the first collection's size with itself.
     */
    private void isTaskEqual(Collection<TaskInfo> collection, Collection<TaskInfo> collection2) {
        Assert.assertEquals("sizes should be the same", collection.size(), collection2.size());
        Iterator<TaskInfo> it = collection.iterator();
        Iterator<TaskInfo> it2 = collection2.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(it2.hasNext());
            isTaskEqual(it.next(), it2.next());
        }
    }

    /**
     * Asserts that two tasks are present, belong to vertices with the same
     * name, share a status, and agree on task attempts and compared counters.
     */
    private void isTaskEqual(TaskInfo taskInfo, TaskInfo taskInfo2) {
        // Presence checks for the tasks and their owning vertices.
        Assert.assertTrue(taskInfo != null);
        Assert.assertTrue(taskInfo2 != null);
        Assert.assertTrue(taskInfo.getVertexInfo() != null);
        Assert.assertTrue(taskInfo2.getVertexInfo() != null);

        Assert.assertTrue(taskInfo.getStatus().equals(taskInfo2.getStatus()));

        String firstVertexName = taskInfo.getVertexInfo().getVertexName();
        String secondVertexName = taskInfo2.getVertexInfo().getVertexName();
        Assert.assertTrue(firstVertexName.equals(secondVertexName));

        isTaskAttemptEqual(taskInfo.getTaskAttempts(), taskInfo2.getTaskAttempts());
        isCountersSame(taskInfo, taskInfo2);
    }

    /**
     * Compares a fixed set of task counters between two history entities.
     *
     * <p>Each counter is looked up by enum name on both sides and handed to
     * {@code isCounterSame}. {@code OUTPUT_RECORDS} used to be compared twice;
     * it is now listed once.
     */
    private void isCountersSame(BaseInfo baseInfo, BaseInfo baseInfo2) {
        final TaskCounter[] comparedCounters = {
            TaskCounter.ADDITIONAL_SPILL_COUNT,
            TaskCounter.SPILLED_RECORDS,
            TaskCounter.OUTPUT_RECORDS,
            TaskCounter.OUTPUT_BYTES,
            TaskCounter.REDUCE_INPUT_GROUPS,
            TaskCounter.REDUCE_INPUT_RECORDS
        };
        for (TaskCounter counter : comparedCounters) {
            String counterName = counter.name();
            isCounterSame(baseInfo.getCounter(counterName), baseInfo2.getCounter(counterName));
        }
    }

    /**
     * Asserts that every entry of {@code map} also exists in {@code map2} with
     * the same counter value. The check is one-directional: extra entries in
     * {@code map2} are not reported.
     *
     * <p>Drops a no-op {@code entry.getKey()} statement and names the offending
     * key in failure messages.
     */
    private void isCounterSame(Map<String, TezCounter> map, Map<String, TezCounter> map2) {
        for (Map.Entry<String, TezCounter> entry : map.entrySet()) {
            String key = entry.getKey();
            long expected = entry.getValue().getValue();
            Assert.assertTrue("Missing counter entry: " + key, map2.containsKey(key));
            Assert.assertEquals("Counter value mismatch for: " + key, expected, map2.get(key).getValue());
        }
    }

    /**
     * Asserts that two task-attempt collections have the same size and that
     * attempts at the same iteration position are equal.
     *
     * <p>The size check now compares the two collections with each other; it
     * previously compared the first collection's size with itself.
     */
    private void isTaskAttemptEqual(Collection<TaskAttemptInfo> collection, Collection<TaskAttemptInfo> collection2) {
        Assert.assertEquals("sizes should be the same", collection.size(), collection2.size());
        Iterator<TaskAttemptInfo> it = collection.iterator();
        Iterator<TaskAttemptInfo> it2 = collection2.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(it2.hasNext());
            isTaskAttemptEqual(it.next(), it2.next());
        }
    }

    /**
     * Asserts that two task attempts are present, have owning tasks, share a
     * status, belong to vertices with the same name, and agree on the compared
     * counters.
     */
    private void isTaskAttemptEqual(TaskAttemptInfo taskAttemptInfo, TaskAttemptInfo taskAttemptInfo2) {
        // Presence checks for the attempts and their owning tasks.
        Assert.assertTrue(taskAttemptInfo != null);
        Assert.assertTrue(taskAttemptInfo2 != null);
        Assert.assertTrue(taskAttemptInfo.getTaskInfo() != null);
        Assert.assertTrue(taskAttemptInfo2.getTaskInfo() != null);

        Assert.assertTrue(taskAttemptInfo.getStatus().equals(taskAttemptInfo2.getStatus()));

        String firstVertexName = taskAttemptInfo.getTaskInfo().getVertexInfo().getVertexName();
        String secondVertexName = taskAttemptInfo2.getTaskInfo().getVertexInfo().getVertexName();
        Assert.assertTrue(firstVertexName.equals(secondVertexName));

        isCountersSame(taskAttemptInfo, taskAttemptInfo2);
    }

    /**
     * Writes ten lines of the form {@code "Sample <5 random alphanumerics>"} to
     * {@code path} on the test file system and registers the file for deletion
     * on exit.
     *
     * <p>The writer is managed by try-with-resources so the underlying stream
     * is closed even when a write fails.
     *
     * @param path destination of the sample file
     * @throws IOException if the file cannot be created or written
     */
    private static void createSampleFile(Path path) throws IOException {
        fs.deleteOnExit(path);
        try (BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path)))) {
            for (int i = 0; i < 10; i++) {
                bufferedWriter.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
                bufferedWriter.newLine();
            }
        }
    }

    /**
     * Parses {@code <DOWNLOAD_DIR>/<dagId>.zip} with the ATS file parser.
     *
     * @param str DAG id in string form
     * @return parsed DAG data whose id has been asserted equal to {@code str}
     */
    private DagInfo getDagInfo(String str) throws TezException {
        File downloadedArchive = new File(DOWNLOAD_DIR + "/" + str + ".zip");
        ATSFileParser parser = new ATSFileParser(Arrays.asList(downloadedArchive));
        DagInfo parsed = parser.getDAGData(str);
        Assert.assertTrue(parsed.getDagId().equals(str));
        return parsed;
    }

    /**
     * Asserts counter values in {@code map} against an expected value.
     *
     * @param map counters keyed by name
     * @param str key to check, or {@code null} to check every entry
     * @param j   expected counter value
     */
    private void verifyCounter(Map<String, TezCounter> map, String str, long j) {
        for (Map.Entry<String, TezCounter> entry : map.entrySet()) {
            // Skip entries that do not match the requested key (a null key selects all).
            if (str != null && !entry.getKey().equals(str)) {
                continue;
            }
            Assert.assertTrue(entry.getValue().getValue() == j);
        }
    }

    /**
     * Creates, starts and waits for a non-session {@link TezClient} named
     * "WordCount".
     *
     * @param z {@code true} to log history through ATS (timeline service
     *          enabled), {@code false} to use the simple history logger
     * @return a started client that has reported ready
     */
    TezClient getTezClient(boolean z) throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration(miniTezCluster.getConfig());

        final String historyLoggerClass;
        if (z) {
            tezConfiguration.setBoolean("yarn.timeline-service.enabled", true);
            historyLoggerClass = ATSHistoryLoggingService.class.getName();
        } else {
            historyLoggerClass = SimpleHistoryLoggingService.class.getName();
        }
        tezConfiguration.set("tez.history.logging.service.class", historyLoggerClass);
        tezConfiguration.setBoolean("tez.allow.disabled.timeline-domains", true);

        TezClient client = TezClient.create("WordCount", tezConfiguration, false);
        client.start();
        client.waitTillReady();
        return client;
    }

    /**
     * Builds and runs a two-vertex WordCount-shaped DAG (Tokenizer -> Summation)
     * and returns the id of the submitted DAG.
     *
     * <p>The client is stopped in a {@code finally} block so it is released even
     * when submission or the wait for completion throws.
     *
     * @param str  processor class name for the Tokenizer vertex
     * @param str2 processor class name for the Summation vertex
     * @param str3 DAG name
     * @param z    {@code true} to log history through ATS, {@code false} for simple history
     * @return string form of the DAG id (application id of the client, DAG index 1)
     */
    private String runWordCount(String str, String str2, String str3, boolean z) throws Exception {
        // Unique output location per run.
        Path path = new Path("/tmp/outPath_" + System.currentTimeMillis());
        DataSourceDescriptor build = MRInput.createConfigBuilder(conf, TextInputFormat.class, inputLoc.toString()).build();
        DataSinkDescriptor build2 = MROutput.createConfigBuilder(conf, TextOutputFormat.class, path.toString()).build();

        Vertex addDataSource = Vertex.create(TOKENIZER, ProcessorDescriptor.create(str)).addDataSource(INPUT, build);
        OrderedPartitionedKVEdgeConfig build3 = OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName()).build();
        Vertex addDataSink = Vertex.create(SUMMATION, ProcessorDescriptor.create(str2), 1).addDataSink(OUTPUT, build2);

        DAG create = DAG.create(str3);
        create.addVertex(addDataSource).addVertex(addDataSink).addEdge(Edge.create(addDataSource, addDataSink, build3.createDefaultEdgeProperty()));

        TezClient tezClient = getTezClient(z);
        try {
            CallerContext create2 = CallerContext.create("TezExamples", "Tez WordCount Example Job");
            ApplicationId appMasterApplicationId = tezClient.getAppMasterApplicationId();
            if (appMasterApplicationId == null) {
                // Fall back to a placeholder id for the caller context when none is available yet.
                appMasterApplicationId = ApplicationId.newInstance(1001L, 1);
            }
            create2.setCallerIdAndType(appMasterApplicationId.toString(), "TezApplication");
            create.setCallerContext(create2);

            tezClient.submitDAG(create).waitForCompletionWithStatusUpdates(Sets.newHashSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));

            // The id is computed before the client is stopped by the finally block.
            TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
            return tezDAGID.toString();
        } finally {
            tezClient.stop();
        }
    }

    /**
     * Verifies DAG-level data: version info (optional), user, timings, caller
     * context, every vertex, and the fastest/slowest vertex.
     *
     * <p>The slowest-vertex check compares the status string with
     * {@code DAGState.SUCCEEDED.toString()}. It used to compare the string with
     * the enum constant itself, which is never equal, so that assertion never
     * ran. A duplicated start-time assertion is also removed.
     *
     * @param dagInfo parsed DAG data
     * @param z       {@code true} to also require version information
     */
    private void verifyDagInfo(DagInfo dagInfo, boolean z) {
        if (z) {
            VersionInfo versionInfo = dagInfo.getVersionInfo();
            Assert.assertTrue(versionInfo != null);
            Assert.assertTrue(versionInfo.getVersion() != null);
            Assert.assertTrue(versionInfo.getRevision() != null);
            Assert.assertTrue(versionInfo.getBuildTime() != null);
        }

        Assert.assertTrue(dagInfo.getUserName() != null);
        Assert.assertTrue(!dagInfo.getUserName().isEmpty());

        // Timings.
        Assert.assertTrue(dagInfo.getStartTime() > 0);
        Assert.assertTrue(dagInfo.getFinishTimeInterval() > 0);
        Assert.assertTrue(dagInfo.getStartTimeInterval() == 0);
        if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
            Assert.assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
        }
        Assert.assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
        Assert.assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
        Assert.assertTrue(dagInfo.getTimeTaken() > 0);

        // Caller context as set by runWordCount.
        Assert.assertNotNull(dagInfo.getCallerContext());
        Assert.assertEquals("TezExamples", dagInfo.getCallerContext().getContext());
        Assert.assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob());
        Assert.assertNotNull(dagInfo.getCallerContext().getCallerId());
        Assert.assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType());

        // A vertex is verified as "failed" when it reports at least one failed task.
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
            verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
        }

        Assert.assertTrue(dagInfo.getFastestVertex() != null);
        if (dagInfo.getStatus().equals(DAGState.SUCCEEDED.toString())) {
            Assert.assertTrue(dagInfo.getSlowestVertex() != null);
        }
    }

    /**
     * Verifies a single vertex and the tasks beneath it.
     *
     * @param vertexInfo vertex to verify
     * @param z          {@code true} when the vertex is expected to have failed
     *                   tasks; {@code false} applies the stricter
     *                   successful-vertex assertions
     */
    private void verifyVertex(VertexInfo vertexInfo, boolean z) {
        Assert.assertTrue(vertexInfo != null);
        if (z) {
            Assert.assertTrue(vertexInfo.getFailedTasksCount() > 0);
        }

        // Checks that apply to every vertex.
        Assert.assertTrue(vertexInfo.getStartTimeInterval() > 0);
        Assert.assertTrue(vertexInfo.getStartTime() > 0);
        Assert.assertTrue(vertexInfo.getFinishTimeInterval() > 0);
        Assert.assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
        Assert.assertTrue(vertexInfo.getVertexName() != null);

        // Checks that apply only to a vertex without failures.
        if (!z) {
            Assert.assertTrue(vertexInfo.getFinishTime() > 0);
            Assert.assertTrue(vertexInfo.getFailedTasks().size() == 0);
            Assert.assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
            Assert.assertTrue(vertexInfo.getFailedTasksCount() == 0);
            Assert.assertTrue(vertexInfo.getAvgTaskDuration() > 0.0f);
            Assert.assertTrue(vertexInfo.getMaxTaskDuration() > 0);
            Assert.assertTrue(vertexInfo.getMinTaskDuration() > 0);
            Assert.assertTrue(vertexInfo.getTimeTaken() > 0);
            Assert.assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
            Assert.assertTrue(vertexInfo.getCompletedTasksCount() > 0);
            Assert.assertTrue(vertexInfo.getFirstTaskToStart() != null);
            Assert.assertTrue(vertexInfo.getSucceededTasksCount() > 0);
            Assert.assertTrue(vertexInfo.getTasks().size() > 0);
            Assert.assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime());
        }

        // Succeeded tasks are verified first, then the failed ones.
        final String succeeded = TaskState.SUCCEEDED.toString();
        for (TaskInfo task : vertexInfo.getTasks()) {
            if (!task.getStatus().equals(succeeded)) {
                continue;
            }
            verifyTask(task, false);
        }
        for (Object failedTask : vertexInfo.getFailedTasks()) {
            verifyTask((TaskInfo) failedTask, true);
        }

        Assert.assertTrue(vertexInfo.getProcessorClassName() != null);
        Assert.assertTrue(vertexInfo.getStatus() != null);
        Assert.assertTrue(vertexInfo.getDagInfo() != null);
        Assert.assertTrue(vertexInfo.getInitTimeInterval() > 0);
        Assert.assertTrue(vertexInfo.getNumTasks() > 0);
    }

    /**
     * Verifies a single task and each of its attempts.
     *
     * @param taskInfo task to verify
     * @param z        {@code true} for a failed task; {@code false} applies the
     *                 additional assertions for a succeeded task
     */
    private void verifyTask(TaskInfo taskInfo, boolean z) {
        Assert.assertTrue(taskInfo != null);
        Assert.assertTrue(taskInfo.getStatus() != null);
        Assert.assertTrue(taskInfo.getStartTimeInterval() > 0);

        if (!z) {
            Assert.assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));

            // Interval values must be positive and smaller than the matching absolute timestamps.
            boolean finishConsistent = taskInfo.getFinishTimeInterval() > 0
                && taskInfo.getFinishTime() > taskInfo.getFinishTimeInterval();
            Assert.assertTrue(finishConsistent);
            boolean startConsistent = taskInfo.getStartTimeInterval() > 0
                && taskInfo.getStartTime() > taskInfo.getStartTimeInterval();
            Assert.assertTrue(startConsistent);

            Assert.assertTrue(taskInfo.getSuccessfulAttemptId() != null);
            Assert.assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
            Assert.assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime());
        }

        Assert.assertTrue(taskInfo.getTaskId() != null);
        for (Object attempt : taskInfo.getTaskAttempts()) {
            verifyTaskAttemptInfo((TaskAttemptInfo) attempt);
        }
    }

    /**
     * Verifies a task attempt: timings, placement and counters for succeeded
     * attempts, and the link to the owning task for every attempt.
     *
     * <p>The status string is compared with
     * {@code TaskAttemptState.SUCCEEDED.toString()}. It used to be compared
     * with the enum constant itself, which is never equal, so none of the
     * succeeded-attempt assertions ever ran. Calling {@code equals} on the
     * constant's string also keeps the null-status case safe.
     */
    private void verifyTaskAttemptInfo(TaskAttemptInfo taskAttemptInfo) {
        if (TaskAttemptState.SUCCEEDED.toString().equals(taskAttemptInfo.getStatus())) {
            Assert.assertTrue(taskAttemptInfo.getStartTimeInterval() > 0);
            Assert.assertTrue(taskAttemptInfo.getFinishTimeInterval() > 0);
            Assert.assertTrue(taskAttemptInfo.getCreationTime() > 0);
            Assert.assertTrue(taskAttemptInfo.getAllocationTime() > 0);
            Assert.assertTrue(taskAttemptInfo.getStartTime() > 0);
            Assert.assertTrue(taskAttemptInfo.getFinishTime() > 0);
            Assert.assertTrue(taskAttemptInfo.getFinishTime() > taskAttemptInfo.getStartTime());
            Assert.assertTrue(taskAttemptInfo.getFinishTime() > taskAttemptInfo.getFinishTimeInterval());
            Assert.assertTrue(taskAttemptInfo.getStartTime() > taskAttemptInfo.getStartTimeInterval());
            Assert.assertTrue(taskAttemptInfo.getNodeId() != null);
            Assert.assertTrue(taskAttemptInfo.getTimeTaken() != -1);
            Assert.assertTrue(taskAttemptInfo.getEvents() != null);
            Assert.assertTrue(taskAttemptInfo.getTezCounters() != null);
            Assert.assertTrue(taskAttemptInfo.getContainer() != null);
        }
        Assert.assertTrue(taskAttemptInfo.getTaskInfo() != null);
    }
}
