/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class TestMapReduce {
    // Local scratch directory: "TestMapReduce-mapreduce" under the "test.build.data"
    // system property, falling back to "java.io.tmpdir" when that property is unset.
    private static final File TEST_DIR = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), "TestMapReduce-mapreduce");
    // Local file system handle, assigned in the static initializer; null if it could not be obtained.
    private static FileSystem fs;
    // Number of distinct values 0..range-1 in the generated distribution (default 10, overridable via main).
    private static int range;
    // Total number of occurrences spread over the range (default 100, overridable via main).
    private static int counts;
    // Shared random source used by launch() and RandomGenMapper.
    private static Random r;

    /**
     * Removes the local scratch directory after every test method.
     */
    @After
    public void cleanup() {
        FileUtil.fullyDelete(TEST_DIR);
    }

    /**
     * Runs the whole pipeline once with the default {@code range} and {@code counts}.
     *
     * @throws Exception if any stage of the pipeline fails
     */
    @Test
    public void testMapred() throws Exception {
        launch();
    }

    /**
     * Runs the three-job pipeline on the local file system and verifies its output.
     *
     * <ol>
     *   <li>Builds a random distribution of {@code counts} occurrences over the values
     *       {@code 0..range-1} and writes it to the sequence file {@code genins/answer.key}.</li>
     *   <li>Generator job ({@link RandomGenMapper} / {@link RandomGenReducer}, one reduce)
     *       writes to {@code genouts}.</li>
     *   <li>Check job ({@link RandomCheckMapper} / {@link RandomCheckReducer}, ten reduces,
     *       {@link MapFileOutputFormat}) writes to {@code intermediateouts}.</li>
     *   <li>Merge job ({@link MergeMapper} / {@link MergeReducer}, one reduce,
     *       {@link SequenceFileOutputFormat}) writes to {@code finalouts}.</li>
     *   <li>{@code finalouts/part-r-00000} is compared entry by entry with the distribution;
     *       the verdict is written to the {@code results} file and asserted.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if {@code range} is less than 1
     * @throws IOException if the local file system is unavailable or a directory cannot be created
     * @throws Exception for any other failure while running the jobs
     */
    private static void launch() throws Exception {
        if (fs == null) {
            // The static initializer leaves fs null when the local file system cannot be obtained.
            throw new IOException("Local FileSystem is not available");
        }
        Configuration conf = new Configuration();
        int[] dist = generateDistribution();

        Path testdir = new Path(TEST_DIR.getAbsolutePath());
        if (!fs.mkdirs(testdir)) {
            throw new IOException("Mkdirs failed to create " + testdir.toString());
        }
        Path randomIns = new Path(testdir, "genins");
        if (!fs.mkdirs(randomIns)) {
            throw new IOException("Mkdirs failed to create " + randomIns.toString());
        }

        // Answer key: one (value, expected count) record per value in the range.
        Path answerkey = new Path(randomIns, "answer.key");
        try (SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, answerkey,
                IntWritable.class, IntWritable.class, SequenceFile.CompressionType.NONE)) {
            for (int i = 0; i < range; ++i) {
                out.append(new IntWritable(i), new IntWritable(dist[i]));
            }
        }
        printFiles(randomIns, conf);

        // Job 1: expand the answer key into individual occurrences.
        Path randomOuts = new Path(testdir, "genouts");
        fs.delete(randomOuts, true);
        Job genJob = Job.getInstance(conf);
        FileInputFormat.setInputPaths(genJob, randomIns);
        genJob.setInputFormatClass(SequenceFileInputFormat.class);
        genJob.setMapperClass(RandomGenMapper.class);
        FileOutputFormat.setOutputPath(genJob, randomOuts);
        genJob.setOutputKeyClass(IntWritable.class);
        genJob.setOutputValueClass(IntWritable.class);
        genJob.setReducerClass(RandomGenReducer.class);
        genJob.setNumReduceTasks(1);
        runToCompletion(genJob, "Generator");
        printFiles(randomOuts, conf);

        // Job 2: count the occurrences of every value, spread over several reduces.
        int intermediateReduces = 10;
        Path intermediateOuts = new Path(testdir, "intermediateouts");
        fs.delete(intermediateOuts, true);
        Job checkJob = Job.getInstance(conf);
        FileInputFormat.setInputPaths(checkJob, randomOuts);
        checkJob.setMapperClass(RandomCheckMapper.class);
        FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
        checkJob.setOutputKeyClass(IntWritable.class);
        checkJob.setOutputValueClass(IntWritable.class);
        checkJob.setOutputFormatClass(MapFileOutputFormat.class);
        checkJob.setReducerClass(RandomCheckReducer.class);
        checkJob.setNumReduceTasks(intermediateReduces);
        runToCompletion(checkJob, "Check");
        printFiles(intermediateOuts, conf);

        // Job 3: merge the per-reduce counts into a single output file.
        Path finalOuts = new Path(testdir, "finalouts");
        fs.delete(finalOuts, true);
        Job mergeJob = Job.getInstance(conf);
        FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
        mergeJob.setInputFormatClass(SequenceFileInputFormat.class);
        mergeJob.setMapperClass(MergeMapper.class);
        FileOutputFormat.setOutputPath(mergeJob, finalOuts);
        mergeJob.setOutputKeyClass(IntWritable.class);
        mergeJob.setOutputValueClass(IntWritable.class);
        mergeJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        mergeJob.setReducerClass(MergeReducer.class);
        mergeJob.setNumReduceTasks(1);
        runToCompletion(mergeJob, "Merge");
        printFiles(finalOuts, conf);

        // Compare the recomputed counts with the answer key, in key order.
        boolean success = true;
        Path recomputedkey = new Path(finalOuts, "part-r-00000");
        int totalseen = 0;
        try (SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf)) {
            IntWritable key = new IntWritable();
            IntWritable val = new IntWritable();
            for (int i = 0; i < range; ++i) {
                if (dist[i] == 0) {
                    // A value with zero occurrences is skipped: no record is expected for it.
                    continue;
                }
                if (!in.next(key, val)) {
                    System.err.println("Cannot read entry " + i);
                    success = false;
                    break;
                }
                if (key.get() != i || val.get() != dist[i]) {
                    System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
                    success = false;
                }
                totalseen += val.get();
            }
            if (success && in.next(key, val)) {
                System.err.println("Unnecessary lines in recomputed key!");
                success = false;
            }
        }

        int originalTotal = 0;
        for (int expected : dist) {
            originalTotal += expected;
        }
        System.out.println("Original sum: " + originalTotal);
        System.out.println("Recomputed sum: " + totalseen);

        Path resultFile = new Path(testdir, "results");
        try (BufferedWriter bw = new BufferedWriter(
                new OutputStreamWriter(fs.create(resultFile), StandardCharsets.UTF_8))) {
            bw.write("Success=" + success + "\n");
            System.out.println("Success=" + success);
        }
        Assert.assertTrue("testMapRed failed", success);
        fs.delete(testdir, true);
    }

    /** Spreads {@code counts} occurrences over {@code range} buckets with Gaussian jitter. */
    private static int[] generateDistribution() {
        if (range < 1) {
            throw new IllegalArgumentException("range must be at least 1, but was " + range);
        }
        int countsToGo = counts;
        int[] dist = new int[range];
        for (int i = 0; i < range; ++i) {
            double avgInts = 1.0 * countsToGo / (range - i);
            dist[i] = (int) Math.max(0L, Math.round(avgInts + Math.sqrt(avgInts) * r.nextGaussian()));
            countsToGo -= dist[i];
        }
        if (countsToGo > 0) {
            // Whatever was not handed out goes into the last bucket so the total is at least counts.
            dist[dist.length - 1] += countsToGo;
        }
        return dist;
    }

    /** Runs {@code job} and fails immediately, naming the stage, if it does not succeed. */
    private static void runToCompletion(Job job, String label) throws Exception {
        Assert.assertTrue(label + " job failed", job.waitForCompletion(true));
    }

    /**
     * Prints every line of the text file {@code p} to standard output, prefixed with
     * {@code "  Row: "}.
     *
     * @param fs file system that holds {@code p}
     * @param p  file to print
     * @throws IOException if the file cannot be opened or read
     */
    private static void printTextFile(FileSystem fs, Path p) throws IOException {
        // try-with-resources closes the stream even when readLine() throws.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(fs.open(p), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("  Row: " + line);
            }
        }
    }

    /**
     * Prints every key/value record of the sequence file {@code p} to standard output as
     * {@code "  Row: key, value"}.
     *
     * @param fs   file system that holds {@code p}
     * @param p    sequence file to print
     * @param conf configuration handed to the reader
     * @throws IOException if the file cannot be opened or read
     */
    private static void printSequenceFile(FileSystem fs, Path p, Configuration conf) throws IOException {
        // try-with-resources closes the reader even when a read fails part-way through.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf)) {
            Object key = null;
            Object value = null;
            while ((key = reader.next(key)) != null) {
                value = reader.getCurrentValue(value);
                System.out.println("  Row: " + key + ", " + value);
            }
        }
    }

    /**
     * Reports whether {@code f} starts with the three bytes {@code S}, {@code E}, {@code Q}.
     *
     * @param fs file system that holds {@code f}
     * @param f  file to probe
     * @return {@code true} if the first three bytes match; {@code false} otherwise, including
     *         when the file is shorter than three bytes
     * @throws IOException if the file cannot be opened or read
     */
    private static boolean isSequenceFile(FileSystem fs, Path f) throws IOException {
        // Fixed charset: the header bytes must not depend on the platform default encoding.
        byte[] magic = "SEQ".getBytes(StandardCharsets.US_ASCII);
        try (FSDataInputStream in = fs.open(f)) {
            for (byte expected : magic) {
                // read() returns -1 at end of stream, which never equals a header byte.
                if (expected != in.read()) {
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Dumps the content of every entry directly under {@code dir} to standard output.
     * A directory entry is printed via its {@code data} child as a sequence file; a plain
     * file is printed as a sequence file when {@link #isSequenceFile} says so, otherwise
     * as text.
     *
     * @param dir  directory whose entries are printed
     * @param conf configuration used to resolve the file system and to read sequence files
     * @throws IOException if listing or reading fails
     */
    private static void printFiles(Path dir, Configuration conf) throws IOException {
        FileSystem dirFs = dir.getFileSystem(conf);
        FileStatus[] entries = dirFs.listStatus(dir);
        for (FileStatus entry : entries) {
            Path entryPath = entry.getPath();
            System.out.println("Reading " + entryPath + ": ");
            if (entry.isDirectory()) {
                System.out.println("  it is a map file.");
                printSequenceFile(dirFs, new Path(entryPath, "data"), conf);
            } else if (isSequenceFile(dirFs, entryPath)) {
                System.out.println("  it is a sequence file.");
                printSequenceFile(dirFs, entryPath, conf);
            } else {
                System.out.println("  it is a text file.");
                printTextFile(dirFs, entryPath);
            }
        }
    }

    /**
     * Command-line entry point: {@code TestMapReduce <range> <counts>}.
     * With fewer than two arguments it prints the usage text and returns; otherwise it
     * stores both numbers, runs the pipeline, and always deletes the scratch directory.
     *
     * @param argv command-line arguments
     * @throws Exception if an argument is not an integer or the pipeline fails
     */
    public static void main(String[] argv) throws Exception {
        boolean missingArguments = argv.length < 2;
        if (missingArguments) {
            System.err.println("Usage: TestMapReduce <range> <counts>");
            System.err.println();
            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
            return;
        }

        range = Integer.parseInt(argv[0]);
        counts = Integer.parseInt(argv[1]);
        try {
            launch();
        } finally {
            FileUtil.fullyDelete(TEST_DIR);
        }
    }

    // Initializes the shared static state: the local file system handle, the default
    // range (10) and counts (100), and the random source.
    static {
        FileSystem localFs = null;
        try {
            localFs = FileSystem.getLocal(new Configuration());
        } catch (IOException ioe) {
            // Stay best-effort (fs remains null) but report the cause instead of hiding it.
            System.err.println("Unable to obtain the local FileSystem: " + ioe);
        }
        fs = localFs;
        range = 10;
        counts = 100;
        r = new Random();
    }

    static class RandomGenMapper
    extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
        RandomGenMapper() {
        }

        public void map(IntWritable key, IntWritable val, Mapper.Context context) throws IOException, InterruptedException {
            int randomVal = key.get();
            int randomCount = val.get();
            for (int i = 0; i < randomCount; ++i) {
                context.write((Object)new IntWritable(Math.abs(r.nextInt())), (Object)new IntWritable(randomVal));
            }
        }
    }

    static class RandomGenReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        RandomGenReducer() {
        }

        public void reduce(IntWritable key, Iterable<IntWritable> it, Reducer.Context context) throws IOException, InterruptedException {
            for (IntWritable iw : it) {
                context.write((Object)iw, null);
            }
        }
    }

    static class RandomCheckMapper
    extends Mapper<WritableComparable<?>, Text, IntWritable, IntWritable> {
        RandomCheckMapper() {
        }

        public void map(WritableComparable<?> key, Text val, Mapper.Context context) throws IOException, InterruptedException {
            context.write((Object)new IntWritable(Integer.parseInt(val.toString().trim())), (Object)new IntWritable(1));
        }
    }

    static class RandomCheckReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        RandomCheckReducer() {
        }

        public void reduce(IntWritable key, Iterable<IntWritable> it, Reducer.Context context) throws IOException, InterruptedException {
            int keyint = key.get();
            int count = 0;
            for (IntWritable iw : it) {
                ++count;
            }
            context.write((Object)new IntWritable(keyint), (Object)new IntWritable(count));
        }
    }

    static class MergeMapper
    extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
        MergeMapper() {
        }

        public void map(IntWritable key, IntWritable val, Mapper.Context context) throws IOException, InterruptedException {
            int keyint = key.get();
            int valint = val.get();
            context.write((Object)new IntWritable(keyint), (Object)new IntWritable(valint));
        }
    }

    static class MergeReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        MergeReducer() {
        }

        public void reduce(IntWritable key, Iterator<IntWritable> it, Reducer.Context context) throws IOException, InterruptedException {
            int keyint = key.get();
            int total = 0;
            while (it.hasNext()) {
                total += it.next().get();
            }
            context.write((Object)new IntWritable(keyint), (Object)new IntWritable(total));
        }
    }
}

