/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestMapperReducerCleanup {
    static boolean mapCleanup = false;
    static boolean reduceCleanup = false;
    static boolean recordReaderCleanup = false;
    static boolean recordWriterCleanup = false;
    private final String INPUT_DIR = "input";
    private final String OUTPUT_DIR = "output";

    static void reset() {
        mapCleanup = false;
        reduceCleanup = false;
        recordReaderCleanup = false;
        recordWriterCleanup = false;
    }

    private void createInputFile(Path dirPath, int id, int numRecords) throws IOException {
        String MESSAGE = "This is a line in a file: ";
        Path filePath = new Path(dirPath, "" + id);
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        FSDataOutputStream os = fs.create(filePath);
        BufferedWriter w = new BufferedWriter(new OutputStreamWriter((OutputStream)os));
        for (int i = 0; i < numRecords; ++i) {
            w.write("This is a line in a file: " + id + " " + i + "\n");
        }
        w.close();
    }

    private Path getInputPath() {
        String dataDir = System.getProperty("test.build.data");
        if (null == dataDir) {
            return new Path("input");
        }
        return new Path(new Path(dataDir), "input");
    }

    private Path getOutputPath() {
        String dataDir = System.getProperty("test.build.data");
        if (null == dataDir) {
            return new Path("output");
        }
        return new Path(new Path(dataDir), "output");
    }

    private Path createInput() throws IOException {
        Path inputPath;
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        if (fs.exists(inputPath = this.getInputPath())) {
            fs.delete(inputPath, true);
        }
        this.createInputFile(inputPath, 0, 10);
        return inputPath;
    }

    @Test
    public void testMapCleanup() throws Exception {
        TestMapperReducerCleanup.reset();
        Job job = Job.getInstance();
        Path inputPath = this.createInput();
        Path outputPath = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(FailingMapper.class);
        job.setInputFormatClass(TrackingTextInputFormat.class);
        job.setOutputFormatClass(TrackingTextOutputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath((Job)job, (Path)inputPath);
        FileOutputFormat.setOutputPath((Job)job, (Path)outputPath);
        job.waitForCompletion(true);
        Assertions.assertTrue((boolean)mapCleanup);
        Assertions.assertTrue((boolean)recordReaderCleanup);
        Assertions.assertTrue((boolean)recordWriterCleanup);
    }

    @Test
    public void testReduceCleanup() throws Exception {
        TestMapperReducerCleanup.reset();
        Job job = Job.getInstance();
        Path inputPath = this.createInput();
        Path outputPath = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(TrackingTokenizerMapper.class);
        job.setReducerClass(FailingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TrackingTextInputFormat.class);
        job.setOutputFormatClass(TrackingTextOutputFormat.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath((Job)job, (Path)inputPath);
        FileOutputFormat.setOutputPath((Job)job, (Path)outputPath);
        job.waitForCompletion(true);
        Assertions.assertTrue((boolean)mapCleanup);
        Assertions.assertTrue((boolean)reduceCleanup);
        Assertions.assertTrue((boolean)recordReaderCleanup);
        Assertions.assertTrue((boolean)recordWriterCleanup);
    }

    @Test
    public void testJobSuccessCleanup() throws Exception {
        TestMapperReducerCleanup.reset();
        Job job = Job.getInstance();
        Path inputPath = this.createInput();
        Path outputPath = this.getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(TrackingTokenizerMapper.class);
        job.setReducerClass(TrackingIntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TrackingTextInputFormat.class);
        job.setOutputFormatClass(TrackingTextOutputFormat.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath((Job)job, (Path)inputPath);
        FileOutputFormat.setOutputPath((Job)job, (Path)outputPath);
        job.waitForCompletion(true);
        Assertions.assertTrue((boolean)mapCleanup);
        Assertions.assertTrue((boolean)reduceCleanup);
        Assertions.assertTrue((boolean)recordReaderCleanup);
        Assertions.assertTrue((boolean)recordWriterCleanup);
        Assertions.assertNotNull((Object)job.getCluster());
        job.close();
        Assertions.assertNull((Object)job.getCluster());
    }

    private static class FailingMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        private FailingMapper() {
        }

        public void map(LongWritable key, Text val, Mapper.Context c) throws IOException, InterruptedException {
            throw new IOException("TestMapperReducerCleanup");
        }

        protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
            mapCleanup = true;
            super.cleanup(context);
        }
    }

    public static class TrackingTextInputFormat
    extends TextInputFormat {
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
            return new TrackingRecordReader();
        }

        public static class TrackingRecordReader
        extends LineRecordReader {
            public synchronized void close() throws IOException {
                recordReaderCleanup = true;
                super.close();
            }
        }
    }

    public static class TrackingTextOutputFormat
    extends TextOutputFormat {
        public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            Path file = this.getDefaultWorkFile(job, "");
            FileSystem fs = file.getFileSystem(conf);
            FSDataOutputStream fileOut = fs.create(file, false);
            return new TrackingRecordWriter((DataOutputStream)fileOut);
        }

        public static class TrackingRecordWriter
        extends TextOutputFormat.LineRecordWriter {
            public TrackingRecordWriter(DataOutputStream out) {
                super(out);
            }

            public synchronized void close(TaskAttemptContext context) throws IOException {
                recordWriterCleanup = true;
                super.close(context);
            }
        }
    }

    private static class TrackingTokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        private TrackingTokenizerMapper() {
        }

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)one);
            }
        }

        protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
            mapCleanup = true;
            super.cleanup(context);
        }
    }

    private static class FailingReducer
    extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
        private FailingReducer() {
        }

        public void reduce(LongWritable key, Iterable<Text> vals, Reducer.Context context) throws IOException, InterruptedException {
            throw new IOException("TestMapperReducerCleanup");
        }

        protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
            reduceCleanup = true;
            super.cleanup(context);
        }
    }

    private static class TrackingIntSumReducer
    extends IntSumReducer {
        private TrackingIntSumReducer() {
        }

        protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
            reduceCleanup = true;
            super.cleanup(context);
        }
    }
}

