package org.apache.tez.test;

import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidate;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.examples.SimpleSessionExample;
import org.apache.tez.examples.SortMergeJoinExample;
import org.apache.tez.mapreduce.examples.CartesianProduct;
import org.apache.tez.mapreduce.examples.MultipleCommitsExample;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/test/TestTezJobs.class */
public class TestTezJobs {
    /** Mini Tez cluster shared by the tests; created in {@link #setup()} and stopped in {@link #tearDown()}. */
    protected static MiniTezCluster mrrTezCluster;
    /** Mini DFS cluster shared by the tests; built in {@link #setup()} and shut down in {@link #tearDown()}. */
    protected static MiniDFSCluster dfsCluster;
    /** File system of {@link #dfsCluster}; assigned in {@link #setup()}. */
    private static FileSystem remoteFs;
    /** Local file system obtained via {@code FileSystem.getLocal(conf)} in {@link #setup()}. */
    private static FileSystem localFs;
    /** Vertex name used as the target of the InputInitializerEvents sent by the event-generating processor. */
    private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer";
    /** Vertex name the test input initializer registers state updates for and expects events from. */
    private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex";
    /** Input name used as the target input of the generated InputInitializerEvents. */
    private static final String INPUT1_NAME = "Input1";
    private static final Logger LOG = LoggerFactory.getLogger(TestTezJobs.class);
    /** Base configuration; mutated in {@link #setup()} before the mini clusters are started. */
    private static Configuration conf = new Configuration();
    /** Root directory for test data: used as the mini DFS base dir and as the prefix of local-mode test paths. */
    private static String TEST_ROOT_DIR = "target/" + TestTezJobs.class.getName() + "-tmpDir";

    /* loaded from: input_file:org/apache/tez/test/TestTezJobs$FailingAttemptProcessor.class */
    /**
     * Processor whose task with index 0 fails on every attempt; tasks with any
     * other index complete without doing any work.
     */
    public static class FailingAttemptProcessor extends SimpleProcessor {
        public FailingAttemptProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        /**
         * Logs and throws for task index 0, otherwise returns immediately.
         *
         * @throws Exception an {@link IOException} naming the task index and attempt number
         *                   when this is task 0
         */
        public void run() throws Exception {
            int taskIndex = getContext().getTaskIndex();
            if (taskIndex != 0) {
                return;
            }
            // Build the message once so the log line and the exception always agree.
            String failure = "Failing task " + taskIndex + ", attempt " + getContext().getTaskAttemptNumber();
            TestTezJobs.LOG.info(failure);
            throw new IOException(failure);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestTezJobs$InputInitializerEventGeneratingProcessor.class */
    /**
     * Processor that sends one {@link InputInitializerEvent} per task, carrying the task index
     * as a 4-byte payload, to input {@code INPUT1_NAME} of vertex
     * {@code VERTEX_WITH_INITIALIZER_NAME}. The first attempt of the task with index 1 fails
     * before sending anything.
     */
    public static class InputInitializerEventGeneratingProcessor extends SimpleProcessor {
        public InputInitializerEventGeneratingProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        /**
         * Fails attempt 0 of task index 1, otherwise emits the initializer event.
         *
         * @throws Exception an {@link IOException} for the deliberately failed attempt
         */
        public void run() throws Exception {
            int taskIndex = getContext().getTaskIndex();
            if (taskIndex == 1 && getContext().getTaskAttemptNumber() == 0) {
                throw new IOException("Failing task 2, attempt 0");
            }
            // Absolute put at offset 0: the buffer position stays at 0 so the receiver can getInt(0).
            ByteBuffer payload = ByteBuffer.allocate(4).putInt(0, taskIndex);
            InputInitializerEvent event = InputInitializerEvent.create(
                    TestTezJobs.VERTEX_WITH_INITIALIZER_NAME, TestTezJobs.INPUT1_NAME, payload);
            getContext().sendEvents(Lists.newArrayList(event));
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestTezJobs$InputInitializerForTest.class */
    /**
     * Input initializer used by the tests: it blocks in {@link #initialize()} until the vertex
     * named {@code EVENT_GENERATING_VERTEX_NAME} is reported as SUCCEEDED, and verifies that one
     * InputInitializerEvent per task of that vertex was received before the state update.
     *
     * <p>All mutable state is guarded by {@code lock}.
     */
    public static class InputInitializerForTest extends InputInitializer {
        private final ReentrantLock lock;
        private final Condition condition;
        /** Task indices for which an event has been received. */
        private final BitSet eventsSeen;
        /** Set once the SUCCEEDED update has been validated; the predicate {@code initialize()} waits on. */
        private boolean sourceVertexSucceeded;

        public InputInitializerForTest(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
            this.lock = new ReentrantLock();
            this.condition = this.lock.newCondition();
            this.eventsSeen = new BitSet();
            getContext().registerForVertexStateUpdates(TestTezJobs.EVENT_GENERATING_VERTEX_NAME, EnumSet.of(VertexState.SUCCEEDED));
        }

        /**
         * Blocks until {@link #onVertexStateUpdated(VertexStateUpdate)} has accepted the SUCCEEDED update.
         *
         * @return always {@code null}; this initializer generates no events
         * @throws Exception if the waiting thread is interrupted
         */
        public List<Event> initialize() throws Exception {
            this.lock.lock();
            try {
                // Wait on a predicate rather than a bare await(): this tolerates spurious wakeups
                // and does not hang if the signal was delivered before this thread started waiting.
                while (!this.sourceVertexSucceeded) {
                    this.condition.await();
                }
                return null;
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Records the task index carried by each event.
         *
         * @param list events received for this input; each must come from
         *             {@code EVENT_GENERATING_VERTEX_NAME} and carry a not-yet-seen task index
         * @throws Exception on an unexpected source vertex or a duplicate task index
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
            this.lock.lock();
            try {
                for (InputInitializerEvent inputInitializerEvent : list) {
                    Preconditions.checkArgument(inputInitializerEvent.getSourceVertexName().equals(TestTezJobs.EVENT_GENERATING_VERTEX_NAME));
                    int i = inputInitializerEvent.getUserPayload().getInt(0);
                    Preconditions.checkState(!this.eventsSeen.get(i));
                    this.eventsSeen.set(i);
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Releases {@link #initialize()} once the source vertex has succeeded.
         *
         * @param vertexStateUpdate the update; must report {@code VertexState.SUCCEEDED}
         * @throws IllegalStateException if fewer events were seen than the source vertex has tasks
         */
        public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
            this.lock.lock();
            try {
                Preconditions.checkArgument(vertexStateUpdate.getVertexState() == VertexState.SUCCEEDED);
                if (this.eventsSeen.cardinality() != getContext().getVertexNumTasks(TestTezJobs.EVENT_GENERATING_VERTEX_NAME)) {
                    throw new IllegalStateException("Received VertexState SUCCEEDED before receiving all InputInitializerEvents");
                }
                this.sourceVertexSucceeded = true;
                this.condition.signal();
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestTezJobs$SortMergeJoinHelper.class */
    /**
     * {@link SortMergeJoinExample} variant that submits its DAG through a caller-supplied
     * {@link TezClient} and keeps the resulting {@link DAGClient} so the test can query
     * vertex status after the run.
     */
    private static class SortMergeJoinHelper extends SortMergeJoinExample {
        private final TezClient tezClientInternal;
        /** Client of the most recently submitted DAG; assigned by {@code runDag}. */
        private DAGClient dagClient;

        public SortMergeJoinHelper(TezClient tezClient) {
            this.tezClientInternal = tezClient;
        }

        /**
         * Submits the DAG and waits for it to finish.
         *
         * @param dag    the DAG to submit
         * @param z      when {@code true}, counters are requested with the status updates
         * @param logger receives the DAG diagnostics when the DAG does not succeed
         * @return {@code 0} if the DAG finished in state SUCCEEDED, {@code -1} otherwise
         */
        public int runDag(DAG dag, boolean z, Logger logger) throws TezException, InterruptedException, IOException {
            tezClientInternal.waitTillReady();
            dagClient = tezClientInternal.submitDAG(dag);

            Set<StatusGetOpts> statusOptions = new HashSet<>();
            if (z) {
                statusOptions.add(StatusGetOpts.GET_COUNTERS);
            }

            DAGStatus finalStatus = dagClient.waitForCompletionWithStatusUpdates(statusOptions);
            if (finalStatus.getState() != DAGStatus.State.SUCCEEDED) {
                logger.info("DAG diagnostics: " + finalStatus.getDiagnostics());
                return -1;
            }
            return 0;
        }
    }

    /**
     * Starts the shared mini DFS cluster (two data nodes, freshly formatted) and, if it does not
     * exist yet, the shared mini Tez cluster configured to use that DFS as its default file system.
     *
     * @throws IOException if the local file system cannot be obtained
     * @throws RuntimeException wrapping any {@link IOException} raised while starting the clusters
     */
    @BeforeClass
    public static void setup() throws IOException {
        localFs = FileSystem.getLocal(conf);
        try {
            // The base dir must be set before the builder reads the configuration.
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            MiniDFSCluster.Builder dfsBuilder =
                    new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks((String[]) null);
            dfsCluster = dfsBuilder.build();
            remoteFs = dfsCluster.getFileSystem();

            if (mrrTezCluster != null) {
                return;
            }
            mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1);
            conf.set("fs.defaultFS", remoteFs.getUri().toString());
            conf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
            mrrTezCluster.init(conf);
            mrrTezCluster.start();
        } catch (IOException startFailure) {
            throw new RuntimeException("problem starting mini dfs cluster", startFailure);
        }
    }

    /**
     * Stops the mini Tez cluster and shuts down the mini DFS cluster.
     *
     * <p>The DFS shutdown runs in a {@code finally} block so that a failure while stopping the
     * Tez cluster cannot leave the DFS cluster running after the test class finishes.
     */
    @AfterClass
    public static void tearDown() {
        try {
            if (mrrTezCluster != null) {
                mrrTezCluster.stop();
            }
        } finally {
            mrrTezCluster = null;
            if (dfsCluster != null) {
                dfsCluster.shutdown();
                dfsCluster = null;
            }
        }
    }

    /**
     * Runs the hash join example once against the mini cluster, using a copy of the
     * cluster configuration.
     */
    @Test(timeout = 60000)
    public void testHashJoinExample() throws Exception {
        Configuration clusterConfCopy = new Configuration(mrrTezCluster.getConfig());
        HashJoinExample example = new HashJoinExample();
        example.setConf(clusterConfCopy);
        runHashJoinExample(example);
    }

    /**
     * Runs the hash join example four times on one configuration object, changing the log
     * pattern / MDC settings between runs: custom patterns with a resolvable custom key,
     * an empty custom key list, a custom key mapped to an unset property, and finally
     * empty log patterns.
     */
    @Test(timeout = 60000)
    public void testHashJoinExampleWithLogPattern() throws Exception {
        Configuration logConf = new Configuration(mrrTezCluster.getConfig());
        HashJoinExample example = new HashJoinExample();

        // Run 1: debug logging, custom layouts, and a custom MDC key backed by a set property.
        logConf.set("tez.am.log.level", "debug");
        logConf.set("tez.task.log.level", "debug");
        logConf.set("tez.am.log.pattern.layout", "%d{ISO8601} [%p] [%t (queryId=%X{queryId} dag=%X{dagId})] |%c{2}|: %m%n");
        logConf.set("tez.task.log.pattern.layout", "%d{ISO8601} [%p] [%t (queryId=%X{queryId} dag=%X{dagId} task=%X{taskAttemptId})] |%c{2}|: %m%n");
        logConf.set("tez.mdc.custom.keys", "queryId");
        logConf.set("tez.mdc.custom.keys.conf.props", "hive.query.id");
        logConf.set("hive.query.id", "hello-upstream-application-12345");
        example.setConf(logConf);
        runHashJoinExample(example);

        // Run 2: no custom MDC keys.
        logConf.set("tez.mdc.custom.keys", "");
        example.setConf(logConf);
        runHashJoinExample(example);

        // Run 3: custom key mapped to a property that is never set in this configuration.
        logConf.set("tez.mdc.custom.keys", "queryId");
        logConf.set("tez.mdc.custom.keys.conf.props", "hive.query.id.null");
        example.setConf(logConf);
        runHashJoinExample(example);

        // Run 4: empty layouts for both AM and task logs.
        logConf.set("tez.am.log.pattern.layout", "");
        logConf.set("tez.task.log.pattern.layout", "");
        example.setConf(logConf);
        runHashJoinExample(example);
    }

    /**
     * Writes two small inputs to the remote file system, runs the given hash join example on
     * them and verifies the single output file.
     *
     * <p>Input 1 contains {@code term0..term19}; input 2 contains only the even-numbered terms,
     * which are therefore the expected output. Paths carry a random numeric suffix so
     * repeated invocations do not share directories.
     *
     * @param hashJoinExample the already-configured example to run
     * @throws Exception on any I/O failure or failed assertion
     */
    private void runHashJoinExample(HashJoinExample hashJoinExample) throws Exception {
        int suffix = new Random(System.currentTimeMillis()).nextInt(10000);
        Path stagingDirPath = new Path(String.format("/tmp/tez-staging-dir%d", suffix));
        Path inPath1 = new Path(String.format("/tmp/hashJoin%d/inPath1", suffix));
        Path inPath2 = new Path(String.format("/tmp/hashJoin%d/inPath2", suffix));
        Path outPath = new Path(String.format("/tmp/hashJoin%d/outPath", suffix));
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<>();
        // try-with-resources: the streams are closed even if a write fails part-way through.
        try (FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
             FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
             BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
             BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2))) {
            for (int i = 0; i < 20; i++) {
                String term = "term" + i;
                writer1.write(term);
                writer1.newLine();
                if (i % 2 == 0) {
                    writer2.write(term);
                    writer2.newLine();
                    expectedResult.add(term);
                }
            }
        }

        String[] args = {
            "-Dtez.staging-dir=" + stagingDirPath.toString(),
            "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()
        };
        Assert.assertEquals(0, hashJoinExample.run(args));

        // Ignore marker/hidden files (names starting with '_' or '.') in the output directory.
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
            public boolean accept(Path candidate) {
                String name = candidate.getName();
                return !(name.startsWith("_") || name.startsWith("."));
            }
        });
        Assert.assertEquals(1, statuses.length);

        try (FSDataInputStream in = remoteFs.open(statuses[0].getPath());
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Every output line must be an expected term that has not been seen yet.
                Assert.assertTrue(expectedResult.remove(line));
            }
        }
        Assert.assertEquals(0, expectedResult.size());
    }

    /**
     * Runs JoinDataGen, HashJoinExample (with the {@code doBroadcast} argument) and JoinValidate
     * in one Tez session with {@code tez.runtime.transfer.data-via-events.enabled} set to true,
     * asserting that each step returns 0.
     *
     * <p>The positional arguments are interpreted by the example classes themselves; their
     * meaning is not visible in this file.
     */
    @Test(timeout = 120000)
    public void testHashJoinExampleWithDataViaEvent() throws Exception {
        Path testDir = new Path("/tmp/testHashJoinExampleDataViaEvent");
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        remoteFs.mkdirs(testDir);
        Path dataPath1 = new Path(testDir, "inPath1");
        Path dataPath2 = new Path(testDir, "inPath2");
        Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
        Path outPath = new Path(testDir, "outPath");

        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        tezConf.setBoolean("tez.runtime.transfer.data-via-events.enabled", true);

        TezClient tezClient = null;
        try {
            tezClient = TezClient.create("HashJoinExampleSession", tezConf, true);
            tezClient.start();
            Assert.assertEquals(0, new JoinDataGen().run(tezConf, new String[]{"-counter", dataPath1.toString(), "1048576", dataPath2.toString(), "8", expectedOutputPath.toString(), "2"}, tezClient));
            Assert.assertEquals(0, new HashJoinExample().run(tezConf, new String[]{dataPath1.toString(), dataPath2.toString(), "1", outPath.toString(), "doBroadcast"}, tezClient));
            Assert.assertEquals(0, new JoinValidate().run(tezConf, new String[]{"-counter", expectedOutputPath.toString(), outPath.toString(), "3"}, tezClient));
        } finally {
            // Single stop in finally: the session is stopped exactly once on success and on failure.
            if (tezClient != null) {
                tezClient.stop();
            }
        }
    }

    /**
     * Runs the hash join example in local mode with split grouping disabled
     * ({@code -local -disableSplitGrouping}) on inputs written to the local file system,
     * and verifies the single output file.
     *
     * <p>Input 1 contains {@code term0..term19}; input 2 contains only the even-numbered terms,
     * which are the expected output.
     */
    @Test(timeout = 60000)
    public void testHashJoinExampleDisableSplitGrouping() throws Exception {
        // Keep a reference to the configured example: it is the object that is run below.
        HashJoinExample hashJoinExample = new HashJoinExample();
        hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));

        Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
        Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath1");
        Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath2");
        Path outPath = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/outPath");
        // Remove output left behind by a previous run before the job writes to it.
        localFs.delete(outPath, true);
        localFs.mkdirs(inPath1);
        localFs.mkdirs(inPath2);
        localFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<>();
        // try-with-resources: the streams are closed even if a write fails part-way through.
        try (FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file"));
             FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file"));
             BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
             BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2))) {
            for (int i = 0; i < 20; i++) {
                String term = "term" + i;
                writer1.write(term);
                writer1.newLine();
                if (i % 2 == 0) {
                    writer2.write(term);
                    writer2.newLine();
                    expectedResult.add(term);
                }
            }
        }

        String[] args = {
            "-Dtez.staging-dir=" + stagingDirPath.toString(),
            "-counter", "-local", "-disableSplitGrouping",
            inPath1.toString(), inPath2.toString(), "1", outPath.toString()
        };
        Assert.assertEquals(0, hashJoinExample.run(args));

        // Ignore marker/hidden files (names starting with '_' or '.') in the output directory.
        FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() {
            public boolean accept(Path candidate) {
                String name = candidate.getName();
                return !(name.startsWith("_") || name.startsWith("."));
            }
        });
        Assert.assertEquals(1, statuses.length);

        try (FSDataInputStream in = localFs.open(statuses[0].getPath());
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Every output line must be an expected term that has not been seen yet.
                Assert.assertTrue(expectedResult.remove(line));
            }
        }
        Assert.assertEquals(0, expectedResult.size());
    }

    /**
     * Runs the sort-merge join example on the mini cluster with
     * {@code tez.am.application.priority=2} and verifies the single output file.
     *
     * <p>Input 1 contains {@code term0..term19}; input 2 contains only the even-numbered terms,
     * which are the expected output.
     */
    @Test(timeout = 60000)
    public void testSortMergeJoinExample() throws Exception {
        // Keep a reference to the configured example: it is the object that is run below.
        SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
        sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));

        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        Path inPath1 = new Path("/tmp/sortMerge/inPath1");
        Path inPath2 = new Path("/tmp/sortMerge/inPath2");
        Path outPath = new Path("/tmp/sortMerge/outPath");
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<>();
        // try-with-resources: the streams are closed even if a write fails part-way through.
        try (FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
             FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
             BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
             BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2))) {
            for (int i = 0; i < 20; i++) {
                String term = "term" + i;
                writer1.write(term);
                writer1.newLine();
                if (i % 2 == 0) {
                    writer2.write(term);
                    writer2.newLine();
                    expectedResult.add(term);
                }
            }
        }

        String[] args = {
            "-Dtez.staging-dir=" + stagingDirPath.toString(),
            "-Dtez.am.application.priority=2",
            "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()
        };
        Assert.assertEquals(0, sortMergeJoinExample.run(args));

        // Ignore marker/hidden files (names starting with '_' or '.') in the output directory.
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
            public boolean accept(Path candidate) {
                String name = candidate.getName();
                return !(name.startsWith("_") || name.startsWith("."));
            }
        });
        Assert.assertEquals(1, statuses.length);

        try (FSDataInputStream in = remoteFs.open(statuses[0].getPath());
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Every output line must be an expected term that has not been seen yet.
                Assert.assertTrue(expectedResult.remove(line));
            }
        }
        Assert.assertEquals(0, expectedResult.size());
    }

    /**
     * Runs the sort-merge join with {@code tez.task.generate.counters.per.io} enabled and checks,
     * for the {@code joiner} vertex, that each listed aggregated {@link TaskCounter} equals the
     * sum of the two per-input counters, and that the aggregated {@code OUTPUT_RECORDS} counter
     * equals the per-output counter.
     */
    @Test(timeout = 60000)
    public void testPerIOCounterAggregation() throws Exception {
        Path inPath1 = new Path("/tmp/perIOCounterAgg/inPath1");
        Path inPath2 = new Path("/tmp/perIOCounterAgg/inPath2");
        Path outPath = new Path("/tmp/perIOCounterAgg/outPath");
        Set<String> expectedResults = generateSortMergeJoinInput(inPath1, inPath2);

        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);

        // Declared as TezConfiguration: TezClient.create and the example's run are called with this object.
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.setBoolean("tez.task.generate.counters.per.io", true);
        TezClient tezClient = TezClient.create(SortMergeJoinHelper.class.getSimpleName(), tezConf);
        try {
            tezClient.start();
            SortMergeJoinHelper sortMergeJoinHelper = new SortMergeJoinHelper(tezClient);
            sortMergeJoinHelper.setConf(tezConf);

            String[] args = {
                "-Dtez.staging-dir=" + stagingDirPath.toString(),
                "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()
            };
            Assert.assertEquals(0, sortMergeJoinHelper.run(tezConf, args, tezClient));
            verifySortMergeJoinInput(outPath, expectedResults);

            Set<StatusGetOpts> statusOptions = new HashSet<>();
            statusOptions.add(StatusGetOpts.GET_COUNTERS);
            TezCounters vertexCounters =
                    sortMergeJoinHelper.dagClient.getVertexStatus("joiner", statusOptions).getVertexCounters();

            String perIoPrefix = TaskCounter.class.getSimpleName();
            CounterGroup aggregatedGroup = vertexCounters.getGroup(TaskCounter.class.getCanonicalName());
            CounterGroup input1Group = vertexCounters.getGroup(perIoPrefix + "_joiner_INPUT_input1");
            CounterGroup input2Group = vertexCounters.getGroup(perIoPrefix + "_joiner_INPUT_input2");
            Assert.assertTrue("aggregated counter group cannot be empty", aggregatedGroup.size() > 0);
            Assert.assertTrue("per io group for input1 cannot be empty", input1Group.size() > 0);
            Assert.assertTrue("per io group for input2 cannot be empty", input2Group.size() > 0);

            List<TaskCounter> inputCounters = Arrays.asList(
                    TaskCounter.ADDITIONAL_SPILLS_BYTES_READ,
                    TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN,
                    TaskCounter.COMBINE_INPUT_RECORDS,
                    TaskCounter.MERGED_MAP_OUTPUTS,
                    TaskCounter.NUM_DISK_TO_DISK_MERGES,
                    TaskCounter.NUM_FAILED_SHUFFLE_INPUTS,
                    TaskCounter.NUM_MEM_TO_DISK_MERGES,
                    TaskCounter.NUM_SHUFFLED_INPUTS,
                    TaskCounter.NUM_SKIPPED_INPUTS,
                    TaskCounter.REDUCE_INPUT_GROUPS,
                    TaskCounter.REDUCE_INPUT_RECORDS,
                    TaskCounter.SHUFFLE_BYTES,
                    TaskCounter.SHUFFLE_BYTES_DECOMPRESSED,
                    TaskCounter.SHUFFLE_BYTES_DISK_DIRECT,
                    TaskCounter.SHUFFLE_BYTES_TO_DISK,
                    TaskCounter.SHUFFLE_BYTES_TO_MEM,
                    TaskCounter.SPILLED_RECORDS);

            int nonZeroCounters = 0;
            for (TaskCounter taskCounter : inputCounters) {
                String counterName = taskCounter.name();
                TezCounter aggregated = aggregatedGroup.findCounter(counterName, false);
                TezCounter input1 = input1Group.findCounter(counterName, false);
                TezCounter input2 = input2Group.findCounter(counterName, false);
                Assert.assertNotNull("aggregated counter cannot be null " + counterName, aggregated);
                Assert.assertNotNull("input1 counter cannot be null " + counterName, input1);
                Assert.assertNotNull("input2 counter cannot be null " + counterName, input2);
                Assert.assertEquals("aggregated counter does not match sum of input counters " + counterName,
                        aggregated.getValue(), input1.getValue() + input2.getValue());
                if (aggregated.getValue() > 0) {
                    nonZeroCounters++;
                }
            }
            // If every counter were zero the equality checks above would be vacuous.
            Assert.assertTrue("At least one of the counter should be non-zero. invalid test ", nonZeroCounters > 0);

            CounterGroup outputGroup = vertexCounters.getGroup(perIoPrefix + "_joiner_OUTPUT_joinOutput");
            String outputCounterName = TaskCounter.OUTPUT_RECORDS.name();
            TezCounter aggregatedOutput = aggregatedGroup.findCounter(outputCounterName, false);
            TezCounter perOutput = outputGroup.findCounter(outputCounterName, false);
            Assert.assertNotNull("aggregated counter cannot be null " + outputCounterName, aggregatedOutput);
            Assert.assertNotNull("output counter cannot be null " + outputCounterName, perOutput);
            Assert.assertTrue("counter value is zero. test is invalid", aggregatedOutput.getValue() > 0);
            Assert.assertEquals("aggregated counter does not match sum of output counters " + outputCounterName,
                    aggregatedOutput.getValue(), perOutput.getValue());
        } finally {
            // Release the client whether or not the assertions pass.
            tezClient.stop();
        }
    }

    /**
     * Runs the local-mode sort-merge join scenario without installing the container log
     * appender or validating thread dumps.
     */
    @Test(timeout = 60000)
    public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
        final boolean withThreadDump = false;
        testSortMergeJoinExampleDisableSplitGrouping(withThreadDump);
    }

    /**
     * Runs the local-mode sort-merge join scenario with the thread dump interval configured
     * and verifies afterwards that thread dump files were written.
     */
    @Test
    public void testSortMergeJoinExampleWithThreadDump() throws Exception {
        final boolean withThreadDump = true;
        testSortMergeJoinExampleDisableSplitGrouping(withThreadDump);
    }

    /**
     * Runs the sort-merge join example in local mode with split grouping disabled and verifies
     * the single output file; optionally also checks that thread dumps were captured.
     *
     * <p>Input 1 contains {@code term0..term19}; input 2 contains only the even-numbered terms,
     * which are the expected output.
     *
     * @param z when {@code true}, a {@link TezContainerLogAppender} named {@code "CLA"} is added to
     *          the log4j root logger, {@code tez.thread.dump.interval} is set to {@code 1ms}, and
     *          the log directory is checked for {@code .jstack} files after the run
     * @throws Exception on any I/O failure or failed assertion
     */
    public void testSortMergeJoinExampleDisableSplitGrouping(boolean z) throws Exception {
        SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
        Configuration configuration = new Configuration(mrrTezCluster.getConfig());
        Path logPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/logPath");
        if (z) {
            TezContainerLogAppender appender = new TezContainerLogAppender();
            org.apache.log4j.Logger.getRootLogger().addAppender(appender);
            appender.setName("CLA");
            appender.setContainerLogDir(logPath.toString());
            configuration.set("tez.thread.dump.interval", "1ms");
        }
        try {
            sortMergeJoinExample.setConf(configuration);
            Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
            Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1");
            Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2");
            Path outPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/outPath");
            // Remove output left behind by a previous run before the job writes to it.
            localFs.delete(outPath, true);
            localFs.mkdirs(inPath1);
            localFs.mkdirs(inPath2);
            localFs.mkdirs(stagingDirPath);

            Set<String> expectedResult = new HashSet<>();
            // try-with-resources: the streams are closed even if a write fails part-way through.
            try (FSDataOutputStream out1 = localFs.create(new Path(inPath1, "file"));
                 FSDataOutputStream out2 = localFs.create(new Path(inPath2, "file"));
                 BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
                 BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2))) {
                for (int i = 0; i < 20; i++) {
                    String term = "term" + i;
                    writer1.write(term);
                    writer1.newLine();
                    if (i % 2 == 0) {
                        writer2.write(term);
                        writer2.newLine();
                        expectedResult.add(term);
                    }
                }
            }

            String[] args = {
                "-Dtez.staging-dir=" + stagingDirPath.toString(),
                "-counter", "-local", "-disableSplitGrouping",
                inPath1.toString(), inPath2.toString(), "1", outPath.toString()
            };
            Assert.assertEquals(0, sortMergeJoinExample.run(args));

            // Ignore marker/hidden files (names starting with '_' or '.') in the output directory.
            FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter() {
                public boolean accept(Path candidate) {
                    String name = candidate.getName();
                    return !(name.startsWith("_") || name.startsWith("."));
                }
            });
            Assert.assertEquals(1, statuses.length);

            try (FSDataInputStream in = localFs.open(statuses[0].getPath());
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Every output line must be an expected term that has not been seen yet.
                    Assert.assertTrue(expectedResult.remove(line));
                }
            }
            Assert.assertEquals(0, expectedResult.size());

            if (z) {
                validateThreadDumpCaptured(logPath);
            }
        } finally {
            // Always detach the appender so a failing run does not leave it on the root logger.
            if (z) {
                org.apache.log4j.Logger.getRootLogger().removeAppender("CLA");
            }
        }
    }

    /**
     * Asserts that the given directory tree on the local file system contains at least one
     * {@code .jstack} file whose name contains {@code "attempt"} and at least one whose name
     * does not.
     *
     * @param path directory to scan recursively
     * @throws IOException if listing the directory fails
     */
    private static void validateThreadDumpCaptured(Path path) throws IOException {
        boolean sawAttemptDump = false;
        boolean sawOtherDump = false;
        RemoteIterator<LocatedFileStatus> files = localFs.listFiles(path, true);
        while (files.hasNext()) {
            String fileName = files.next().getPath().getName();
            if (!fileName.endsWith(".jstack")) {
                continue;
            }
            if (fileName.contains("attempt")) {
                sawAttemptDump = true;
            } else {
                sawOtherDump = true;
            }
        }
        Assert.assertTrue(sawAttemptDump);
        Assert.assertTrue(sawOtherDump);
    }

    /**
     * Runs JoinDataGen, HashJoinExample and JoinValidate back to back in one Tez session,
     * asserting that each step returns 0.
     *
     * <p>The positional arguments are interpreted by the example classes themselves; their
     * meaning is not visible in this file.
     */
    @Test(timeout = 120000)
    public void testHashJoinExamplePipeline() throws Exception {
        Path testDir = new Path("/tmp/testHashJoinExample");
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        remoteFs.mkdirs(testDir);
        Path dataPath1 = new Path(testDir, "inPath1");
        Path dataPath2 = new Path(testDir, "inPath2");
        Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
        Path outPath = new Path(testDir, "outPath");

        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());

        TezClient tezClient = null;
        try {
            tezClient = TezClient.create("HashJoinExampleSession", tezConf, true);
            tezClient.start();
            Assert.assertEquals(0, new JoinDataGen().run(tezConf, new String[]{"-counter", dataPath1.toString(), "1048576", dataPath2.toString(), "524288", expectedOutputPath.toString(), "2"}, tezClient));
            Assert.assertEquals(0, new HashJoinExample().run(tezConf, new String[]{dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()}, tezClient));
            Assert.assertEquals(0, new JoinValidate().run(tezConf, new String[]{"-counter", expectedOutputPath.toString(), outPath.toString(), "3"}, tezClient));
        } finally {
            // Single stop in finally: the session is stopped exactly once on success and on failure.
            if (tezClient != null) {
                tezClient.stop();
            }
        }
    }

    /**
     * Runs JoinDataGen, SortMergeJoinExample and JoinValidate back to back in one Tez session,
     * asserting that each step returns 0.
     *
     * <p>The positional arguments are interpreted by the example classes themselves; their
     * meaning is not visible in this file.
     */
    @Test(timeout = 120000)
    public void testSortMergeJoinExamplePipeline() throws Exception {
        Path testDir = new Path("/tmp/testSortMergeExample");
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        remoteFs.mkdirs(testDir);
        Path dataPath1 = new Path(testDir, "inPath1");
        Path dataPath2 = new Path(testDir, "inPath2");
        Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
        Path outPath = new Path(testDir, "outPath");

        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());

        TezClient tezClient = null;
        try {
            tezClient = TezClient.create("SortMergeExampleSession", tezConf, true);
            tezClient.start();
            Assert.assertEquals(0, new JoinDataGen().run(tezConf, new String[]{dataPath1.toString(), "1048576", dataPath2.toString(), "524288", expectedOutputPath.toString(), "2"}, tezClient));
            Assert.assertEquals(0, new SortMergeJoinExample().run(tezConf, new String[]{dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()}, tezClient));
            Assert.assertEquals(0, new JoinValidate().run(tezConf, new String[]{expectedOutputPath.toString(), outPath.toString(), "3"}, tezClient));
        } finally {
            // Single stop in finally: the session is stopped exactly once on success and on failure.
            if (tezClient != null) {
                tezClient.stop();
            }
        }
    }

    /**
     * Writes identical word-count input to the files {@code inPath1} and {@code inPath2} under
     * {@code path}.
     *
     * <p>For every {@code i} in 1..10 the token {@code a_i} is written {@code 11 - i} times to each
     * file, each occurrence followed by {@code writeChars("\t")}. Both files are {@code hsync}ed
     * before being closed.
     *
     * @param path directory in which the two input files are created
     * @param fileSystem file system used to create the files
     * @throws IOException if creating, writing, syncing or closing either file fails
     */
    public static void generateOrderedWordCountInput(Path path, FileSystem fileSystem) throws IOException {
        Path firstInput = new Path(path, "inPath1");
        Path secondInput = new Path(path, "inPath2");
        // try-with-resources guarantees both streams are closed, even when a write or the
        // other stream's close() throws (the hand-rolled cleanup could leak the second stream).
        try (FSDataOutputStream firstOut = fileSystem.create(firstInput);
             FSDataOutputStream secondOut = fileSystem.create(secondInput)) {
            for (int i = 1; i <= 10; i++) {
                String word = "a_" + i;
                for (int remaining = 10; remaining >= i; remaining--) {
                    LOG.info("Writing " + word + " to input files");
                    // writeChars emits two bytes per char, so a zero byte precedes the tab.
                    // NOTE(review): verifyOrderedWordCountOutput's substring(0, indexOf - 1) appears
                    // to rely on that extra byte - do not switch to a single-byte tab without checking.
                    firstOut.write(word.getBytes());
                    firstOut.writeChars("\t");
                    secondOut.write(word.getBytes());
                    secondOut.writeChars("\t");
                }
            }
            firstOut.hsync();
            secondOut.hsync();
        }
    }

    /**
     * Checks the content of an ordered-word-count result file.
     *
     * <p>At most the first 4096 bytes (a single {@code read} call) are examined. Lines are expected
     * in the order {@code a_10}, {@code a_9}, ... {@code a_1}; for the line carrying suffix
     * {@code k} the value after the tab must equal {@code (11 - k) * 2}. Exactly ten lines must be
     * present.
     *
     * @param path result file to verify
     * @param fileSystem file system holding the result file
     * @throws IOException if the file cannot be opened or read
     */
    public static void verifyOrderedWordCountOutput(Path path, FileSystem fileSystem) throws IOException {
        byte[] buffer = new byte[4096];
        int bytesRead;
        // Close the file-system stream as soon as the single read is done; it used to be leaked.
        try (FSDataInputStream in = fileSystem.open(path)) {
            bytesRead = in.read(buffer, 0, 4096);
        }

        BufferedReader reader =
            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer, 0, bytesRead)));
        int expectedSuffix = 10;
        String line;
        while ((line = reader.readLine()) != null) {
            LOG.info("Line: " + line + ", counter=" + expectedSuffix);
            int tabIndex = line.indexOf("\t");
            // Fail with a readable message instead of a StringIndexOutOfBoundsException below.
            Assert.assertTrue("Malformed output line: " + line, tabIndex > 0);
            // The character directly before the tab is dropped on purpose.
            // NOTE(review): presumably the zero byte produced by writeChars in the input generator - confirm.
            Assert.assertEquals("a_" + expectedSuffix, line.substring(0, tabIndex - 1));
            Assert.assertEquals((11 - expectedSuffix) * 2,
                Long.valueOf(line.substring(tabIndex + 1, line.length())).longValue());
            expectedSuffix--;
        }
        // Ten lines consumed means the counter has gone from 10 down to 0.
        Assert.assertEquals(0L, expectedSuffix);
    }

    /**
     * Verifies an ordered-word-count output directory: it must contain a {@code _SUCCESS} file and
     * exactly one file whose name starts with {@code part-}, whose content is then checked by
     * {@link #verifyOrderedWordCountOutput(Path, FileSystem)}.
     *
     * @param path output directory to inspect
     * @param fileSystem file system holding the directory
     * @throws IOException if listing the directory or reading the part file fails
     */
    public static void verifyOutput(Path path, FileSystem fileSystem) throws IOException {
        boolean successMarkerSeen = false;
        boolean partFileSeen = false;
        Path partFile = null;
        for (FileStatus status : fileSystem.listStatus(path)) {
            // Directories are ignored; only plain files are classified.
            if (!status.isFile()) {
                continue;
            }
            String fileName = status.getPath().getName();
            if (fileName.equals("_SUCCESS")) {
                successMarkerSeen = true;
                continue;
            }
            if (!fileName.startsWith("part-")) {
                continue;
            }
            if (partFileSeen) {
                Assert.fail("Found 2 part files instead of 1, paths=" + partFile + "," + status.getPath());
            }
            partFileSeen = true;
            partFile = status.getPath();
            LOG.info("Found output at " + partFile);
        }
        Assert.assertTrue(partFileSeen);
        Assert.assertTrue(partFile != null);
        Assert.assertTrue(successMarkerSeen);
        verifyOrderedWordCountOutput(partFile, fileSystem);
    }

    /**
     * Generates word-count input on {@code remoteFs}, runs {@code OrderedWordCount} with the
     * {@code -counter} flag and no pre-built client, and verifies the output directory.
     * The staging directory is deleted afterwards regardless of the outcome.
     *
     * @throws Exception if setup, the job run or verification throws
     */
    @Test(timeout = 60000)
    public void testOrderedWordCount() throws Exception {
        Path inputDir = new Path("/tmp/owc-input/");
        Path stagingDir = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(inputDir);
        remoteFs.mkdirs(stagingDir);
        generateOrderedWordCountInput(inputDir, remoteFs);
        Path outputDir = new Path("/tmp/owc-output/");

        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.set("tez.staging-dir", stagingDir.toString());
        try {
            String[] args = {"-counter", "/tmp/owc-input/", "/tmp/owc-output/", "2"};
            int exitCode = new OrderedWordCount().run(tezConfiguration, args, (TezClient) null);
            Assert.assertTrue("OrderedWordCount failed", exitCode == 0);
            verifyOutput(outputDir, remoteFs);
        } finally {
            // No TezClient is created here, so the staging directory is the only thing to clean up.
            remoteFs.delete(stagingDir, true);
        }
    }

    /**
     * Same scenario as the ordered-word-count test, but on {@code localFs} under
     * {@code TEST_ROOT_DIR} and with the {@code -local} and {@code -disableSplitGrouping} flags.
     * Any previous output directory is removed before the run; the staging directory is deleted
     * afterwards regardless of the outcome.
     *
     * @throws Exception if setup, the job run or verification throws
     */
    @Test(timeout = 60000)
    public void testOrderedWordCountDisableSplitGrouping() throws Exception {
        String inputDirName = TEST_ROOT_DIR + "/tmp/owc-input/";
        Path inputDir = new Path(inputDirName);
        Path stagingDir = new Path(TEST_ROOT_DIR + "/tmp/owc-staging-dir");
        localFs.mkdirs(inputDir);
        localFs.mkdirs(stagingDir);
        generateOrderedWordCountInput(inputDir, localFs);

        String outputDirName = TEST_ROOT_DIR + "/tmp/owc-output/";
        Path outputDir = new Path(outputDirName);
        localFs.delete(outputDir, true);

        TezConfiguration tezConfiguration = new TezConfiguration(conf);
        tezConfiguration.set("tez.staging-dir", stagingDir.toString());
        try {
            String[] args = {"-counter", "-local", "-disableSplitGrouping", inputDirName, outputDirName, "2"};
            int exitCode = new OrderedWordCount().run(tezConfiguration, args, (TezClient) null);
            Assert.assertTrue("OrderedWordCount failed", exitCode == 0);
            verifyOutput(outputDir, localFs);
        } finally {
            // No TezClient is created here, so the staging directory is the only thing to clean up.
            localFs.delete(stagingDir, true);
        }
    }

    /**
     * Runs {@code SimpleSessionExample} over two generated input directories with
     * {@code tez.am.mode.session} enabled, verifies both output directories, and asserts that the
     * number of applications reported by YARN grew by exactly one.
     *
     * @throws Exception if setup, the job run or verification throws
     */
    @Test(timeout = 60000)
    public void testSimpleSessionExample() throws Exception {
        Path stagingDir = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(stagingDir);

        final int numIterations = 2;
        String[] inputPaths = new String[numIterations];
        String[] outputPaths = new String[numIterations];
        Path[] outputDirs = new Path[numIterations];
        for (int i = 0; i < numIterations; i++) {
            String inputDirName = "/tmp/owc-input-" + i + "/";
            inputPaths[i] = inputDirName;
            Path inputDir = new Path(inputDirName);
            remoteFs.mkdirs(inputDir);
            generateOrderedWordCountInput(inputDir, remoteFs);
            String outputDirName = "/tmp/owc-output-" + i + "/";
            outputPaths[i] = outputDirName;
            outputDirs[i] = new Path(outputDirName);
        }

        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.set("tez.staging-dir", stagingDir.toString());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List<?> appsBefore = yarnClient.getApplications();
            int appsBeforeCount = appsBefore != null ? appsBefore.size() : 0;

            tezConfiguration.setBoolean("tez.am.mode.session", true);
            String[] args = {StringUtils.join(",", inputPaths), StringUtils.join(",", outputPaths), "2"};
            int exitCode = new SimpleSessionExample().run(tezConfiguration, args, (TezClient) null);
            Assert.assertTrue("SimpleSessionExample failed", exitCode == 0);
            for (Path outputDir : outputDirs) {
                verifyOutput(outputDir, remoteFs);
            }

            // Re-query YARN explicitly; the previous code referenced an undefined temporary here.
            List<?> appsAfter = yarnClient.getApplications();
            int appsAfterCount = appsAfter != null ? appsAfter.size() : 0;
            // NOTE(review): exactly one new application is expected, presumably because both
            // iterations share the session enabled above - confirm.
            Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);
        } finally {
            remoteFs.delete(stagingDir, true);
            yarnClient.stop();
        }
    }

    /**
     * Submits {@code SimpleSessionExample} with session mode disabled to the queue
     * {@code "nonexistent"}. The test passes if the run returns a non-zero exit code, or if it
     * throws a {@code TezException} whose message contains "Failed to submit application".
     *
     * @throws Exception if setup fails or an unexpected exception escapes the run
     */
    @Test(timeout = 60000)
    public void testInvalidQueueSubmission() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();

            SimpleSessionExample job = new SimpleSessionExample();
            tezConfiguration.setBoolean("tez.am.mode.session", false);
            tezConfiguration.set("tez.queue.name", "nonexistent");
            remoteFs.mkdirs(new Path("/tmp/owc-input"));

            String[] args = {
                StringUtils.join(",", new String[]{"/tmp/owc-input"}),
                StringUtils.join(",", new String[]{"/tmp/owc-output"}),
                "2"};
            Assert.assertTrue("Job should have failed", job.run(tezConfiguration, args, (TezClient) null) != 0);
        } catch (TezException e) {
            Assert.assertTrue(e.getMessage().contains("Failed to submit application"));
        } finally {
            // One cleanup path for success, expected failure and unexpected failure alike.
            yarnClient.stop();
        }
    }

    /**
     * Submits {@code SimpleSessionExample} with session mode enabled to the queue
     * {@code "nonexistent"} and expects the run to throw: either {@code SessionNotRunning}
     * (logged and accepted) or a {@code TezException} whose message contains
     * "Failed to submit application". A normal return fails the test.
     *
     * @throws Exception if setup fails or an unexpected exception escapes the run
     */
    @Test(timeout = 60000)
    public void testInvalidQueueSubmissionToSession() throws Exception {
        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();

            SimpleSessionExample job = new SimpleSessionExample();
            tezConfiguration.setBoolean("tez.am.mode.session", true);
            tezConfiguration.set("tez.queue.name", "nonexistent");
            remoteFs.mkdirs(new Path("/tmp/owc-input"));

            String[] args = {
                StringUtils.join(",", new String[]{"/tmp/owc-input"}),
                StringUtils.join(",", new String[]{"/tmp/owc-output"}),
                "2"};
            job.run(tezConfiguration, args, (TezClient) null);
            Assert.fail("Job submission should have failed");
        } catch (SessionNotRunning e) {
            // Must stay ahead of the TezException handler so this case is only logged.
            LOG.info("Session not running", e);
        } catch (TezException e) {
            Assert.assertTrue(e.getMessage().contains("Failed to submit application"));
        } finally {
            // One cleanup path for every outcome.
            yarnClient.stop();
        }
    }

    /**
     * Submits the DAG built by {@code SimpleTestDAG.createDAGForVertexOrder}, polls until it
     * completes, and checks that the six vertex names in the DAG status are iterated in the order
     * {v1, v2}, v3, {v4, v5}, v6 (names inside braces may appear in either order).
     *
     * @throws Exception if the client, the submission or the polling throws
     */
    @Test(timeout = 60000)
    public void testVertexOrder() throws Exception {
        TezClient tezClient = TezClient.create("TestVertexOrder", new TezConfiguration(mrrTezCluster.getConfig()));
        tezClient.start();
        try {
            DAG dag = SimpleTestDAG.createDAGForVertexOrder("dag1", conf);
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGStatus dagStatus = dagClient.getDAGStatus((Set) null);
            while (!dagStatus.isCompleted()) {
                // The message now states the real polling interval (it used to claim 500ms).
                LOG.info("Waiting for dag to complete. Sleeping for 100ms. DAG name: " + dag.getName()
                    + " DAG context: " + dagClient.getExecutionContext()
                    + " Current state: " + dagStatus.getState());
                Thread.sleep(100L);
                dagStatus = dagClient.getDAGStatus((Set) null);
            }
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());

            // NOTE(review): this relies on the vertex-progress map having a stable iteration order - confirm.
            Set<String> vertexNames = dagStatus.getVertexProgress().keySet();
            Assert.assertEquals(6L, vertexNames.size());
            int position = 0;
            for (String vertexName : vertexNames) {
                if (position <= 1) {
                    Assert.assertTrue(vertexName.equals("v1") || vertexName.equals("v2"));
                } else if (position == 2) {
                    Assert.assertTrue(vertexName.equals("v3"));
                } else if (position <= 4) {
                    Assert.assertTrue(vertexName.equals("v4") || vertexName.equals("v5"));
                } else {
                    Assert.assertTrue(vertexName.equals("v6"));
                }
                position++;
            }
        } finally {
            // tezClient was already dereferenced by start(), so no null check is needed.
            tezClient.stop();
        }
    }

    /**
     * Builds a two-vertex DAG and asserts that it finishes in state {@code SUCCEEDED}.
     *
     * <p>The first vertex (parallelism 1) runs {@code SleepProcessor} and has a data source whose
     * initializer is {@code InputInitializerForTest}; the second vertex (parallelism 5) runs
     * {@code InputInitializerEventGeneratingProcessor}. No edge is added between them.
     *
     * @throws TezException if the client or the DAG submission fails
     * @throws InterruptedException if interrupted while waiting for completion
     * @throws IOException on communication failure
     */
    @Test(timeout = 60000)
    public void testInputInitializerEvents() throws TezException, InterruptedException, IOException {
        TezClient tezClient =
            TezClient.create("TestInputInitializerEvents", new TezConfiguration(mrrTezCluster.getConfig()));
        tezClient.start();
        try {
            DAG dag = DAG.create("TestInputInitializerEvents");
            Vertex vertexWithInitializer = Vertex.create(
                    VERTEX_WITH_INITIALIZER_NAME,
                    ProcessorDescriptor.create(SleepProcessor.class.getName())
                        .setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()),
                    1)
                .addDataSource(
                    INPUT1_NAME,
                    DataSourceDescriptor.create(
                        InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName()),
                        InputInitializerDescriptor.create(InputInitializerForTest.class.getName()),
                        (Credentials) null));
            Vertex eventGeneratingVertex = Vertex.create(
                EVENT_GENERATING_VERTEX_NAME,
                ProcessorDescriptor.create(InputInitializerEventGeneratingProcessor.class.getName()),
                5);
            dag.addVertex(vertexWithInitializer).addVertex(eventGeneratingVertex);

            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());
        } finally {
            // Stop the client exactly once, whether or not the DAG succeeded.
            tezClient.stop();
        }
    }

    /**
     * Runs {@code MultipleCommitsExample} with four output prefixes, each paired with a random
     * count in 1..10, and verifies through {@code verifyCommits} that every expected output has a
     * {@code _SUCCESS} marker. The staging directory is deleted afterwards regardless of the
     * outcome.
     *
     * @throws Exception if the job run or verification throws
     */
    @Test(timeout = 60000)
    public void testMultipleCommits_OnDAGSuccess() throws Exception {
        Path stagingDir = new Path("/tmp/commit-staging-dir");
        Random random = new Random();
        int v1Count = random.nextInt(10) + 1;
        int v2Count = random.nextInt(10) + 1;
        int uv12Count = random.nextInt(10) + 1;
        int v3Count = random.nextInt(10) + 1;

        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.set("tez.staging-dir", stagingDir.toString());
        try {
            String[] args = {
                "/tmp/commit-output-v1", String.valueOf(v1Count),
                "/tmp/commit-output-v2", String.valueOf(v2Count),
                "/tmp/commit-output-uv12", String.valueOf(uv12Count),
                "/tmp/commit-output-v3", String.valueOf(v3Count)};
            int exitCode = new MultipleCommitsExample().run(tezConfiguration, args, (TezClient) null);
            Assert.assertTrue("MultipleCommitsExample failed", exitCode == 0);

            // NOTE(review): output prefixes are shared with the OnVertexSuccess test and are not
            // cleared first, so leftovers from an earlier run could satisfy these checks - confirm.
            verifyCommits("/tmp/commit-output-v1", v1Count);
            verifyCommits("/tmp/commit-output-v2", v2Count);
            verifyCommits("/tmp/commit-output-uv12", uv12Count);
            verifyCommits("/tmp/commit-output-v3", v3Count);
        } finally {
            // No TezClient is created here, so the staging directory is the only thing to clean up.
            remoteFs.delete(stagingDir, true);
        }
    }

    /**
     * Runs {@code MultipleCommitsExample} with four output prefixes, each paired with a random
     * count in 1..10, plus the trailing {@code commitOnVertexSuccess} argument, and verifies
     * through {@code verifyCommits} that every expected output has a {@code _SUCCESS} marker.
     * The staging directory is deleted afterwards regardless of the outcome.
     *
     * @throws Exception if the job run or verification throws
     */
    @Test(timeout = 60000)
    public void testMultipleCommits_OnVertexSuccess() throws Exception {
        Path stagingDir = new Path("/tmp/commit-staging-dir");
        Random random = new Random();
        int v1Count = random.nextInt(10) + 1;
        int v2Count = random.nextInt(10) + 1;
        int uv12Count = random.nextInt(10) + 1;
        int v3Count = random.nextInt(10) + 1;

        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.set("tez.staging-dir", stagingDir.toString());
        try {
            String[] args = {
                "/tmp/commit-output-v1", String.valueOf(v1Count),
                "/tmp/commit-output-v2", String.valueOf(v2Count),
                "/tmp/commit-output-uv12", String.valueOf(uv12Count),
                "/tmp/commit-output-v3", String.valueOf(v3Count),
                "commitOnVertexSuccess"};
            int exitCode = new MultipleCommitsExample().run(tezConfiguration, args, (TezClient) null);
            Assert.assertTrue("MultipleCommitsExample failed", exitCode == 0);

            // NOTE(review): output prefixes are shared with the OnDAGSuccess test and are not
            // cleared first, so leftovers from an earlier run could satisfy these checks - confirm.
            verifyCommits("/tmp/commit-output-v1", v1Count);
            verifyCommits("/tmp/commit-output-v2", v2Count);
            verifyCommits("/tmp/commit-output-uv12", uv12Count);
            verifyCommits("/tmp/commit-output-v3", v3Count);
        } finally {
            // No TezClient is created here, so the staging directory is the only thing to clean up.
            remoteFs.delete(stagingDir, true);
        }
    }

    /**
     * Asserts that, for every index {@code k} in {@code [0, i)}, the file
     * {@code <str>_<k>/_SUCCESS} exists on {@code remoteFs}.
     *
     * @param str output path prefix
     * @param i number of indexed outputs expected under that prefix
     * @throws IOException if the existence check fails
     */
    private void verifyCommits(String str, int i) throws IllegalArgumentException, IOException {
        int index = 0;
        while (index < i) {
            String outputDir = str + "_" + index;
            Path successMarker = new Path(outputDir + "/_SUCCESS");
            boolean committed = remoteFs.exists(successMarker);
            Assert.assertTrue("Output of " + outputDir + " is not succeeded", committed);
            index++;
        }
    }

    /**
     * Creates the two directories on {@code remoteFs} and writes one file named {@code file} into
     * each: the first receives the lines {@code term0} .. {@code term19}, the second only the
     * even-numbered terms.
     *
     * @param path directory receiving all twenty terms
     * @param path2 directory receiving the even-numbered terms
     * @return the terms written to the second file
     * @throws IOException if a directory or file cannot be created or written
     */
    private Set<String> generateSortMergeJoinInput(Path path, Path path2) throws IOException {
        remoteFs.mkdirs(path);
        remoteFs.mkdirs(path2);
        Set<String> evenTerms = new HashSet<String>();
        // try-with-resources closes writers first (flushing them) and then the underlying streams,
        // and does so even when a write fails part-way through.
        try (FSDataOutputStream allTermsOut = remoteFs.create(new Path(path, "file"));
             FSDataOutputStream evenTermsOut = remoteFs.create(new Path(path2, "file"));
             BufferedWriter allTermsWriter = new BufferedWriter(new OutputStreamWriter(allTermsOut));
             BufferedWriter evenTermsWriter = new BufferedWriter(new OutputStreamWriter(evenTermsOut))) {
            for (int i = 0; i < 20; i++) {
                String term = "term" + i;
                allTermsWriter.write(term);
                allTermsWriter.newLine();
                if (i % 2 == 0) {
                    evenTermsWriter.write(term);
                    evenTermsWriter.newLine();
                    evenTerms.add(term);
                }
            }
        }
        return evenTerms;
    }

    /**
     * Verifies that {@code path} contains exactly one file whose name starts with neither
     * {@code _} nor {@code .}, and that its lines are exactly the members of {@code set}.
     *
     * <p>Each line read is removed from {@code set}, so the caller's set is emptied by a
     * successful verification.
     *
     * @param path directory to inspect on {@code remoteFs}
     * @param set expected lines; mutated by this method
     * @throws IOException if listing the directory or reading the file fails
     */
    private void verifySortMergeJoinInput(Path path, Set<String> set) throws IOException {
        FileStatus[] dataFiles = remoteFs.listStatus(path, new PathFilter() {
            @Override
            public boolean accept(Path candidate) {
                String name = candidate.getName();
                return !(name.startsWith("_") || name.startsWith("."));
            }
        });
        Assert.assertEquals(1L, dataFiles.length);

        // try-with-resources releases the stream even when an assertion fails mid-file.
        try (FSDataInputStream in = remoteFs.open(dataFiles[0].getPath());
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // remove() returns false for both unexpected and repeated lines.
                Assert.assertTrue("Unexpected or duplicate line: " + line, set.remove(line));
            }
        }
        Assert.assertEquals(0L, set.size());
    }

    /**
     * Starts a {@link TezClient} with {@code tez.am.client.heartbeat.timeout.secs} set to 5,
     * cancels its AM keep-alive, waits until YARN reports the application as FINISHED, FAILED or
     * KILLED, and asserts that the diagnostics contain
     * "Client-to-AM Heartbeat timeout interval expired".
     *
     * @throws Exception if setup, polling or the YARN queries throw
     */
    @Test(timeout = 60000)
    public void testAMClientHeartbeatTimeout() throws Exception {
        Path stagingDir = new Path("/tmp/timeout-staging-dir");
        remoteFs.mkdirs(stagingDir);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List<?> appsBefore = yarnClient.getApplications();
            int appsBeforeCount = appsBefore != null ? appsBefore.size() : 0;

            TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
            tezConfiguration.set("tez.staging-dir", stagingDir.toString());
            tezConfiguration.setInt("tez.am.client.heartbeat.timeout.secs", 5);
            // The client is deliberately not stopped here: the test waits for the application to
            // reach a terminal state by itself.
            TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConfiguration, true);
            tezClient.start();
            tezClient.cancelAMKeepAlive(true);
            ApplicationId appId = tezClient.getAppMasterApplicationId();

            // Re-query YARN explicitly; the previous code referenced an undefined temporary here.
            List<?> appsAfter = yarnClient.getApplications();
            int appsAfterCount = appsAfter != null ? appsAfter.size() : 0;
            Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);

            // Poll once per second until the application reaches a terminal state; the @Test
            // timeout bounds this loop.
            while (true) {
                YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
                if (state == YarnApplicationState.FINISHED
                        || state == YarnApplicationState.FAILED
                        || state == YarnApplicationState.KILLED) {
                    break;
                }
                Thread.sleep(1000L);
            }
            // NOTE(review): extra wait before reading the report, presumably so the diagnostics
            // are populated - confirm.
            Thread.sleep(2000L);
            ApplicationReport finalReport = yarnClient.getApplicationReport(appId);
            LOG.info("App Report for appId=" + appId + ", report=" + finalReport);
            Assert.assertTrue("Actual diagnostics: " + finalReport.getDiagnostics(),
                finalReport.getDiagnostics().contains("Client-to-AM Heartbeat timeout interval expired"));
        } finally {
            remoteFs.delete(stagingDir, true);
            yarnClient.stop();
        }
    }

    /**
     * Starts a {@link TezClient} with {@code tez.session.am.dag.submit.timeout.secs} set to 5,
     * submits nothing, waits until YARN reports the application as FINISHED, FAILED or KILLED,
     * and asserts that the diagnostics contain "Session timed out".
     *
     * @throws Exception if setup, polling or the YARN queries throw
     */
    @Test(timeout = 60000)
    public void testSessionTimeout() throws Exception {
        Path stagingDir = new Path("/tmp/sessiontimeout-staging-dir");
        remoteFs.mkdirs(stagingDir);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List<?> appsBefore = yarnClient.getApplications();
            int appsBeforeCount = appsBefore != null ? appsBefore.size() : 0;

            TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
            tezConfiguration.set("tez.staging-dir", stagingDir.toString());
            tezConfiguration.setInt("tez.session.am.dag.submit.timeout.secs", 5);
            // The client is deliberately not stopped here: the test waits for the application to
            // reach a terminal state by itself.
            TezClient tezClient = TezClient.create("testSessionTimeout", tezConfiguration, true);
            tezClient.start();
            ApplicationId appId = tezClient.getAppMasterApplicationId();

            // Re-query YARN explicitly; the previous code referenced an undefined temporary here.
            List<?> appsAfter = yarnClient.getApplications();
            int appsAfterCount = appsAfter != null ? appsAfter.size() : 0;
            Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);

            // Poll once per second until the application reaches a terminal state; the @Test
            // timeout bounds this loop.
            while (true) {
                YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
                if (state == YarnApplicationState.FINISHED
                        || state == YarnApplicationState.FAILED
                        || state == YarnApplicationState.KILLED) {
                    break;
                }
                Thread.sleep(1000L);
            }
            // NOTE(review): extra wait before reading the report, presumably so the diagnostics
            // are populated - confirm.
            Thread.sleep(2000L);
            ApplicationReport finalReport = yarnClient.getApplicationReport(appId);
            LOG.info("App Report for appId=" + appId + ", report=" + finalReport);
            Assert.assertTrue("Actual diagnostics: " + finalReport.getDiagnostics(),
                finalReport.getDiagnostics().contains("Session timed out"));
        } finally {
            remoteFs.delete(stagingDir, true);
            yarnClient.stop();
        }
    }

    /**
     * Runs a Parent -> Child DAG (two tasks per vertex, both using
     * {@code FailingAttemptProcessor}) with {@code tez.vertex.failures.maxpercent} set to
     * {@code "50.0f"} and {@code tez.am.task.max.failed.attempts} set to 1, and asserts that the
     * DAG still ends in state {@code SUCCEEDED}.
     *
     * @throws TezException if the client or the DAG submission fails
     * @throws InterruptedException if interrupted while waiting for completion
     * @throws IOException on communication failure
     */
    @Test(timeout = 60000)
    public void testVertexFailuresMaxPercent() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.set("tez.vertex.failures.maxpercent", "50.0f");
        tezConfiguration.setInt("tez.am.task.max.failed.attempts", 1);
        TezClient tezClient = TezClient.create("TestVertexFailuresMaxPercent", tezConfiguration);
        tezClient.start();
        try {
            DAG dag = DAG.create("TestVertexFailuresMaxPercent");
            Vertex parent = Vertex.create("Parent", ProcessorDescriptor.create(FailingAttemptProcessor.class.getName()), 2);
            Vertex child = Vertex.create("Child", ProcessorDescriptor.create(FailingAttemptProcessor.class.getName()), 2);
            OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName())
                .setFromConfiguration(tezConfiguration)
                .build();
            dag.addVertex(parent)
                .addVertex(child)
                .addEdge(Edge.create(parent, child, edgeConfig.createDefaultEdgeProperty()));

            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus((Set) null).getState());
        } finally {
            // Stop the client exactly once, whether or not the DAG succeeded.
            tezClient.stop();
        }
    }

    /**
     * Runs the {@code CartesianProduct} example with
     * {@code tez.cartesian-product.max-parallelism} = 10 and
     * {@code tez.cartesian-product.min-ops-per-worker} = 25, passing no arguments and no client,
     * and asserts that it returns 0.
     *
     * @throws Exception if the example run throws
     */
    @Test(timeout = 60000)
    public void testCartesianProduct() throws Exception {
        LOG.info("Running CartesianProduct Test");
        CartesianProduct cartesianProduct = new CartesianProduct();
        TezConfiguration tezConfiguration = new TezConfiguration(mrrTezCluster.getConfig());
        tezConfiguration.setInt("tez.cartesian-product.max-parallelism", 10);
        tezConfiguration.setInt("tez.cartesian-product.min-ops-per-worker", 25);
        // JUnit's order is (message, expected, actual); with the arguments swapped a failure
        // would report the exit code as the expected value.
        Assert.assertEquals("CartesianProduct failed", 0L,
            cartesianProduct.run(tezConfiguration, (String[]) null, (TezClient) null));
    }
}
