/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.chain;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapContextImpl;
import org.apache.hadoop.mapreduce.lib.chain.ChainReduceContextImpl;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Chain {
    /** Configuration key prefix used when the chain runs on the map side (see {@link #getPrefix(boolean)}). */
    protected static final String CHAIN_MAPPER = "mapreduce.chain.mapper";
    /** Configuration key prefix used when the chain runs on the reduce side (see {@link #getPrefix(boolean)}). */
    protected static final String CHAIN_REDUCER = "mapreduce.chain.reducer";
    /** Suffix appended to a prefix to form the key that stores the number of mappers in the chain. */
    protected static final String CHAIN_MAPPER_SIZE = ".size";
    /** Suffix (followed by the mapper index) of the key that stores a chained mapper's class. */
    protected static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
    /** Suffix (followed by the mapper index) of the key that stores a chained mapper's serialized configuration. */
    protected static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
    /** Suffix of the key that stores the chain reducer's class. */
    protected static final String CHAIN_REDUCER_CLASS = ".reducer.class";
    /** Suffix of the key that stores the chain reducer's serialized configuration. */
    protected static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
    /** Key, set in a mapper's own configuration, naming its declared input key class. */
    protected static final String MAPPER_INPUT_KEY_CLASS = "mapreduce.chain.mapper.input.key.class";
    /** Key, set in a mapper's own configuration, naming its declared input value class. */
    protected static final String MAPPER_INPUT_VALUE_CLASS = "mapreduce.chain.mapper.input.value.class";
    /** Key, set in a mapper's own configuration, naming its declared output key class. */
    protected static final String MAPPER_OUTPUT_KEY_CLASS = "mapreduce.chain.mapper.output.key.class";
    /** Key, set in a mapper's own configuration, naming its declared output value class. */
    protected static final String MAPPER_OUTPUT_VALUE_CLASS = "mapreduce.chain.mapper.output.value.class";
    /** Key, set in the reducer's own configuration, naming its declared input key class. */
    protected static final String REDUCER_INPUT_KEY_CLASS = "mapreduce.chain.reducer.input.key.class";
    /** Key, set in the reducer's own configuration, naming its declared input value class. */
    protected static final String REDUCER_INPUT_VALUE_CLASS = "mapreduce.chain.reducer.input.value.class";
    /** Key, set in the reducer's own configuration, naming its declared output key class. */
    protected static final String REDUCER_OUTPUT_KEY_CLASS = "mapreduce.chain.reducer.output.key.class";
    /** Key, set in the reducer's own configuration, naming its declared output value class. */
    protected static final String REDUCER_OUTPUT_VALUE_CLASS = "mapreduce.chain.reducer.output.value.class";
    /** {@code true} when this chain uses the map-side key prefix, {@code false} for the reduce-side prefix. */
    protected boolean isMap;
    /** Mapper instances created by {@code setup}, in chain order. */
    private final List<Mapper> mappers = new ArrayList<Mapper>();
    /** Reducer instance created by {@code setup}; stays {@code null} when no reducer class is configured. */
    private Reducer<?, ?, ?, ?> reducer;
    /** Per-mapper configurations, index-aligned with {@link #mappers}. */
    private final List<Configuration> confList = new ArrayList<Configuration>();
    /** Configuration of the reducer; assigned together with {@link #reducer}. */
    private Configuration rConf;
    /** Runner threads registered by the addMapper/addReducer instance methods. */
    private final List<Thread> threads = new ArrayList<Thread>();
    /** Every queue created through this chain; each queue registers itself in its constructor. */
    private final List<ChainBlockingQueue<?>> blockingQueues = new ArrayList<ChainBlockingQueue<?>>();
    /** First failure reported by a runner thread; read and written only through the synchronized accessors. */
    private Throwable throwable = null;

    /**
     * Creates an empty chain.
     *
     * @param isMap stored in {@link #isMap}; {@code setup} passes it to
     *              {@link #getPrefix(boolean)} to choose the configuration key prefix
     */
    protected Chain(boolean isMap) {
        this.isMap = isMap;
    }

    /**
     * Returns the failure recorded so far, if any.
     *
     * @return the recorded throwable, or {@code null} when none has been set
     */
    private synchronized Throwable getThrowable() {
        return throwable;
    }

    /**
     * Records {@code th} as the chain's failure unless one is already recorded.
     *
     * @param th the failure to record
     * @return {@code true} if {@code th} was stored, {@code false} if an earlier
     *         failure was already present and has been kept
     */
    private synchronized boolean setIfUnsetThrowable(Throwable th) {
        if (throwable != null) {
            // An earlier failure wins; keep it.
            return false;
        }
        throwable = th;
        return true;
    }

    /**
     * Returns the configuration of the mapper at the given chain position.
     *
     * @param index position in the list populated by {@code setup}
     * @return the configuration stored at {@code index}
     */
    Configuration getConf(int index) {
        return confList.get(index);
    }

    /**
     * Builds the context handed to a chained mapper.
     *
     * @param rr      reader the mapper pulls its input from
     * @param rw      writer the mapper sends its output to
     * @param context task context wrapped by the chain context
     * @param conf    configuration passed to the chain context
     * @return a mapper context obtained from a {@link WrappedMapper} around a
     *         new {@link ChainMapContextImpl}
     */
    private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> Mapper.Context createMapContext(RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw, TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context, Configuration conf) {
        ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chained = new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rr, rw, conf);
        return new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>().getMapContext(chained);
    }

    /**
     * Runs the mapper at {@code index} in the calling thread, reading from and
     * writing to {@code context} directly (no queues involved).
     *
     * @param context task context used for both input and output
     * @param index   position of the mapper in the chain
     * @throws IOException          propagated from the mapper or the reader/writer
     * @throws InterruptedException propagated from the mapper or the writer
     */
    void runMapper(TaskInputOutputContext context, int index) throws IOException, InterruptedException {
        Mapper target = mappers.get(index);
        ChainRecordReader reader = new ChainRecordReader(context);
        ChainRecordWriter writer = new ChainRecordWriter(context);
        Mapper.Context chainContext = createMapContext(reader, writer, context, getConf(index));
        target.run(chainContext);
        reader.close();
        writer.close(context);
    }

    /**
     * Registers a runner thread for the mapper at {@code index} that reads from
     * the task context and writes into a queue.
     *
     * @param inputContext task context supplying input; it is cast to
     *                     {@link MapContext} when the mapper context is built
     * @param output       queue receiving the mapper's output
     * @param index        position of the mapper in the chain
     * @throws IOException          declared by the runner constructor
     * @throws InterruptedException declared by the runner constructor
     */
    void addMapper(TaskInputOutputContext inputContext, ChainBlockingQueue<KeyValuePair<?, ?>> output, int index) throws IOException, InterruptedException {
        Configuration mapperConf = getConf(index);
        // Output types are needed so the writer can create copies for the queue.
        Class<?> outKeyType = mapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
        Class<?> outValueType = mapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);
        ChainRecordReader reader = new ChainRecordReader(inputContext);
        ChainRecordWriter writer = new ChainRecordWriter(outKeyType, outValueType, output, mapperConf);
        Mapper.Context chainContext = createMapContext(reader, writer, (MapContext) inputContext, mapperConf);
        threads.add(new MapRunner(mappers.get(index), chainContext, reader, writer));
    }

    /**
     * Registers a runner thread for the mapper at {@code index} that reads from
     * a queue and writes to the task context.
     *
     * @param input         queue supplying the mapper's input
     * @param outputContext task context receiving the mapper's output
     * @param index         position of the mapper in the chain
     * @throws IOException          declared by the runner constructor
     * @throws InterruptedException declared by the runner constructor
     */
    void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input, TaskInputOutputContext outputContext, int index) throws IOException, InterruptedException {
        Configuration mapperConf = getConf(index);
        // Input types are needed so the reader can create copies of queued records.
        Class<?> inKeyType = mapperConf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
        Class<?> inValueType = mapperConf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
        ChainRecordReader reader = new ChainRecordReader(inKeyType, inValueType, input, mapperConf);
        ChainRecordWriter writer = new ChainRecordWriter(outputContext);
        threads.add(new MapRunner(mappers.get(index), createMapContext(reader, writer, outputContext, mapperConf), reader, writer));
    }

    /**
     * Registers a runner thread for the mapper at {@code index} that reads from
     * one queue and writes into another.
     *
     * @param input   queue supplying the mapper's input
     * @param output  queue receiving the mapper's output
     * @param context task context wrapped by the mapper's chain context
     * @param index   position of the mapper in the chain
     * @throws IOException          declared by the runner constructor
     * @throws InterruptedException declared by the runner constructor
     */
    void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input, ChainBlockingQueue<KeyValuePair<?, ?>> output, TaskInputOutputContext context, int index) throws IOException, InterruptedException {
        Configuration mapperConf = getConf(index);
        Class<?> inKeyType = mapperConf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
        Class<?> inValueType = mapperConf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
        Class<?> outKeyType = mapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
        Class<?> outValueType = mapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);
        ChainRecordReader reader = new ChainRecordReader(inKeyType, inValueType, input, mapperConf);
        ChainRecordWriter writer = new ChainRecordWriter(outKeyType, outValueType, output, mapperConf);
        threads.add(new MapRunner(mappers.get(index), createMapContext(reader, writer, context, mapperConf), reader, writer));
    }

    /**
     * Builds the context handed to the chain's reducer.
     *
     * @param rw      writer the reducer sends its output to
     * @param context reduce context wrapped by the chain context
     * @param conf    configuration passed to the chain context
     * @return a reducer context obtained from a {@link WrappedReducer} around a
     *         new {@link ChainReduceContextImpl}
     */
    private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> Reducer.Context createReduceContext(RecordWriter<KEYOUT, VALUEOUT> rw, ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context, Configuration conf) {
        ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chained = new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rw, conf);
        return new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>().getReducerContext(chained);
    }

    /**
     * Runs the reducer in the calling thread, writing its output straight to
     * {@code context}.
     *
     * @param context task context; it is cast to {@link ReduceContext} when the
     *                reducer context is built
     * @throws IOException          propagated from the reducer or the writer
     * @throws InterruptedException propagated from the reducer or the writer
     */
    <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context) throws IOException, InterruptedException {
        ChainRecordWriter<KEYOUT, VALUEOUT> writer = new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
        Reducer.Context chainContext = createReduceContext(writer, (ReduceContext) context, rConf);
        reducer.run(chainContext);
        writer.close(context);
    }

    /**
     * Registers a runner thread for the reducer that reads from the task
     * context and writes into a queue.
     *
     * @param inputContext task context supplying the reducer's input; it is cast
     *                     to {@link ReduceContext} when the reducer context is built
     * @param outputQueue  queue receiving the reducer's output
     * @throws IOException          declared by the runner constructor
     * @throws InterruptedException declared by the runner constructor
     */
    void addReducer(TaskInputOutputContext inputContext, ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException, InterruptedException {
        // Output types are needed so the writer can create copies for the queue.
        Class<?> outKeyType = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class);
        Class<?> outValueType = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class);
        ChainRecordWriter writer = new ChainRecordWriter(outKeyType, outValueType, outputQueue, rConf);
        Reducer.Context chainContext = createReduceContext(writer, (ReduceContext) inputContext, rConf);
        threads.add(new ReduceRunner(chainContext, reducer, writer));
    }

    /** Starts every registered runner thread, in registration order. */
    void startAllThreads() {
        for (Thread worker : threads) {
            worker.start();
        }
    }

    /**
     * Waits for every registered runner thread to finish, then rethrows the
     * failure recorded by the runners, if any.
     *
     * @throws IOException          if the recorded failure is an {@code IOException}
     * @throws InterruptedException if the recorded failure is an
     *                              {@code InterruptedException}, or if the
     *                              calling thread is interrupted while joining
     * @throws RuntimeException     wrapping any other recorded failure
     */
    void joinAllThreads() throws IOException, InterruptedException {
        for (Thread worker : threads) {
            worker.join();
        }
        Throwable failure = getThrowable();
        if (failure == null) {
            return;
        }
        // Checked types the signature allows are rethrown as-is; everything else is wrapped.
        if (failure instanceof IOException) {
            throw (IOException) failure;
        }
        if (failure instanceof InterruptedException) {
            throw (InterruptedException) failure;
        }
        throw new RuntimeException(failure);
    }

    /**
     * Interrupts every registered runner thread and then every queue created
     * through this chain, so that runners waiting on a queue are woken up.
     */
    private synchronized void interruptAllThreads() {
        for (Thread worker : threads) {
            worker.interrupt();
        }
        for (ChainBlockingQueue<?> queue : blockingQueues) {
            queue.interrupt();
        }
    }

    /**
     * Selects the configuration key prefix for a chain.
     *
     * @param isMap {@code true} for the map-side prefix, {@code false} for the
     *              reduce-side prefix
     * @return {@link #CHAIN_MAPPER} or {@link #CHAIN_REDUCER}
     */
    protected static String getPrefix(boolean isMap) {
        if (isMap) {
            return CHAIN_MAPPER;
        }
        return CHAIN_REDUCER;
    }

    /**
     * Reads the number of mappers already registered under {@code prefix},
     * which is also the index the next mapper will get.
     *
     * @param conf   configuration to read from
     * @param prefix chain key prefix
     * @return the stored size, or {@code 0} when the key is absent
     */
    protected static int getIndex(Configuration conf, String prefix) {
        String sizeKey = prefix + CHAIN_MAPPER_SIZE;
        return conf.getInt(sizeKey, 0);
    }

    /**
     * Builds the effective configuration of one chain element: a copy of the
     * job configuration overlaid with the element's own serialized entries.
     *
     * @param jobConf job configuration holding the serialized element configuration
     * @param confKey key under which the element configuration is stored
     * @return a new {@link Configuration} created from {@code jobConf} with every
     *         entry of the deserialized element configuration set on top; when
     *         {@code confKey} is absent the result is just the copy
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *                          deserializing or closing the stringifier
     */
    protected static Configuration getChainElementConf(Configuration jobConf, String confKey) {
        Configuration elementConf = null;
        try (DefaultStringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class)) {
            String confString = jobConf.get(confKey, null);
            if (confString != null) {
                // Reuse the value already fetched instead of looking the key up a second time.
                elementConf = stringifier.fromString(confString);
            }
        }
        catch (IOException ioex) {
            throw new RuntimeException(ioex);
        }
        // The copy is created only after deserialization has finished, as in the original ordering.
        Configuration merged = new Configuration(jobConf);
        if (elementConf != null) {
            for (Map.Entry<String, String> entry : elementConf) {
                merged.set(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }

    /**
     * Appends a mapper to the chain described in the job's configuration.
     *
     * <p>Steps, in order: verify the reducer precondition, look up the next
     * mapper index, store the mapper class, validate the declared input types
     * against the preceding element, then store the mapper's configuration and
     * bump the chain size.
     *
     * @param isMap            {@code true} for a map-side chain, {@code false} for a reduce-side chain
     * @param job              job whose configuration is updated
     * @param klass            mapper class to append
     * @param inputKeyClass    declared input key class
     * @param inputValueClass  declared input value class
     * @param outputKeyClass   declared output key class
     * @param outputValueClass declared output value class
     * @param mapperConf       mapper-private configuration, may be {@code null}
     * @throws IllegalStateException    if a reduce-side mapper is added before the reducer is set
     * @throws IllegalArgumentException if the input types do not match the preceding element's output types
     */
    protected static void addMapper(boolean isMap, Job job, Class<? extends Mapper> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration mapperConf) {
        final String keyPrefix = getPrefix(isMap);
        final Configuration jobConf = job.getConfiguration();
        checkReducerAlreadySet(isMap, jobConf, keyPrefix, true);
        final int position = getIndex(jobConf, keyPrefix);
        jobConf.setClass(keyPrefix + CHAIN_MAPPER_CLASS + position, klass, Mapper.class);
        validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, position, keyPrefix);
        setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, mapperConf, position, keyPrefix);
    }

    /**
     * Enforces the ordering rules of a reduce-side chain. Does nothing for a
     * map-side chain.
     *
     * @param isMap     {@code true} for a map-side chain (no check performed)
     * @param jobConf   configuration to inspect
     * @param prefix    chain key prefix
     * @param shouldSet {@code true} when the reducer is required to be present
     *                  already, {@code false} when it is required to be absent
     * @throws IllegalStateException if the reducer's presence does not match
     *                               {@code shouldSet}
     */
    protected static void checkReducerAlreadySet(boolean isMap, Configuration jobConf, String prefix, boolean shouldSet) {
        if (isMap) {
            return;
        }
        boolean reducerPresent = jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null;
        if (shouldSet && !reducerPresent) {
            throw new IllegalStateException("A Mapper can be added to the chain only after the Reducer has been set");
        }
        if (!shouldSet && reducerPresent) {
            throw new IllegalStateException("Reducer has been already set");
        }
    }

    /**
     * Checks that a mapper's declared input types accept the output types of
     * the element that precedes it in the chain.
     *
     * <p>For the first mapper of a reduce-side chain the preceding element is
     * the reducer; for any mapper with {@code index > 0} it is the previous
     * mapper. The first mapper of a map-side chain is not checked.
     * {@code outputKeyClass} and {@code outputValueClass} are not consulted.
     *
     * @param isMap            {@code true} for a map-side chain
     * @param jobConf          configuration holding the chain description
     * @param inputKeyClass    declared input key class of the mapper being added
     * @param inputValueClass  declared input value class of the mapper being added
     * @param outputKeyClass   unused
     * @param outputValueClass unused
     * @param index            index the mapper will occupy
     * @param prefix           chain key prefix
     * @throws IllegalArgumentException if a declared input class is not
     *                                  assignable from the preceding output class
     */
    protected static void validateKeyValueTypes(boolean isMap, Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, int index, String prefix) {
        if (!isMap && index == 0) {
            // First mapper after the reducer: compare against the reducer's output types.
            Configuration upstream = getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
            Class<?> producedKey = upstream.getClass(REDUCER_OUTPUT_KEY_CLASS, null);
            if (!inputKeyClass.isAssignableFrom(producedKey)) {
                throw new IllegalArgumentException("The Reducer output key class does not match the Mapper input key class");
            }
            Class<?> producedValue = upstream.getClass(REDUCER_OUTPUT_VALUE_CLASS, null);
            if (!inputValueClass.isAssignableFrom(producedValue)) {
                throw new IllegalArgumentException("The Reducer output value class does not match the Mapper input value class");
            }
            return;
        }
        if (index > 0) {
            // Any later mapper: compare against the previous mapper's output types.
            Configuration upstream = getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + (index - 1));
            Class<?> producedKey = upstream.getClass(MAPPER_OUTPUT_KEY_CLASS, null);
            if (!inputKeyClass.isAssignableFrom(producedKey)) {
                throw new IllegalArgumentException("The specified Mapper input key class does not match the previous Mapper's output key class.");
            }
            Class<?> producedValue = upstream.getClass(MAPPER_OUTPUT_VALUE_CLASS, null);
            if (!inputValueClass.isAssignableFrom(producedValue)) {
                throw new IllegalArgumentException("The specified Mapper input value class does not match the previous Mapper's output value class.");
            }
        }
    }

    /**
     * Stores a mapper's configuration in the job configuration and increments
     * the chain size.
     *
     * <p>The four declared key/value classes are written into
     * {@code mapperConf} itself (so a caller-supplied instance is modified),
     * and a copy of it is then serialized under
     * {@code prefix + CHAIN_MAPPER_CONFIG + index}. {@code isMap} is not
     * consulted.
     *
     * @param isMap            unused
     * @param jobConf          job configuration to update
     * @param inputKeyClass    declared input key class
     * @param inputValueClass  declared input value class
     * @param outputKeyClass   declared output key class
     * @param outputValueClass declared output value class
     * @param mapperConf       mapper-private configuration; when {@code null} a
     *                         {@code new Configuration(true)} is used instead
     * @param index            index of the mapper in the chain
     * @param prefix           chain key prefix
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *                          serializing or closing the stringifier
     */
    protected static void setMapperConf(boolean isMap, Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration mapperConf, int index, String prefix) {
        if (mapperConf == null) {
            mapperConf = new Configuration(true);
        }
        mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
        mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class);
        mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
        mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass, Object.class);
        // try-with-resources closes the stringifier, matching getChainElementConf.
        try (DefaultStringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class)) {
            jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, stringifier.toString(new Configuration(mapperConf)));
        }
        catch (IOException ioEx) {
            throw new RuntimeException(ioEx);
        }
        jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
    }

    /**
     * Sets the reducer of a reduce-side chain in the job's configuration.
     *
     * @param job              job whose configuration is updated
     * @param klass            reducer class
     * @param inputKeyClass    declared input key class
     * @param inputValueClass  declared input value class
     * @param outputKeyClass   declared output key class
     * @param outputValueClass declared output value class
     * @param reducerConf      reducer-private configuration, may be {@code null}
     * @throws IllegalStateException if a reducer class is already stored
     */
    protected static void setReducer(Job job, Class<? extends Reducer> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration reducerConf) {
        final String keyPrefix = getPrefix(false);
        final Configuration jobConf = job.getConfiguration();
        checkReducerAlreadySet(false, jobConf, keyPrefix, false);
        jobConf.setClass(keyPrefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
        setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, reducerConf, keyPrefix);
    }

    /**
     * Stores the reducer's configuration in the job configuration.
     *
     * <p>The four declared key/value classes are written into
     * {@code reducerConf} itself (so a caller-supplied instance is modified),
     * and a copy of it is then serialized under
     * {@code prefix + CHAIN_REDUCER_CONFIG}.
     *
     * @param jobConf          job configuration to update
     * @param inputKeyClass    declared input key class
     * @param inputValueClass  declared input value class
     * @param outputKeyClass   declared output key class
     * @param outputValueClass declared output value class
     * @param reducerConf      reducer-private configuration; when {@code null} a
     *                         {@code new Configuration(false)} is used instead
     * @param prefix           chain key prefix
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *                          serializing or closing the stringifier
     */
    protected static void setReducerConf(Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration reducerConf, String prefix) {
        if (reducerConf == null) {
            reducerConf = new Configuration(false);
        }
        reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
        reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass, Object.class);
        reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
        reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass, Object.class);
        // try-with-resources closes the stringifier, matching getChainElementConf.
        try (DefaultStringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class)) {
            jobConf.set(prefix + CHAIN_REDUCER_CONFIG, stringifier.toString(new Configuration(reducerConf)));
        }
        catch (IOException ioEx) {
            throw new RuntimeException(ioEx);
        }
    }

    /**
     * Instantiates the chain elements described in {@code jobConf}.
     *
     * <p>For each stored mapper index the mapper class is loaded, its effective
     * configuration is built and remembered, and an instance is created and
     * appended to the mapper list. If a reducer class is stored under this
     * chain's prefix, its configuration and instance are created as well.
     *
     * @param jobConf job configuration holding the chain description
     */
    void setup(Configuration jobConf) {
        String keyPrefix = getPrefix(isMap);
        int mapperCount = getIndex(jobConf, keyPrefix);
        for (int i = 0; i < mapperCount; i++) {
            Class<? extends Mapper> mapperClass = jobConf.getClass(keyPrefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
            Configuration mapperConf = getChainElementConf(jobConf, keyPrefix + CHAIN_MAPPER_CONFIG + i);
            confList.add(mapperConf);
            mappers.add(ReflectionUtils.newInstance(mapperClass, mapperConf));
        }
        Class<? extends Reducer> reducerClass = jobConf.getClass(keyPrefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
        if (reducerClass == null) {
            // No reducer stored under this prefix: leave reducer and rConf unset.
            return;
        }
        rConf = getChainElementConf(jobConf, keyPrefix + CHAIN_REDUCER_CONFIG);
        reducer = ReflectionUtils.newInstance(reducerClass, rConf);
    }

    /**
     * Returns the chain's mappers.
     *
     * @return the internal list itself (not a copy), in chain order
     */
    List<Mapper> getAllMappers() {
        return mappers;
    }

    /**
     * Returns the chain's reducer.
     *
     * @return the reducer instance, or {@code null} if none has been created
     */
    Reducer<?, ?, ?, ?> getReducer() {
        return reducer;
    }

    /**
     * Creates a new hand-off queue bound to this chain. The queue registers
     * itself with the chain in its constructor.
     *
     * @return a new, empty queue
     */
    ChainBlockingQueue<KeyValuePair<?, ?>> createBlockingQueue() {
        ChainBlockingQueue<KeyValuePair<?, ?>> queue = new ChainBlockingQueue<KeyValuePair<?, ?>>();
        return queue;
    }

    /**
     * Record reader for a chain element. It pulls records either from a task
     * context or from a queue fed by the preceding element, depending on which
     * constructor was used.
     */
    private static class ChainRecordReader<KEYIN, VALUEIN>
            extends RecordReader<KEYIN, VALUEIN> {
        // Only set in queue mode; used to create fresh copies of queued records.
        private Class<?> keyClass;
        private Class<?> valueClass;
        private KEYIN key;
        private VALUEIN value;
        private Configuration conf;
        // Exactly one of the two sources below is non-null.
        TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> inputContext = null;
        ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue = null;

        /**
         * Creates a reader that takes its records from a queue.
         *
         * @param keyClass   class instantiated for each key read
         * @param valueClass class instantiated for each value read
         * @param inputQueue queue to read from
         * @param conf       configuration used for instantiation and copying
         */
        ChainRecordReader(Class<?> keyClass, Class<?> valueClass, ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue, Configuration conf) {
            this.keyClass = keyClass;
            this.valueClass = valueClass;
            this.inputQueue = inputQueue;
            this.conf = conf;
        }

        /**
         * Creates a reader that takes its records from a task context.
         *
         * @param context context to read from
         */
        ChainRecordReader(TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> context) {
            this.inputContext = context;
        }

        /** Intentionally empty: nothing to initialize. */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        }

        /**
         * Advances to the next record from whichever source this reader uses.
         *
         * @return {@code true} if a record is now current, {@code false} at end of input
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (inputQueue != null) {
                return readFromQueue();
            }
            boolean advanced = inputContext.nextKeyValue();
            if (advanced) {
                key = inputContext.getCurrentKey();
                value = inputContext.getCurrentValue();
            }
            return advanced;
        }

        /**
         * Takes the next pair from the queue and copies it into newly created
         * key/value instances.
         *
         * @return {@code false} when the end-of-input marker was dequeued
         */
        @SuppressWarnings("unchecked")
        private boolean readFromQueue() throws IOException, InterruptedException {
            KeyValuePair<KEYIN, VALUEIN> next = inputQueue.dequeue();
            if (next.endOfInput) {
                return false;
            }
            key = (KEYIN) ReflectionUtils.newInstance(keyClass, conf);
            value = (VALUEIN) ReflectionUtils.newInstance(valueClass, conf);
            ReflectionUtils.copy(conf, next.key, key);
            ReflectionUtils.copy(conf, next.value, value);
            return true;
        }

        /** @return the key of the current record */
        @Override
        public KEYIN getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        /** @return the value of the current record */
        @Override
        public VALUEIN getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /** Intentionally empty: this reader holds nothing that needs closing. */
        @Override
        public void close() throws IOException {
        }

        /** @return always {@code 0.0f}; progress is not tracked here */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0.0f;
        }
    }

    /**
     * Record writer for a chain element. It sends records either to a task
     * context or into a queue read by the next element, depending on which
     * constructor was used.
     */
    private static class ChainRecordWriter<KEYOUT, VALUEOUT>
            extends RecordWriter<KEYOUT, VALUEOUT> {
        // Exactly one of the two sinks below is non-null.
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> outputContext = null;
        ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> outputQueue = null;
        // Most recent copies placed on the queue (queue mode only).
        KEYOUT keyout;
        VALUEOUT valueout;
        Configuration conf;
        Class<?> keyClass;
        Class<?> valueClass;

        /**
         * Creates a writer that forwards records to a task context.
         *
         * @param context context to write to
         */
        ChainRecordWriter(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
            this.outputContext = context;
        }

        /**
         * Creates a writer that places copies of records on a queue.
         *
         * @param keyClass   class instantiated for each key written
         * @param valueClass class instantiated for each value written
         * @param output     queue to write to
         * @param conf       configuration used for instantiation and copying
         */
        ChainRecordWriter(Class<?> keyClass, Class<?> valueClass, ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> output, Configuration conf) {
            this.keyClass = keyClass;
            this.valueClass = valueClass;
            this.outputQueue = output;
            this.conf = conf;
        }

        /** Writes one record to whichever sink this writer uses. */
        @Override
        public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
            if (outputQueue == null) {
                outputContext.write(key, value);
                return;
            }
            writeToQueue(key, value);
        }

        /**
         * Copies the record into newly created key/value instances and
         * enqueues the copies, so the queue never holds the caller's objects.
         */
        @SuppressWarnings("unchecked")
        private void writeToQueue(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
            keyout = (KEYOUT) ReflectionUtils.newInstance(keyClass, conf);
            valueout = (VALUEOUT) ReflectionUtils.newInstance(valueClass, conf);
            ReflectionUtils.copy(conf, key, keyout);
            ReflectionUtils.copy(conf, value, valueout);
            KeyValuePair<KEYOUT, VALUEOUT> copy = new KeyValuePair<KEYOUT, VALUEOUT>(keyout, valueout);
            outputQueue.enqueue(copy);
        }

        /**
         * In queue mode, enqueues the end-of-input marker; otherwise does
         * nothing.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (outputQueue == null) {
                return;
            }
            outputQueue.enqueue(new KeyValuePair<KEYOUT, VALUEOUT>(true));
        }
    }

    /**
     * Single-slot hand-off queue between two chain elements, built on the
     * instance's own monitor ({@code wait}/{@code notify}).
     *
     * <p>A {@code null} {@link #element} means the slot is empty, so a
     * {@code null} value cannot be distinguished from "nothing enqueued".
     */
    class ChainBlockingQueue<E> {
        // The single slot; null when empty.
        E element = null;
        // Set by interrupt(); makes a caller that would have to wait fail instead.
        boolean isInterrupted = false;

        /** Creates an empty queue and registers it with the enclosing chain. */
        ChainBlockingQueue() {
            Chain.this.blockingQueues.add(this);
        }

        /**
         * Places {@code e} in the slot, waiting while the slot is occupied.
         *
         * @param e element to hand over
         * @throws InterruptedException if the slot is occupied and
         *         {@link #interrupt()} has been called, or if the waiting
         *         thread itself is interrupted
         */
        synchronized void enqueue(E e) throws InterruptedException {
            while (this.element != null) {
                // The flag is only consulted when the caller would otherwise block.
                if (this.isInterrupted) {
                    throw new InterruptedException();
                }
                this.wait();
            }
            this.element = e;
            // Wake a thread waiting on this queue's monitor.
            this.notify();
        }

        /**
         * Removes and returns the element in the slot, waiting while the slot
         * is empty.
         *
         * @return the element that was in the slot
         * @throws InterruptedException if the slot is empty and
         *         {@link #interrupt()} has been called, or if the waiting
         *         thread itself is interrupted
         */
        synchronized E dequeue() throws InterruptedException {
            while (this.element == null) {
                // The flag is only consulted when the caller would otherwise block.
                if (this.isInterrupted) {
                    throw new InterruptedException();
                }
                this.wait();
            }
            E e = this.element;
            this.element = null;
            // Wake a thread waiting on this queue's monitor.
            this.notify();
            return e;
        }

        /**
         * Marks the queue as interrupted and wakes every waiting thread so it
         * re-checks the flag.
         */
        synchronized void interrupt() {
            this.isInterrupted = true;
            this.notifyAll();
        }
    }

    /**
     * Thread that runs one chained mapper and then closes its reader and
     * writer. The first failure is reported to the enclosing chain.
     */
    private class MapRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
            extends Thread {
        private final Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
        private final Mapper.Context chainContext;
        private final RecordReader<KEYIN, VALUEIN> rr;
        private final RecordWriter<KEYOUT, VALUEOUT> rw;

        /**
         * @param mapper        mapper to run
         * @param mapperContext context passed to the mapper and to the writer's close
         * @param rr            reader closed after the mapper finishes
         * @param rw            writer closed after the reader
         */
        public MapRunner(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper, Mapper.Context mapperContext, RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException, InterruptedException {
            this.mapper = mapper;
            this.chainContext = mapperContext;
            this.rr = rr;
            this.rw = rw;
        }

        /**
         * Runs the mapper unless the chain has already recorded a failure.
         * If anything is thrown and this runner is the first to record a
         * failure, all chain threads and queues are interrupted.
         */
        @Override
        public void run() {
            if (Chain.this.getThrowable() != null) {
                return;
            }
            try {
                mapper.run(chainContext);
                rr.close();
                rw.close(chainContext);
            }
            catch (Throwable th) {
                // Only the runner that records the first failure triggers the interrupt.
                if (Chain.this.setIfUnsetThrowable(th)) {
                    Chain.this.interruptAllThreads();
                }
            }
        }
    }

    /**
     * Thread that runs the chain's reducer and then closes its writer. The
     * first failure is reported to the enclosing chain.
     */
    private class ReduceRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
            extends Thread {
        private final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
        private final Reducer.Context chainContext;
        private final RecordWriter<KEYOUT, VALUEOUT> rw;

        /**
         * @param context context passed to the reducer and to the writer's close
         * @param reducer reducer to run
         * @param rw      writer closed after the reducer finishes
         */
        ReduceRunner(Reducer.Context context, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer, RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException, InterruptedException {
            this.chainContext = context;
            this.reducer = reducer;
            this.rw = rw;
        }

        /**
         * Runs the reducer. If anything is thrown and this runner is the
         * first to record a failure, all chain threads and queues are
         * interrupted.
         */
        @Override
        public void run() {
            try {
                reducer.run(chainContext);
                rw.close(chainContext);
            }
            catch (Throwable th) {
                // Only the runner that records the first failure triggers the interrupt.
                if (Chain.this.setIfUnsetThrowable(th)) {
                    Chain.this.interruptAllThreads();
                }
            }
        }
    }

    /**
     * A key/value record passed through a queue, or an end-of-input marker
     * when {@link #endOfInput} is {@code true}.
     */
    static class KeyValuePair<K, V> {
        K key;
        V value;
        boolean endOfInput;

        /**
         * Creates a regular record; {@code endOfInput} is {@code false}.
         *
         * @param key   record key
         * @param value record value
         */
        KeyValuePair(K key, V value) {
            this.key = key;
            this.value = value;
            this.endOfInput = false;
        }

        /**
         * Creates a pair with {@code null} key and value.
         *
         * @param eof value stored in {@code endOfInput}
         */
        KeyValuePair(boolean eof) {
            this(null, null);
            this.endOfInput = eof;
        }
    }
}

