/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestLocalRunner {
    private static final Logger LOG = LoggerFactory.getLogger(TestLocalRunner.class);

    /** Number of records written to input file {@code i} when the multi-map input is created. */
    private static final int[] INPUT_SIZES = new int[]{50000, 500, 500, 20, 5000, 500};

    /** Number of records {@code StressMapper} emits for every record it reads from input file {@code i}. */
    private static final int[] OUTPUT_SIZES = new int[]{1, 500, 500, 500, 500, 500};

    /** {@code StressMapper} sleeps for 1 ms whenever its emit index modulo this interval equals 1. */
    private static final int[] SLEEP_INTERVALS = new int[]{10000, 15, 15, 20, 250, 60};

    /**
     * Sum over all input files of {@code INPUT_SIZES[i] * OUTPUT_SIZES[i]}.
     * Accumulated by the static initializer of this class, hence not {@code final}.
     */
    private static int TOTAL_RECORDS = 0;

    /** Directory name (relative to the test data dir, if set) holding the job input. */
    private static final String INPUT_DIR = "multiMapInput";

    /** Directory name (relative to the test data dir, if set) holding the job output. */
    private static final String OUTPUT_DIR = "multiMapOutput";

    /** Number written into each "number file"; {@code SequenceMapper} emits 0 .. value-1 for it. */
    private static final int NUMBER_FILE_VAL = 100;

    /**
     * Creates a text file named after {@code id} inside {@code dirPath} on the local
     * file system and fills it with {@code numRecords} lines of the form
     * {@code "This is a line in a file: <id> <lineIndex>"}.
     *
     * @param dirPath    directory that receives the file
     * @param id         numeric id used both as the file name and inside every line
     * @param numRecords number of lines to write
     * @throws IOException if the file cannot be created or written
     */
    private void createInputFile(Path dirPath, int id, int numRecords) throws IOException {
        final String message = "This is a line in a file: ";
        Path filePath = new Path(dirPath, "" + id);
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        // try-with-resources: the writer (and the stream it wraps) is closed even when a
        // write fails part-way through, so a failing test does not leak a file handle.
        try (BufferedWriter w = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))) {
            for (int i = 0; i < numRecords; ++i) {
                w.write(message + id + " " + i + "\n");
            }
        }
    }

    /**
     * Resolves the job input directory: the input directory name under the
     * {@code test.build.data} system property when that property is set, otherwise the
     * bare relative directory name.
     */
    private Path getInputPath() {
        String baseDir = System.getProperty("test.build.data");
        return baseDir == null
            ? new Path(INPUT_DIR)
            : new Path(new Path(baseDir), INPUT_DIR);
    }

    /**
     * Resolves the job output directory: the output directory name under the
     * {@code test.build.data} system property when that property is set, otherwise the
     * bare relative directory name.
     */
    private Path getOutputPath() {
        String baseDir = System.getProperty("test.build.data");
        return baseDir == null
            ? new Path(OUTPUT_DIR)
            : new Path(new Path(baseDir), OUTPUT_DIR);
    }

    /**
     * Recreates the multi-map input directory from scratch: deletes any existing copy and
     * writes one input file per entry of {@code INPUT_SIZES}, file {@code i} holding
     * {@code INPUT_SIZES[i]} records.
     *
     * @return the input directory that was populated
     * @throws IOException if the directory cannot be cleaned or a file cannot be written
     */
    private Path createMultiMapsInput() throws IOException {
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        Path inputPath = getInputPath();
        if (fs.exists(inputPath)) {
            fs.delete(inputPath, true);
        }
        // Drive the loop from the array itself so the file count can never drift from the sizes table.
        for (int i = 0; i < INPUT_SIZES.length; ++i) {
            createInputFile(inputPath, i, INPUT_SIZES[i]);
        }
        return inputPath;
    }

    /**
     * Checks the single reducer output file {@code part-r-00000} under {@code outputPath}:
     * its first line must be {@code "0\t<count>"} where count equals {@code TOTAL_RECORDS}.
     *
     * @param outputPath job output directory
     * @throws IOException if the output file cannot be opened or read
     */
    private void verifyOutput(Path outputPath) throws IOException {
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        Path outputFile = new Path(outputPath, "part-r-00000");
        // try-with-resources: a failing assertion must not leave the reader open.
        try (BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
            String firstLine = r.readLine();
            // An empty file would otherwise surface as a bare NullPointerException.
            Assertions.assertNotNull(firstLine, "Reducer output file is empty");
            String line = firstLine.trim();
            Assertions.assertTrue(line.startsWith("0\t"), "Line does not have correct key");
            // Skip the two-character "0\t" key prefix; the rest is the record count.
            int count = Integer.parseInt(line.substring(2));
            Assertions.assertEquals(TOTAL_RECORDS, count, "Incorrect count generated!");
        }
    }

    /**
     * Runs a map-only job with {@code GCMapper} over a small 20-record input and checks that
     * the {@code GC_TIME_MILLIS} task counter exists and is greater than zero.
     */
    @Test
    public void testGcCounter() throws Exception {
        Path in = getInputPath();
        Path out = getOutputPath();
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

        // Start from a clean slate: remove leftovers of earlier runs.
        if (localFs.exists(out)) {
            localFs.delete(out, true);
        }
        if (localFs.exists(in)) {
            localFs.delete(in, true);
        }
        createInputFile(in, 0, 20);

        Job gcJob = Job.getInstance();
        gcJob.setMapperClass(GCMapper.class);
        gcJob.setNumReduceTasks(0);
        gcJob.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(gcJob, in);
        FileOutputFormat.setOutputPath(gcJob, out);

        boolean succeeded = gcJob.waitForCompletion(true);
        Assertions.assertTrue(succeeded, "job failed");

        Counter gcCounter = gcJob.getCounters().findCounter(TaskCounter.GC_TIME_MILLIS);
        Assertions.assertNotNull(gcCounter);
        Assertions.assertTrue(gcCounter.getValue() > 0L, "No time spent in gc");
    }

    /**
     * Runs the stress job (six input files, up to six concurrent map tasks, one reducer)
     * and verifies the reducer's record count.
     *
     * <p>A watchdog thread interrupts the test thread after two minutes so that a hung job
     * produces thread dumps instead of blocking forever. The watchdog is always stopped and
     * joined before this method returns, whatever way the job wait ends, so it cannot fire
     * later and interrupt an unrelated test running on the same thread.
     */
    @Test
    @Timeout(value=120L)
    public void testMultiMaps() throws Exception {
        Job job = Job.getInstance();
        Path inputPath = createMultiMapsInput();
        Path outputPath = getOutputPath();
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, 6);
        job.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        final Thread toInterrupt = Thread.currentThread();
        Thread interrupter = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(120000L);
                    toInterrupt.interrupt();
                } catch (InterruptedException ignored) {
                    // The test interrupts this watchdog once the job wait is over; just exit.
                }
            }
        };

        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Starting thread to interrupt main thread in 2 minutes");
        interrupter.start();
        LOG.info("Waiting for job to complete...");
        boolean succeeded;
        try {
            succeeded = job.waitForCompletion(true);
            LOG.info("Job completed, stopping interrupter");
        } catch (InterruptedException ie) {
            // The watchdog fired: dump stacks a few times to show where the job is stuck.
            LOG.error("Interrupted while waiting for job completion", ie);
            for (int i = 0; i < 10; ++i) {
                LOG.error("Dumping stacks");
                ReflectionUtils.logThreadInfo(LOG, "multimap threads", 0L);
                Thread.sleep(1000L);
            }
            throw ie;
        } finally {
            // Stop the watchdog on every exit path, including unexpected exceptions.
            interrupter.interrupt();
            try {
                interrupter.join();
            } catch (InterruptedException ignored) {
                // The watchdog may interrupt us right as we interrupt it; that interrupt
                // carries no information any more, so it is deliberately dropped.
            }
        }

        Assertions.assertTrue(succeeded, "Job failed");
        LOG.info("Verifying output");
        verifyOutput(outputPath);
    }

    /**
     * Configures the stress job with a negative maximum number of concurrently running
     * map tasks and expects the job to report failure.
     */
    @Test
    public void testInvalidMultiMapParallelism() throws Exception {
        Job badJob = Job.getInstance();
        Path in = createMultiMapsInput();
        Path out = getOutputPath();

        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        if (localFs.exists(out)) {
            localFs.delete(out, true);
        }

        badJob.setMapperClass(StressMapper.class);
        badJob.setReducerClass(CountingReducer.class);
        badJob.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(badJob, -6);
        FileInputFormat.addInputPath(badJob, in);
        FileOutputFormat.setOutputPath(badJob, out);

        Assertions.assertFalse(badJob.waitForCompletion(true), "Job succeeded somehow");
    }

    /**
     * Runs a job whose input format yields no splits at all (with one reduce task
     * configured) and expects it to complete successfully.
     */
    @Test
    public void testEmptyMaps() throws Exception {
        Job emptyJob = Job.getInstance();
        Path out = getOutputPath();

        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        if (localFs.exists(out)) {
            localFs.delete(out, true);
        }

        emptyJob.setInputFormatClass(EmptyInputFormat.class);
        emptyJob.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(emptyJob, out);

        Assertions.assertTrue(emptyJob.waitForCompletion(true), "Empty job should work");
    }

    /** Returns the {@code numberfiles} sub-directory of the job input directory. */
    private Path getNumberDirPath() {
        Path inputRoot = getInputPath();
        return new Path(inputRoot, "numberfiles");
    }

    /**
     * Writes a file named {@code file<fileNum>} into the number-file directory whose sole
     * content is the decimal text of {@code value} (no trailing newline).
     *
     * @param fileNum suffix of the file name
     * @param value   number to store in the file
     * @return path of the file that was written
     * @throws IOException if the file cannot be created or written
     */
    private Path makeNumberFile(int fileNum, int value) throws IOException {
        Path filePath = new Path(getNumberDirPath(), "file" + fileNum);
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        // try-with-resources: close the stream even if the write itself fails.
        try (BufferedWriter w = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))) {
            w.write("" + value);
        }
        return filePath;
    }

    /**
     * Sums every integer line found in every file of the job output directory and checks
     * the total against the expected value: each of the {@code numMaps} mappers contributes
     * 0 + 1 + ... + (NUMBER_FILE_VAL - 1).
     *
     * @param numMaps number of number files (and therefore map tasks) the job processed
     * @throws Exception if the output cannot be listed or read, or a line is not an integer
     */
    private void verifyNumberJob(int numMaps) throws Exception {
        Path outputDir = getOutputPath();
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        int valueSum = 0;
        for (FileStatus f : fs.listStatus(outputDir)) {
            // try-with-resources: a malformed line must not leak the reader of this file.
            try (BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(f.getPath())))) {
                String line;
                while ((line = r.readLine()) != null) {
                    valueSum += Integer.parseInt(line.trim());
                }
            }
        }
        // Largest value a mapper emits for a number file containing NUMBER_FILE_VAL.
        int maxVal = NUMBER_FILE_VAL - 1;
        int expectedPerMapper = maxVal * (maxVal + 1) / 2;
        int expectedSum = expectedPerMapper * numMaps;
        LOG.info("expected sum: {}, got {}", expectedSum, valueSum);
        Assertions.assertEquals(expectedSum, valueSum, "Didn't get all our results back");
    }

    /**
     * Shared driver for the map/reduce parallelism tests. Recreates {@code numMaps} number
     * files, runs a {@code SequenceMapper} job with {@code numReduces} reduce tasks under the
     * given local parallelism limits, asserts success and verifies the summed output.
     *
     * @param numMaps         number of input files to create
     * @param numReduces      number of reduce tasks for the job
     * @param parallelMaps    maximum number of map tasks running at once
     * @param parallelReduces maximum number of reduce tasks running at once
     */
    private void doMultiReducerTest(int numMaps, int numReduces, int parallelMaps, int parallelReduces) throws Exception {
        Path numberDir = getNumberDirPath();
        Path resultDir = getOutputPath();
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

        // Remove leftovers from earlier runs before generating fresh input.
        if (localFs.exists(resultDir)) {
            localFs.delete(resultDir, true);
        }
        if (localFs.exists(numberDir)) {
            localFs.delete(numberDir, true);
        }
        for (int fileNum = 0; fileNum < numMaps; ++fileNum) {
            makeNumberFile(fileNum, NUMBER_FILE_VAL);
        }

        Job numberJob = Job.getInstance();
        numberJob.setNumReduceTasks(numReduces);
        numberJob.setMapperClass(SequenceMapper.class);
        numberJob.setOutputKeyClass(Text.class);
        numberJob.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(numberJob, numberDir);
        FileOutputFormat.setOutputPath(numberJob, resultDir);
        LocalJobRunner.setLocalMaxRunningMaps(numberJob, parallelMaps);
        LocalJobRunner.setLocalMaxRunningReduces(numberJob, parallelReduces);

        Assertions.assertTrue(numberJob.waitForCompletion(true), "Job failed!!");
        verifyNumberJob(numMaps);
    }

    /** One map task, two reduce tasks, no parallelism (one map and one reduce at a time). */
    @Test
    public void testOneMapMultiReduce() throws Exception {
        doMultiReducerTest(1, 2, 1, 1);
    }

    /** One map task, two reduce tasks, with both reduce tasks allowed to run at once. */
    @Test
    public void testOneMapMultiParallelReduce() throws Exception {
        doMultiReducerTest(1, 2, 1, 2);
    }

    /** Four map tasks (two at a time) feeding a single reduce task. */
    @Test
    public void testMultiMapOneReduce() throws Exception {
        doMultiReducerTest(4, 1, 2, 1);
    }

    /** Four map tasks and four reduce tasks, each limited to two running at a time. */
    @Test
    public void testMultiMapMultiReduce() throws Exception {
        doMultiReducerTest(4, 4, 2, 2);
    }

    // Precompute the total number of records the stress job should deliver to the reducer:
    // every record of input file i is emitted OUTPUT_SIZES[i] times by the mapper.
    static {
        // Bound the loop by the table itself rather than a literal so that adding or
        // removing an input file cannot silently skew the expected total.
        for (int i = 0; i < INPUT_SIZES.length; ++i) {
            TOTAL_RECORDS += INPUT_SIZES[i] * OUTPUT_SIZES[i];
        }
    }

    public static class SequenceMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {
        public void map(LongWritable k, Text v, Mapper.Context c) throws IOException, InterruptedException {
            int max = Integer.valueOf(v.toString());
            for (int i = 0; i < max; ++i) {
                c.write((Object)new Text("" + i), (Object)NullWritable.get());
            }
        }
    }

    private static class EmptyRecordReader
    extends RecordReader<Object, Object> {
        private EmptyRecordReader() {
        }

        public void initialize(InputSplit split, TaskAttemptContext context) {
        }

        public Object getCurrentKey() {
            return new Object();
        }

        public Object getCurrentValue() {
            return new Object();
        }

        public float getProgress() {
            return 0.0f;
        }

        public void close() {
        }

        public boolean nextKeyValue() {
            return false;
        }
    }

    private static class EmptyInputFormat
    extends InputFormat<Object, Object> {
        private EmptyInputFormat() {
        }

        public List<InputSplit> getSplits(JobContext context) {
            return new ArrayList<InputSplit>();
        }

        public RecordReader<Object, Object> createRecordReader(InputSplit split, TaskAttemptContext context) {
            return new EmptyRecordReader();
        }
    }

    private static class GCMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        private GCMapper() {
        }

        public void map(LongWritable key, Text val, Mapper.Context c) throws IOException, InterruptedException {
            ArrayList<Integer> lst = new ArrayList<Integer>();
            for (int i = 0; i < 20000; ++i) {
                lst.add(new Integer(i));
            }
            int sum = 0;
            Iterator iterator = lst.iterator();
            while (iterator.hasNext()) {
                int x = (Integer)iterator.next();
                sum += x;
            }
            lst = null;
            System.gc();
            c.write((Object)new LongWritable((long)sum), (Object)val);
        }
    }

    private static class CountingReducer
    extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
        private CountingReducer() {
        }

        public void reduce(LongWritable key, Iterable<Text> vals, Reducer.Context context) throws IOException, InterruptedException {
            long out = 0L;
            for (Text val : vals) {
                ++out;
            }
            context.write((Object)key, (Object)new LongWritable(out));
        }
    }

    private static class StressMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        private int threadId;
        public long exposedState;

        private StressMapper() {
        }

        protected void setup(Mapper.Context context) {
            FileSplit split = (FileSplit)context.getInputSplit();
            Path filePath = split.getPath();
            String name = filePath.getName();
            this.threadId = Integer.valueOf(name);
            LOG.info("Thread " + this.threadId + " : " + context.getInputSplit());
        }

        public void map(LongWritable key, Text val, Mapper.Context c) throws IOException, InterruptedException {
            for (int i = 0; i < OUTPUT_SIZES[this.threadId]; ++i) {
                c.write((Object)new LongWritable(0L), (Object)val);
                if (i % SLEEP_INTERVALS[this.threadId] != 1) continue;
                Thread.sleep(1L);
            }
        }

        protected void cleanup(Mapper.Context context) {
            LOG.debug("Busy loop counter: " + this.exposedState);
        }
    }
}

