/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.TestSpeculativeExecution;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled
public class TestSpeculativeExecOnCluster {
    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeExecOnCluster.class);
    private static final int NODE_MANAGERS_COUNT = 2;
    private static final boolean ENABLE_SPECULATIVE_MAP = true;
    private static final boolean ENABLE_SPECULATIVE_REDUCE = true;
    private static final int NUM_MAP_DEFAULT = 16;
    private static final int NUM_REDUCE_DEFAULT = 8;
    private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
    private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
    private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
    private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;
    private static final String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
    private static final String REDUCE_SLEEP_COUNT = "mapreduce.sleepjob.reduce.sleep.count";
    private static final String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
    private static final String REDUCE_SLEEP_TIME = "mapreduce.sleepjob.reduce.sleep.time";
    private static final String MAP_SLEEP_CALCULATOR_TYPE = "mapreduce.sleepjob.map.sleep.time.calculator";
    private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";
    private static Map<String, SleepDurationCalculator> mapSleepTypeMapper = new HashMap<String, SleepDurationCalculator>();
    private static FileSystem localFs;
    private static final Path TEST_ROOT_DIR;
    private static final Path APP_JAR;
    private static final Path TEST_OUT_DIR;
    private MiniMRYarnCluster mrCluster;
    private int myNumMapper;
    private int myNumReduce;
    private int myMapSleepTime;
    private int myReduceSleepTime;
    private int myMapSleepCount;
    private int myReduceSleepCount;
    private String chosenSleepCalc;
    private Class<?> estimatorClass;
    private List<String> ignoredTests;

    public static Collection<Object[]> getTestParameters() {
        List<String> ignoredTests = Arrays.asList("stalled_run", "slowing_run", "step_stalled_run");
        return Arrays.asList({SimpleExponentialTaskRuntimeEstimator.class, ignoredTests, 16, 8}, {LegacyTaskRuntimeEstimator.class, ignoredTests, 16, 8});
    }

    public void initTestSpeculativeExecOnCluster(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws IOException {
        this.ignoredTests = pTestToIgnore;
        this.estimatorClass = pEstimatorKlass;
        this.myNumMapper = pNumMapper;
        this.myNumReduce = pNumReduce;
        this.setup();
    }

    public void setup() throws IOException {
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        if (this.mrCluster == null) {
            this.mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 2);
            Configuration conf = new Configuration();
            this.mrCluster.init(conf);
            this.mrCluster.start();
        }
        localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
        localFs.setPermission(APP_JAR, new FsPermission("700"));
        this.myMapSleepTime = 60000;
        this.myReduceSleepTime = 10000;
        this.myMapSleepCount = 10000;
        this.myReduceSleepCount = 1000;
        this.chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
    }

    @AfterEach
    public void tearDown() {
        if (this.mrCluster != null) {
            this.mrCluster.stop();
            this.mrCluster = null;
        }
    }

    @ParameterizedTest(name="{index}: TaskEstimator(EstimatorClass {0})")
    @MethodSource(value={"getTestParameters"})
    public void testExecDynamicSlowingSpeculative(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws Exception {
        EstimatorMetricsPair[] estimatorPairs;
        this.initTestSpeculativeExecOnCluster(pEstimatorKlass, pTestToIgnore, pNumMapper, pNumReduce);
        this.chosenSleepCalc = "dynamic_slowing_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        for (EstimatorMetricsPair specEstimator : estimatorPairs = new EstimatorMetricsPair[]{new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true)}) {
            if (!this.estimatorClass.equals(specEstimator.estimatorClass)) continue;
            LOG.info("+++ Dynamic Slow Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = this.runSpecTest();
            boolean succeeded = job.waitForCompletion(true);
            Assertions.assertTrue((boolean)succeeded, (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Assertions.assertEquals((Object)JobStatus.State.SUCCEEDED, (Object)job.getJobState(), (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Counters counters = job.getCounters();
            String errorMessage = specEstimator.getErrorMessage(counters);
            boolean didSpeculate = specEstimator.didSpeculate(counters);
            Assertions.assertEquals((Object)didSpeculate, (Object)specEstimator.speculativeEstimator, (String)errorMessage);
            Assertions.assertEquals((long)0L, (long)counters.findCounter((Enum)JobCounter.NUM_FAILED_MAPS).getValue(), (String)("Failed maps higher than 0 " + this.estimatorClass.getName()));
        }
    }

    @ParameterizedTest(name="{index}: TaskEstimator(EstimatorClass {0})")
    @MethodSource(value={"getTestParameters"})
    public void testExecSlowNonSpeculative(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws Exception {
        EstimatorMetricsPair[] estimatorPairs;
        this.initTestSpeculativeExecOnCluster(pEstimatorKlass, pTestToIgnore, pNumMapper, pNumReduce);
        this.chosenSleepCalc = "slowing_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        for (EstimatorMetricsPair specEstimator : estimatorPairs = new EstimatorMetricsPair[]{new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, false), new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true)}) {
            if (!this.estimatorClass.equals(specEstimator.estimatorClass)) continue;
            LOG.info("+++ Linear Slow Progress Non Speculative testing against " + this.estimatorClass.getName() + " +++");
            Job job = this.runSpecTest();
            boolean succeeded = job.waitForCompletion(true);
            Assertions.assertTrue((boolean)succeeded, (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Assertions.assertEquals((Object)JobStatus.State.SUCCEEDED, (Object)job.getJobState(), (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Counters counters = job.getCounters();
            String errorMessage = specEstimator.getErrorMessage(counters);
            boolean didSpeculate = specEstimator.didSpeculate(counters);
            Assertions.assertEquals((Object)didSpeculate, (Object)specEstimator.speculativeEstimator, (String)errorMessage);
            Assertions.assertEquals((long)0L, (long)counters.findCounter((Enum)JobCounter.NUM_FAILED_MAPS).getValue(), (String)("Failed maps higher than 0 " + this.estimatorClass.getName()));
        }
    }

    @ParameterizedTest(name="{index}: TaskEstimator(EstimatorClass {0})")
    @MethodSource(value={"getTestParameters"})
    public void testExecStepStalledSpeculative(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws Exception {
        EstimatorMetricsPair[] estimatorPairs;
        this.initTestSpeculativeExecOnCluster(pEstimatorKlass, pTestToIgnore, pNumMapper, pNumReduce);
        this.chosenSleepCalc = "step_stalled_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        for (EstimatorMetricsPair specEstimator : estimatorPairs = new EstimatorMetricsPair[]{new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true)}) {
            if (!this.estimatorClass.equals(specEstimator.estimatorClass)) continue;
            LOG.info("+++ Stalled Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = this.runSpecTest();
            boolean succeeded = job.waitForCompletion(true);
            Assertions.assertTrue((boolean)succeeded, (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Assertions.assertEquals((Object)JobStatus.State.SUCCEEDED, (Object)job.getJobState(), (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Counters counters = job.getCounters();
            String errorMessage = specEstimator.getErrorMessage(counters);
            boolean didSpeculate = specEstimator.didSpeculate(counters);
            Assertions.assertEquals((Object)didSpeculate, (Object)specEstimator.speculativeEstimator, (String)errorMessage);
            Assertions.assertEquals((long)0L, (long)counters.findCounter((Enum)JobCounter.NUM_FAILED_MAPS).getValue(), (String)("Failed maps higher than 0 " + this.estimatorClass.getName()));
        }
    }

    @ParameterizedTest(name="{index}: TaskEstimator(EstimatorClass {0})")
    @MethodSource(value={"getTestParameters"})
    public void testExecStalledSpeculative(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws Exception {
        EstimatorMetricsPair[] estimatorPairs;
        this.initTestSpeculativeExecOnCluster(pEstimatorKlass, pTestToIgnore, pNumMapper, pNumReduce);
        this.chosenSleepCalc = "stalled_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        for (EstimatorMetricsPair specEstimator : estimatorPairs = new EstimatorMetricsPair[]{new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true)}) {
            if (!this.estimatorClass.equals(specEstimator.estimatorClass)) continue;
            LOG.info("+++ Stalled Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = this.runSpecTest();
            boolean succeeded = job.waitForCompletion(true);
            Assertions.assertTrue((boolean)succeeded, (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Assertions.assertEquals((Object)JobStatus.State.SUCCEEDED, (Object)job.getJobState(), (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Counters counters = job.getCounters();
            String errorMessage = specEstimator.getErrorMessage(counters);
            boolean didSpeculate = specEstimator.didSpeculate(counters);
            Assertions.assertEquals((Object)didSpeculate, (Object)specEstimator.speculativeEstimator, (String)errorMessage);
            Assertions.assertEquals((long)0L, (long)counters.findCounter((Enum)JobCounter.NUM_FAILED_MAPS).getValue(), (String)("Failed maps higher than 0 " + this.estimatorClass.getName()));
        }
    }

    @ParameterizedTest(name="{index}: TaskEstimator(EstimatorClass {0})")
    @MethodSource(value={"getTestParameters"})
    public void testExecNonSpeculative(Class<? extends TaskRuntimeEstimator> pEstimatorKlass, List<String> pTestToIgnore, Integer pNumMapper, Integer pNumReduce) throws Exception {
        EstimatorMetricsPair[] estimatorPairs;
        this.initTestSpeculativeExecOnCluster(pEstimatorKlass, pTestToIgnore, pNumMapper, pNumReduce);
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        for (EstimatorMetricsPair specEstimator : estimatorPairs = new EstimatorMetricsPair[]{new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true), new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, false), new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true)}) {
            if (!this.estimatorClass.equals(specEstimator.estimatorClass)) continue;
            LOG.info("+++ No Speculation testing against " + this.estimatorClass.getName() + " +++");
            Job job = this.runSpecTest();
            boolean succeeded = job.waitForCompletion(true);
            Assertions.assertTrue((boolean)succeeded, (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Assertions.assertEquals((Object)JobStatus.State.SUCCEEDED, (Object)job.getJobState(), (String)("Job expected to succeed with estimator " + this.estimatorClass.getName()));
            Counters counters = job.getCounters();
            String errorMessage = specEstimator.getErrorMessage(counters);
            boolean didSpeculate = specEstimator.didSpeculate(counters);
            Assertions.assertEquals((Object)didSpeculate, (Object)specEstimator.speculativeEstimator, (String)errorMessage);
        }
    }

    private Job runSpecTest() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = this.mrCluster.getConfig();
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", true);
        conf.setClass("yarn.app.mapreduce.am.job.task.estimator.class", this.estimatorClass, TaskRuntimeEstimator.class);
        conf.setLong(MAP_SLEEP_TIME, (long)this.myMapSleepTime);
        conf.setLong(REDUCE_SLEEP_TIME, (long)this.myReduceSleepTime);
        conf.setInt(MAP_SLEEP_COUNT, this.myMapSleepCount);
        conf.setInt(REDUCE_SLEEP_COUNT, this.myReduceSleepCount);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 1.0f);
        conf.setInt("mapreduce.job.maps", this.myNumMapper);
        conf.set(MAP_SLEEP_CALCULATOR_TYPE, this.chosenSleepCalc);
        Job job = Job.getInstance((Configuration)conf);
        job.setJarByClass(TestSpeculativeExecution.class);
        job.setMapperClass(SpeculativeSleepMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(SpeculativeSleepReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setInputFormatClass(SpeculativeSleepInputFormat.class);
        job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
        job.setNumReduceTasks(this.myNumReduce);
        FileInputFormat.addInputPath((Job)job, (Path)new Path("ignored"));
        try {
            localFs.delete(TEST_OUT_DIR, true);
        }
        catch (IOException iOException) {
            // empty catch block
        }
        FileOutputFormat.setOutputPath((Job)job, (Path)TEST_OUT_DIR);
        job.addFileToClassPath(APP_JAR);
        job.setMaxMapAttempts(2);
        job.submit();
        return job;
    }

    static {
        mapSleepTypeMapper.put(MAP_SLEEP_CALCULATOR_TYPE_DEFAULT, new SleepDurationCalcImpl());
        mapSleepTypeMapper.put("stalled_run", new StalledSleepDurationCalcImpl());
        mapSleepTypeMapper.put("slowing_run", new SlowingSleepDurationCalcImpl());
        mapSleepTypeMapper.put("dynamic_slowing_run", new DynamicSleepDurationCalcImpl());
        mapSleepTypeMapper.put("step_stalled_run", new StepStalledSleepDurationCalcImpl());
        try {
            localFs = FileSystem.getLocal((Configuration)new Configuration());
        }
        catch (IOException io) {
            throw new RuntimeException("problem getting local fs", io);
        }
        TEST_ROOT_DIR = new Path("target", TestSpeculativeExecOnCluster.class.getName() + "-tmpDir").makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
        APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
        TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
    }

    class EstimatorMetricsPair {
        private Class<?> estimatorClass;
        private int expectedMapTasks;
        private int expectedReduceTasks;
        private boolean speculativeEstimator;

        EstimatorMetricsPair(Class<?> estimatorClass, int mapTasks, int reduceTasks, boolean isToSpeculate) {
            this.estimatorClass = estimatorClass;
            this.expectedMapTasks = mapTasks;
            this.expectedReduceTasks = reduceTasks;
            this.speculativeEstimator = isToSpeculate;
        }

        boolean didSpeculate(Counters counters) {
            long launchedMaps = counters.findCounter((Enum)JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
            long launchedReduce = counters.findCounter((Enum)JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
            boolean isSpeculated = launchedMaps > (long)this.expectedMapTasks || launchedReduce > (long)this.expectedReduceTasks;
            return isSpeculated;
        }

        String getErrorMessage(Counters counters) {
            String msg = "Unexpected tasks running estimator " + this.estimatorClass.getName() + "\n\t";
            long launchedMaps = counters.findCounter((Enum)JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
            long launchedReduce = counters.findCounter((Enum)JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
            if (this.speculativeEstimator) {
                if (launchedMaps < (long)this.expectedMapTasks) {
                    msg = msg + "maps " + launchedMaps + ", expected: " + this.expectedMapTasks;
                }
                if (launchedReduce < (long)this.expectedReduceTasks) {
                    msg = msg + ", reduces " + launchedReduce + ", expected: " + this.expectedReduceTasks;
                }
            } else {
                if (launchedMaps > (long)this.expectedMapTasks) {
                    msg = msg + "maps " + launchedMaps + ", expected: " + this.expectedMapTasks;
                }
                if (launchedReduce > (long)this.expectedReduceTasks) {
                    msg = msg + ", reduces " + launchedReduce + ", expected: " + this.expectedReduceTasks;
                }
            }
            return msg;
        }
    }

    public static class SpeculativeSleepReducer
    extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
        private long reduceSleepDuration = 10000L;
        private int reduceSleepCount = 1;
        private int count = 0;

        protected void setup(Reducer.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.reduceSleepCount = conf.getInt(TestSpeculativeExecOnCluster.REDUCE_SLEEP_COUNT, this.reduceSleepCount);
            this.reduceSleepDuration = this.reduceSleepCount == 0 ? 0L : conf.getLong(TestSpeculativeExecOnCluster.REDUCE_SLEEP_TIME, 10000L) / (long)this.reduceSleepCount;
        }

        public void reduce(IntWritable key, Iterable<NullWritable> values, Reducer.Context context) throws IOException {
            try {
                context.setStatus("Sleeping... (" + this.reduceSleepDuration * (long)(this.reduceSleepCount - this.count) + ") ms left");
                Thread.sleep(this.reduceSleepDuration);
            }
            catch (InterruptedException ex) {
                throw (IOException)new IOException("Interrupted while sleeping").initCause(ex);
            }
            ++this.count;
        }
    }

    public static class SpeculativeSleepMapper
    extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
        private long mapSleepDuration = 60000L;
        private int mapSleepCount = 1;
        private int count = 0;
        private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.mapSleepCount = conf.getInt(TestSpeculativeExecOnCluster.MAP_SLEEP_COUNT, this.mapSleepCount);
            this.mapSleepDuration = this.mapSleepCount == 0 ? 0L : conf.getLong(TestSpeculativeExecOnCluster.MAP_SLEEP_TIME, 60000L) / (long)this.mapSleepCount;
            this.sleepCalc = (SleepDurationCalculator)mapSleepTypeMapper.get(conf.get(TestSpeculativeExecOnCluster.MAP_SLEEP_CALCULATOR_TYPE, TestSpeculativeExecOnCluster.MAP_SLEEP_CALCULATOR_TYPE_DEFAULT));
        }

        public void map(IntWritable key, IntWritable value, Mapper.Context context) throws IOException, InterruptedException {
            try {
                context.setStatus("Sleeping... (" + this.mapSleepDuration * (long)(this.mapSleepCount - this.count) + ") ms left");
                long sleepTime = this.sleepCalc.calcSleepDuration(context.getTaskAttemptID(), this.count, this.mapSleepCount, this.mapSleepDuration);
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException ex) {
                throw (IOException)new IOException("Interrupted while sleeping").initCause(ex);
            }
            ++this.count;
            int k = key.get();
            for (int i = 0; i < value.get(); ++i) {
                context.write((Object)new IntWritable(k + i), (Object)NullWritable.get());
            }
        }
    }

    public static class DynamicSleepDurationCalcImpl
    implements SleepDurationCalculator {
        private double[] thresholds = new double[]{0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9};
        private double[] slowFactors = new double[]{2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0};

        DynamicSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            if (taId.getTaskType() == TaskType.MAP && taId.getTaskID().getId() == 0 && taId.getId() == 0) {
                double currProgress = (double)currCount / (double)totalCount;
                double slowFactor = 1.0;
                for (int i = 0; i < this.thresholds.length && !(this.thresholds[i] >= currProgress); ++i) {
                    slowFactor = this.slowFactors[i];
                }
                return (long)(slowFactor * (double)defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    public static class StepStalledSleepDurationCalcImpl
    implements SleepDurationCalculator {
        private double threshold = 0.4;
        private double slowFactor = 10000.0;

        StepStalledSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            if (taId.getTaskType() == TaskType.MAP && taId.getTaskID().getId() == 0 && taId.getId() == 0 && this.threshold <= (double)currCount / (double)totalCount) {
                return (long)(this.slowFactor * (double)defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    public static class StalledSleepDurationCalcImpl
    implements SleepDurationCalculator {
        StalledSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            if (taId.getTaskType() == TaskType.MAP && taId.getTaskID().getId() == 0 && taId.getId() == 0) {
                return 1000L * defaultSleepDuration;
            }
            return defaultSleepDuration;
        }
    }

    public static class SlowingSleepDurationCalcImpl
    implements SleepDurationCalculator {
        private double threshold = 0.4;
        private double slowFactor = 1.2;

        SlowingSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            if (taId.getTaskType() == TaskType.MAP && taId.getTaskID().getId() == 0 && taId.getId() == 0 && this.threshold <= (double)currCount / (double)totalCount) {
                return (long)(this.slowFactor * (double)defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    public static class SleepDurationCalcImpl
    implements SleepDurationCalculator {
        private double threshold = 1.0;
        private double slowFactor = 1.0;

        SleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            if (this.threshold <= (double)currCount / (double)totalCount) {
                return (long)(this.slowFactor * (double)defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    public static interface SleepDurationCalculator {
        public long calcSleepDuration(TaskAttemptID var1, int var2, int var3, long var4);
    }

    public static class SpeculativeSleepInputFormat
    extends InputFormat<IntWritable, IntWritable> {
        public List<InputSplit> getSplits(JobContext jobContext) {
            ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
            int numSplits = jobContext.getConfiguration().getInt("mapreduce.job.maps", 1);
            for (int i = 0; i < numSplits; ++i) {
                ret.add(new EmptySplit());
            }
            return ret;
        }

        public RecordReader<IntWritable, IntWritable> createRecordReader(InputSplit ignored, TaskAttemptContext taskContext) throws IOException {
            Configuration conf = taskContext.getConfiguration();
            final int count = conf.getInt(TestSpeculativeExecOnCluster.MAP_SLEEP_COUNT, 10000);
            if (count < 0) {
                throw new IOException("Invalid map count: " + count);
            }
            int redcount = conf.getInt(TestSpeculativeExecOnCluster.REDUCE_SLEEP_COUNT, 1000);
            if (redcount < 0) {
                throw new IOException("Invalid reduce count: " + redcount);
            }
            final int emitPerMapTask = redcount * taskContext.getNumReduceTasks();
            return new RecordReader<IntWritable, IntWritable>(){
                private int records = 0;
                private int emitCount = 0;
                private IntWritable key = null;
                private IntWritable value = null;

                public void initialize(InputSplit split, TaskAttemptContext context) {
                }

                public boolean nextKeyValue() throws IOException {
                    if (count == 0) {
                        return false;
                    }
                    this.key = new IntWritable();
                    this.key.set(this.emitCount);
                    int emit = emitPerMapTask / count;
                    if (emitPerMapTask % count > this.records) {
                        ++emit;
                    }
                    this.emitCount += emit;
                    this.value = new IntWritable();
                    this.value.set(emit);
                    return this.records++ < count;
                }

                public IntWritable getCurrentKey() {
                    return this.key;
                }

                public IntWritable getCurrentValue() {
                    return this.value;
                }

                public void close() throws IOException {
                }

                public float getProgress() throws IOException {
                    return count == 0 ? 100.0f : (float)this.records / (float)count;
                }
            };
        }
    }

    public static class EmptySplit
    extends InputSplit
    implements Writable {
        public void write(DataOutput out) throws IOException {
        }

        public void readFields(DataInput in) throws IOException {
        }

        public long getLength() {
            return 0L;
        }

        public String[] getLocations() {
            return new String[0];
        }
    }

    public static class SpeculativeSleepJobPartitioner
    extends Partitioner<IntWritable, NullWritable> {
        public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
            return k.get() % numPartitions;
        }
    }
}

