/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MRRSleepJob
extends Configured
implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(MRRSleepJob.class);
    public static final String MAP_SLEEP_COUNT = "mrr.sleepjob.map.sleep.count";
    public static final String REDUCE_SLEEP_COUNT = "mrr.sleepjob.reduce.sleep.count";
    public static final String MAP_SLEEP_TIME = "mrr.sleepjob.map.sleep.time";
    public static final String REDUCE_SLEEP_TIME = "mrr.sleepjob.reduce.sleep.time";
    public static final String IREDUCE_SLEEP_COUNT = "mrr.sleepjob.ireduce.sleep.count";
    public static final String IREDUCE_SLEEP_TIME = "mrr.sleepjob.ireduce.sleep.time";
    public static final String IREDUCE_STAGES_COUNT = "mrr.sleepjob.ireduces.stages.count";
    public static final String IREDUCE_TASKS_COUNT = "mrr.sleepjob.ireduces.tasks.count";
    public static final String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
    public static final String MAP_FATAL_ERROR = "mrr.sleepjob.map.fatal.error";
    public static final String MAP_ERROR_TASK_IDS = "mrr.sleepjob.map.error.task.ids";
    private Credentials credentials = new Credentials();

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run((Configuration)new Configuration(), (Tool)new MRRSleepJob(), (String[])args);
        System.exit(res);
    }

    public DAG createDAG(Configuration conf, Path stagingDir, int numMapper, int numReducer, int iReduceStagesCount, int numIReducer, long mapSleepTime, int mapSleepCount, long reduceSleepTime, int reduceSleepCount, long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS, boolean generateSplitsInAM) throws IOException, YarnException {
        JobConf mapStageConf = new JobConf(conf);
        mapStageConf.setInt("mapreduce.job.maps", numMapper);
        mapStageConf.setLong(MAP_SLEEP_TIME, mapSleepTime);
        mapStageConf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
        mapStageConf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
        mapStageConf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
        mapStageConf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
        mapStageConf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
        mapStageConf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
        mapStageConf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
        mapStageConf.set("mapreduce.job.map.class", SleepMapper.class.getName());
        mapStageConf.set("mapreduce.job.inputformat.class", SleepInputFormat.class.getName());
        if (numIReducer == 0 && numReducer == 0) {
            mapStageConf.set("mapreduce.job.outputformat.class", NullOutputFormat.class.getName());
        }
        MRHelpers.translateMRConfToTez((Configuration)mapStageConf, (boolean)false);
        JobConf[] intermediateReduceStageConfs = null;
        if (iReduceStagesCount > 0 && numIReducer > 0) {
            intermediateReduceStageConfs = new JobConf[iReduceStagesCount];
            for (int i = 1; i <= iReduceStagesCount; ++i) {
                JobConf iReduceStageConf = new JobConf(conf);
                iReduceStageConf.setLong(REDUCE_SLEEP_TIME, iReduceSleepTime);
                iReduceStageConf.setInt(REDUCE_SLEEP_COUNT, iReduceSleepCount);
                iReduceStageConf.setInt("mapreduce.job.reduces", numIReducer);
                iReduceStageConf.set("mapreduce.job.reduce.class", ISleepReducer.class.getName());
                iReduceStageConf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
                iReduceStageConf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
                iReduceStageConf.set("mapreduce.job.partitioner.class", MRRSleepJobPartitioner.class.getName());
                MRHelpers.translateMRConfToTez((Configuration)iReduceStageConf, (boolean)false);
                intermediateReduceStageConfs[i - 1] = iReduceStageConf;
            }
        }
        JobConf finalReduceConf = null;
        if (numReducer > 0) {
            finalReduceConf = new JobConf(conf);
            finalReduceConf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
            finalReduceConf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
            finalReduceConf.setInt("mapreduce.job.reduces", numReducer);
            finalReduceConf.set("mapreduce.job.reduce.class", SleepReducer.class.getName());
            finalReduceConf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
            finalReduceConf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
            finalReduceConf.set("mapreduce.job.outputformat.class", NullOutputFormat.class.getName());
            MRHelpers.translateMRConfToTez((Configuration)finalReduceConf, (boolean)false);
        }
        MRHelpers.configureMRApiUsage((Configuration)mapStageConf);
        if (iReduceStagesCount > 0 && numIReducer > 0) {
            for (int i = 0; i < iReduceStagesCount; ++i) {
                MRHelpers.configureMRApiUsage((Configuration)intermediateReduceStageConfs[i]);
            }
        }
        if (numReducer > 0) {
            MRHelpers.configureMRApiUsage((Configuration)finalReduceConf);
        }
        DataSourceDescriptor dataSource = null;
        if (!generateSplitsInAM && writeSplitsToDFS) {
            LOG.info("Writing splits to DFS");
            dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration((Configuration)mapStageConf, (Path)stagingDir, (boolean)true);
        } else {
            dataSource = MRInputLegacy.createConfigBuilder((Configuration)mapStageConf, SleepInputFormat.class).generateSplitsInAM(generateSplitsInAM).build();
        }
        DAG dag = DAG.create((String)"MRRSleepJob");
        String jarPath = ClassUtil.findContainingJar(((Object)((Object)this)).getClass());
        if (jarPath == null) {
            throw new TezUncheckedException("Could not find any jar containing MRRSleepJob.class in the classpath");
        }
        FileSystem stagingFs = stagingDir.getFileSystem(conf);
        Path remoteJarPath = new Path(stagingDir, "dag_job.jar");
        stagingFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
        FileStatus jarFileStatus = stagingFs.getFileStatus(remoteJarPath);
        TokenCache.obtainTokensForNamenodes((Credentials)this.credentials, (Path[])new Path[]{remoteJarPath}, (Configuration)mapStageConf);
        HashMap<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
        LocalResource dagJarLocalRsrc = LocalResource.newInstance((URL)ConverterUtils.getYarnUrlFromPath((Path)remoteJarPath), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.APPLICATION, (long)jarFileStatus.getLen(), (long)jarFileStatus.getModificationTime());
        commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
        ArrayList<Vertex> vertices = new ArrayList<Vertex>();
        UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf((Configuration)mapStageConf);
        int numTasks = generateSplitsInAM ? -1 : numMapper;
        HashMap mapEnv = Maps.newHashMap();
        MRHelpers.updateEnvBasedOnMRTaskEnv((Configuration)mapStageConf, (Map)mapEnv, (boolean)true);
        HashMap reduceEnv = Maps.newHashMap();
        MRHelpers.updateEnvBasedOnMRTaskEnv((Configuration)mapStageConf, (Map)reduceEnv, (boolean)false);
        Vertex mapVertex = Vertex.create((String)"map", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)MapProcessor.class.getName()).setUserPayload(mapUserPayload)), (int)numTasks, (Resource)MRHelpers.getResourceForMRMapper((Configuration)mapStageConf));
        mapVertex.addTaskLocalFiles(commonLocalResources).addDataSource("MRInput", dataSource).setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper((Configuration)mapStageConf)).setTaskEnvironment((Map)mapEnv);
        vertices.add(mapVertex);
        if (iReduceStagesCount > 0 && numIReducer > 0) {
            for (int i = 0; i < iReduceStagesCount; ++i) {
                JobConf iconf = intermediateReduceStageConfs[i];
                UserPayload iReduceUserPayload = TezUtils.createUserPayloadFromConf((Configuration)iconf);
                Vertex ivertex = Vertex.create((String)("ireduce" + (i + 1)), (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(iReduceUserPayload)), (int)numIReducer, (Resource)MRHelpers.getResourceForMRReducer((Configuration)intermediateReduceStageConfs[i]));
                ivertex.addTaskLocalFiles(commonLocalResources).setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer((Configuration)intermediateReduceStageConfs[i])).setTaskEnvironment((Map)reduceEnv);
                vertices.add(ivertex);
            }
        }
        Vertex finalReduceVertex = null;
        if (numReducer > 0) {
            UserPayload reducePayload = TezUtils.createUserPayloadFromConf((Configuration)finalReduceConf);
            finalReduceVertex = Vertex.create((String)"reduce", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(reducePayload)), (int)numReducer, (Resource)MRHelpers.getResourceForMRReducer((Configuration)finalReduceConf));
            finalReduceVertex.addTaskLocalFiles(commonLocalResources).addDataSink("MROutput", MROutputLegacy.createConfigBuilder((Configuration)finalReduceConf, NullOutputFormat.class).build()).setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer((Configuration)finalReduceConf)).setTaskEnvironment((Map)reduceEnv);
            vertices.add(finalReduceVertex);
        } else {
            mapVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder((Configuration)mapStageConf, NullOutputFormat.class).build());
        }
        for (int i = 0; i < vertices.size(); ++i) {
            dag.addVertex((Vertex)vertices.get(i));
            if (i == 0) continue;
            HashMap partitionerConf = Maps.newHashMap();
            partitionerConf.put("mapreduce.job.partitioner.class", MRRSleepJobPartitioner.class.getName());
            JobConf edgeConfiguration = i + 1 == vertices.size() ? finalReduceConf : intermediateReduceStageConfs[i - 1];
            OrderedPartitionedKVEdgeConfig edgeConf = ((OrderedPartitionedKVEdgeConfig.Builder)OrderedPartitionedKVEdgeConfig.newBuilder((String)IntWritable.class.getName(), (String)IntWritable.class.getName(), (String)HashPartitioner.class.getName(), (Map)partitionerConf).configureInput().useLegacyInput().done()).setFromConfiguration((Configuration)edgeConfiguration).build();
            dag.addEdge(Edge.create((Vertex)((Vertex)vertices.get(i - 1)), (Vertex)((Vertex)vertices.get(i)), (EdgeProperty)edgeConf.createDefaultEdgeProperty()));
        }
        return dag;
    }

    @VisibleForTesting
    public Job createJob(int numMapper, int numReducer, int iReduceStagesCount, int numIReducer, long mapSleepTime, int mapSleepCount, long reduceSleepTime, int reduceSleepCount, long iReduceSleepTime, int iReduceSleepCount) throws IOException {
        Configuration conf = this.getConf();
        conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
        conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
        conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
        conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
        conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
        conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
        conf.setInt("mapreduce.job.maps", numMapper);
        conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
        conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
        conf.setInt("mrr.intermediate.num-stages", iReduceStagesCount);
        LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
        for (int i = 1; i <= iReduceStagesCount; ++i) {
            conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage((int)i, (String)"mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
            conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage((int)i, (String)"mapreduce.map.output.key.class"), IntWritable.class, Object.class);
            conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage((int)i, (String)"mapreduce.map.output.value.class"), IntWritable.class, Object.class);
            conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage((int)i, (String)"mapreduce.job.reduces"), numIReducer);
        }
        Job job = Job.getInstance((Configuration)conf, (String)"sleep");
        job.setNumReduceTasks(numReducer);
        job.setJarByClass(MRRSleepJob.class);
        job.setNumReduceTasks(numReducer);
        job.setMapperClass(SleepMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(SleepReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setInputFormatClass(SleepInputFormat.class);
        job.setPartitionerClass(MRRSleepJobPartitioner.class);
        job.setSpeculativeExecution(false);
        job.setJobName("Sleep job");
        FileInputFormat.addInputPath((Job)job, (Path)new Path("ignored"));
        return job;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int run(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("MRRSleepJob [-m numMapper] [-r numReducer] [-ir numIntermediateReducer] [-irs numIntermediateReducerStages] [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)] [-irt intermediateReduceSleepTime] [-recordt recordSleepTime (msec)] [-generateSplitsInAM (false)/true] [-writeSplitsToDfs (false)/true] [-numDags numDagsToSubmit");
            ToolRunner.printGenericCommandUsage((PrintStream)System.err);
            return 2;
        }
        int numMapper = 1;
        int numReducer = 1;
        int numIReducer = 1;
        long mapSleepTime = 100L;
        long reduceSleepTime = 100L;
        long recSleepTime = 100L;
        long iReduceSleepTime = 1L;
        int mapSleepCount = 1;
        int reduceSleepCount = 1;
        int iReduceSleepCount = 1;
        int iReduceStagesCount = 1;
        boolean writeSplitsToDfs = false;
        boolean generateSplitsInAM = false;
        boolean splitsOptionFound = false;
        boolean isSession = false;
        int numDags = 1;
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-m")) {
                numMapper = Integer.parseInt(args[++i]);
                continue;
            }
            if (args[i].equals("-r")) {
                numReducer = Integer.parseInt(args[++i]);
                continue;
            }
            if (args[i].equals("-ir")) {
                numIReducer = Integer.parseInt(args[++i]);
                continue;
            }
            if (args[i].equals("-mt")) {
                mapSleepTime = Long.parseLong(args[++i]);
                continue;
            }
            if (args[i].equals("-rt")) {
                reduceSleepTime = Long.parseLong(args[++i]);
                continue;
            }
            if (args[i].equals("-irt")) {
                iReduceSleepTime = Long.parseLong(args[++i]);
                continue;
            }
            if (args[i].equals("-irs")) {
                iReduceStagesCount = Integer.parseInt(args[++i]);
                continue;
            }
            if (args[i].equals("-recordt")) {
                recSleepTime = Long.parseLong(args[++i]);
                continue;
            }
            if (args[i].equals("-generateSplitsInAM")) {
                if (splitsOptionFound) {
                    throw new RuntimeException("Cannot use both -generateSplitsInAm and -writeSplitsToDfs together");
                }
                splitsOptionFound = true;
                generateSplitsInAM = Boolean.parseBoolean(args[++i]);
                continue;
            }
            if (args[i].equals("-writeSplitsToDfs")) {
                if (splitsOptionFound) {
                    throw new RuntimeException("Cannot use both -generateSplitsInAm and -writeSplitsToDfs together");
                }
                splitsOptionFound = true;
                writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
                continue;
            }
            if (!args[i].equals("-numDags")) continue;
            if ((numDags = Integer.parseInt(args[++i])) < 1) {
                throw new RuntimeException("numDags should be positive");
            }
            isSession = numDags > 1;
        }
        if (numIReducer > 0 && numReducer <= 0) {
            throw new RuntimeException("Cannot have intermediate reduces without a final reduce");
        }
        mapSleepCount = (int)Math.ceil((double)mapSleepTime / (double)recSleepTime);
        reduceSleepCount = (int)Math.ceil((double)reduceSleepTime / (double)recSleepTime);
        iReduceSleepCount = (int)Math.ceil((double)iReduceSleepTime / (double)recSleepTime);
        TezConfiguration conf = new TezConfiguration(this.getConf());
        conf.set("tez.staging-dir", conf.get("tez.staging-dir", TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));
        String stagingBaseDir = conf.get("tez.staging-dir", TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
        Path stagingDir = new Path(stagingBaseDir, Long.toString(System.currentTimeMillis()));
        stagingDir = stagingDir.getFileSystem((Configuration)conf).makeQualified(stagingDir);
        TezClientUtils.ensureStagingDirExists((Configuration)conf, (Path)stagingDir);
        DAG dag = this.createDAG((Configuration)conf, stagingDir, numMapper, numReducer, iReduceStagesCount, numIReducer, mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount, iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
        TezClient tezSession = TezClient.create((String)"MRRSleep", (TezConfiguration)conf, (boolean)isSession, null, (Credentials)this.credentials);
        tezSession.start();
        try {
            while (numDags > 0) {
                DAGClient dagClient = tezSession.submitDAG(dag);
                dagClient.waitForCompletion();
                if (!dagClient.getDAGStatus(null).getState().equals((Object)DAGStatus.State.SUCCEEDED)) {
                    int n = 1;
                    return n;
                }
                --numDags;
            }
        }
        finally {
            tezSession.stop();
        }
        return 0;
    }

    public static class SleepMapper
    extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
        private long mapSleepDuration = 100L;
        private int mapSleepCount = 1;
        private int count = 0;
        private String vertexName;
        private boolean throwError = false;
        private boolean throwFatal = false;
        private boolean finalAttempt = false;

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.mapSleepCount = conf.getInt(MRRSleepJob.MAP_SLEEP_COUNT, this.mapSleepCount);
            this.mapSleepDuration = this.mapSleepCount == 0 ? 0L : conf.getLong(MRRSleepJob.MAP_SLEEP_TIME, 100L) / (long)this.mapSleepCount;
            this.vertexName = conf.get("mapreduce.task.vertex.name");
            TaskAttemptID taId = context.getTaskAttemptID();
            String[] taskIds = conf.getStrings(MRRSleepJob.MAP_ERROR_TASK_IDS);
            if (taId.getId() + 1 >= context.getMaxMapAttempts()) {
                this.finalAttempt = true;
            }
            boolean found = false;
            if (taskIds != null) {
                if (taskIds.length == 1 && taskIds[0].equals("*")) {
                    found = true;
                }
                if (!found) {
                    for (String taskId : taskIds) {
                        if (Integer.parseInt(taskId) != taId.getTaskID().getId()) continue;
                        found = true;
                        break;
                    }
                }
            }
            if (found) {
                if (!this.finalAttempt) {
                    this.throwError = conf.getBoolean(MRRSleepJob.MAP_THROW_ERROR, false);
                }
                this.throwFatal = conf.getBoolean(MRRSleepJob.MAP_FATAL_ERROR, false);
            }
        }

        public void map(IntWritable key, IntWritable value, Mapper.Context context) throws IOException, InterruptedException {
            try {
                LOG.info("Reading in " + this.vertexName + " taskid " + context.getTaskAttemptID().getTaskID().getId() + " key " + key.get());
                LOG.info("Sleeping in InitialMap, vertexName=" + this.vertexName + ", taskAttemptId=" + context.getTaskAttemptID() + ", mapSleepDuration=" + this.mapSleepDuration + ", mapSleepCount=" + this.mapSleepCount + ", sleepLeft=" + this.mapSleepDuration * (long)(this.mapSleepCount - this.count));
                context.setStatus("Sleeping... (" + this.mapSleepDuration * (long)(this.mapSleepCount - this.count) + ") ms left");
                if (this.mapSleepCount - this.count > 0) {
                    Thread.sleep(this.mapSleepDuration);
                }
                if (this.throwError || this.throwFatal) {
                    throw new IOException("Throwing a simulated error from map");
                }
            }
            catch (InterruptedException ex) {
                throw (IOException)new IOException("Interrupted while sleeping").initCause(ex);
            }
            ++this.count;
            int k = key.get();
            for (int i = 0; i < value.get(); ++i) {
                LOG.info("Writing in " + this.vertexName + " taskid " + context.getTaskAttemptID().getTaskID().getId() + " key " + (k + i) + " value 1");
                context.write((Object)new IntWritable(k + i), (Object)new IntWritable(1));
            }
        }
    }

    public static class SleepInputFormat
    extends InputFormat<IntWritable, IntWritable> {
        public List<InputSplit> getSplits(JobContext jobContext) {
            ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
            int numSplits = jobContext.getConfiguration().getInt("mapreduce.job.maps", 1);
            for (int i = 0; i < numSplits; ++i) {
                ret.add(new EmptySplit());
            }
            return ret;
        }

        public RecordReader<IntWritable, IntWritable> createRecordReader(InputSplit ignored, TaskAttemptContext taskContext) throws IOException {
            Configuration conf = taskContext.getConfiguration();
            final int count = conf.getInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
            if (count < 0) {
                throw new IOException("Invalid map count: " + count);
            }
            int totalIReduces = conf.getInt(MRRSleepJob.IREDUCE_STAGES_COUNT, 1);
            int reduceTasks = totalIReduces == 0 ? taskContext.getNumReduceTasks() : conf.getInt(MRRSleepJob.IREDUCE_TASKS_COUNT, 1);
            int sleepCount = totalIReduces == 0 ? conf.getInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1) : conf.getInt(MRRSleepJob.IREDUCE_SLEEP_COUNT, 1);
            final int emitPerMapTask = sleepCount * reduceTasks;
            return new RecordReader<IntWritable, IntWritable>(){
                private int records = 0;
                private int emitCount = 0;
                private IntWritable key = null;
                private IntWritable value = null;

                public void initialize(InputSplit split, TaskAttemptContext context) {
                }

                public boolean nextKeyValue() throws IOException {
                    if (count == 0) {
                        return false;
                    }
                    this.key = new IntWritable();
                    this.key.set(this.emitCount);
                    int emit = emitPerMapTask / count;
                    if (emitPerMapTask % count > this.records) {
                        ++emit;
                    }
                    this.emitCount += emit;
                    this.value = new IntWritable();
                    this.value.set(emit);
                    return this.records++ < count;
                }

                public IntWritable getCurrentKey() {
                    return this.key;
                }

                public IntWritable getCurrentValue() {
                    return this.value;
                }

                public void close() throws IOException {
                }

                public float getProgress() throws IOException {
                    return count == 0 ? 100.0f : (float)this.records / (float)count;
                }
            };
        }
    }

    public static class ISleepReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private long iReduceSleepDuration = 100L;
        private int iReduceSleepCount = 1;
        private int count = 0;
        private String vertexName;

        protected void setup(Reducer.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.iReduceSleepCount = conf.getInt(MRRSleepJob.IREDUCE_SLEEP_COUNT, this.iReduceSleepCount);
            this.iReduceSleepDuration = this.iReduceSleepCount == 0 ? 0L : conf.getLong(MRRSleepJob.IREDUCE_SLEEP_TIME, 100L) / (long)this.iReduceSleepCount;
            this.vertexName = conf.get("mapreduce.task.vertex.name");
        }

        public void reduce(IntWritable key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            try {
                LOG.info("Reading in " + this.vertexName + " taskid " + context.getTaskAttemptID().getTaskID().getId() + " key " + key.get());
                LOG.info("Sleeping in IntermediateReduce, vertexName=" + this.vertexName + ", taskAttemptId=" + context.getTaskAttemptID() + ", iReduceSleepDuration=" + this.iReduceSleepDuration + ", iReduceSleepCount=" + this.iReduceSleepCount + ", sleepLeft=" + this.iReduceSleepDuration * (long)(this.iReduceSleepCount - this.count));
                context.setStatus("Sleeping... (" + this.iReduceSleepDuration * (long)(this.iReduceSleepCount - this.count) + ") ms left");
                if (this.iReduceSleepCount - this.count > 0) {
                    Thread.sleep(this.iReduceSleepDuration);
                }
            }
            catch (InterruptedException ex) {
                throw (IOException)new IOException("Interrupted while sleeping").initCause(ex);
            }
            ++this.count;
            int k = key.get();
            for (IntWritable value : values) {
                for (int i = 0; i < value.get(); ++i) {
                    LOG.info("Writing in " + this.vertexName + " taskid " + context.getTaskAttemptID().getTaskID().getId() + " key " + (k + i) + " value 1");
                    context.write((Object)new IntWritable(k + i), (Object)new IntWritable(1));
                }
            }
        }
    }

    public static class MRRSleepJobPartitioner
    extends Partitioner<IntWritable, IntWritable> {
        public int getPartition(IntWritable k, IntWritable v, int numPartitions) {
            return k.get() % numPartitions;
        }
    }

    public static class SleepReducer
    extends Reducer<IntWritable, IntWritable, NullWritable, NullWritable> {
        private long reduceSleepDuration = 100L;
        private int reduceSleepCount = 1;
        private int count = 0;
        private String vertexName;

        protected void setup(Reducer.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.reduceSleepCount = conf.getInt(MRRSleepJob.REDUCE_SLEEP_COUNT, this.reduceSleepCount);
            this.reduceSleepDuration = this.reduceSleepCount == 0 ? 0L : conf.getLong(MRRSleepJob.REDUCE_SLEEP_TIME, 100L) / (long)this.reduceSleepCount;
            this.vertexName = conf.get("mapreduce.task.vertex.name");
        }

        public void reduce(IntWritable key, Iterable<IntWritable> values, Reducer.Context context) throws IOException {
            try {
                LOG.info("Reading in " + this.vertexName + " taskid " + context.getTaskAttemptID().getTaskID().getId() + " key " + key.get());
                LOG.info("Sleeping in FinalReduce, vertexName=" + this.vertexName + ", taskAttemptId=" + context.getTaskAttemptID() + ", reduceSleepDuration=" + this.reduceSleepDuration + ", reduceSleepCount=" + this.reduceSleepCount + ", sleepLeft=" + this.reduceSleepDuration * (long)(this.reduceSleepCount - this.count));
                context.setStatus("Sleeping... (" + this.reduceSleepDuration * (long)(this.reduceSleepCount - this.count) + ") ms left");
                if (this.reduceSleepCount - this.count > 0) {
                    Thread.sleep(this.reduceSleepDuration);
                }
            }
            catch (InterruptedException ex) {
                throw (IOException)new IOException("Interrupted while sleeping").initCause(ex);
            }
            ++this.count;
        }
    }

    public static class EmptySplit
    extends InputSplit
    implements Writable {
        public void write(DataOutput out) throws IOException {
        }

        public void readFields(DataInput in) throws IOException {
        }

        public long getLength() {
            return 0L;
        }

        public String[] getLocations() {
            return new String[0];
        }
    }
}

