/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MapRunnable} that dispatches each input record to a thread pool so that
 * several {@code Mapper.map} calls can be in flight at the same time.
 *
 * <p>The pool size is read from {@link MultithreadedMapper#NUM_THREADS} (default 10).
 * Tasks are handed to the pool through a bounded queue whose {@code offer} blocks,
 * so the dispatching thread is throttled when all workers are busy and the queue
 * is full.
 *
 * <p>The first {@link IOException} and the first {@link RuntimeException} raised by
 * any worker are remembered and rethrown from {@link #run} on the dispatching thread.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
implements MapRunnable<K1, V1, K2, V2> {
    private static final Logger LOG = LoggerFactory.getLogger(MultithreadedMapRunner.class);

    private JobConf job;
    private Mapper<K1, V1, K2, V2> mapper;
    private ExecutorService executorService;
    /** First IOException reported by a worker, or {@code null} if none so far. */
    private volatile IOException ioException;
    /** First RuntimeException reported by a worker, or {@code null} if none so far. */
    private volatile RuntimeException runtimeException;
    /** Whether workers bump the processed-records counter after each successful map call. */
    private boolean incrProcCount;

    /**
     * Instantiates the mapper and the worker pool from the job configuration.
     *
     * @param jobConf job configuration supplying the mapper class, the thread
     *                count and the skip-bad-records settings
     */
    @Override
    @SuppressWarnings("unchecked")
    public void configure(JobConf jobConf) {
        int numberOfThreads = jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
        LOG.debug("Configuring jobConf {} to use {} threads", jobConf.getJobName(), numberOfThreads);

        this.job = jobConf;
        // The counter is only incremented when skipping is enabled and auto-increment is requested.
        this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(this.job) > 0L
                && SkipBadRecords.getAutoIncrMapperProcCount(this.job);
        this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);

        // Core size == max size, and the queue blocks on offer(), so execute()
        // back-pressures the dispatching thread instead of rejecting tasks.
        this.executorService = new HadoopThreadPoolExecutor(
                numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
                new BlockingArrayQueue(numberOfThreads));
    }

    /**
     * Rethrows, on the calling thread, a failure previously recorded by a worker.
     * An IOException takes precedence over a RuntimeException.
     *
     * @throws IOException if a worker recorded an IOException
     * @throws RuntimeException if a worker recorded a RuntimeException
     */
    private void checkForExceptionsFromProcessingThreads() throws IOException, RuntimeException {
        if (this.ioException != null) {
            throw this.ioException;
        }
        if (this.runtimeException != null) {
            throw this.runtimeException;
        }
    }

    /** Remembers {@code ex} unless an earlier IOException was already recorded. */
    private synchronized void recordIoFailure(IOException ex) {
        if (this.ioException == null) {
            this.ioException = ex;
        }
    }

    /** Remembers {@code ex} unless an earlier RuntimeException was already recorded. */
    private synchronized void recordRuntimeFailure(RuntimeException ex) {
        if (this.runtimeException == null) {
            this.runtimeException = ex;
        }
    }

    /**
     * Reads every record from {@code input}, submits one map task per record to the
     * pool, waits for all tasks to finish and finally closes the mapper.
     *
     * <p>If anything goes wrong (a worker failure, a reader failure, a rejected task
     * or an interrupt), the pool is force-stopped with {@code shutdownNow()} before
     * the mapper is closed, so no worker threads are left behind.
     *
     * @param input    source of records
     * @param output   collector passed through to every map call
     * @param reporter reporter passed through to every map call
     * @throws IOException if reading fails, a worker recorded an IOException, or
     *                     closing the mapper fails
     * @throws RuntimeException if a worker recorded a RuntimeException, or the
     *                          calling thread is interrupted while waiting (the
     *                          interrupt status is restored before throwing)
     */
    @Override
    public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) throws IOException {
        boolean completed = false;
        try {
            // Key/value instances are never reused: map calls run in parallel,
            // so every record needs its own objects.
            K1 key = input.createKey();
            V1 value = input.createValue();
            while (input.next(key, value)) {
                this.executorService.execute(new MapperInvokeRunable(key, value, output, reporter));
                this.checkForExceptionsFromProcessingThreads();
                key = input.createKey();
                value = input.createValue();
            }
            LOG.debug("Finished dispatching all Mappper.map calls, job {}", this.job.getJobName());

            // Graceful shutdown: already-submitted tasks are allowed to finish.
            this.executorService.shutdown();
            try {
                while (!this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
                    LOG.debug("Awaiting all running Mappper.map calls to finish, job {}", this.job.getJobName());
                    // Map calls are still in progress and may fail while we wait.
                    this.checkForExceptionsFromProcessingThreads();
                }
                // A worker may have failed right before termination was observed.
                this.checkForExceptionsFromProcessingThreads();
            } catch (InterruptedException iEx) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(iEx);
            }
            completed = true;
        } finally {
            try {
                if (!completed) {
                    // Any failure path: stop outstanding work instead of leaking threads.
                    this.executorService.shutdownNow();
                }
            } finally {
                this.mapper.close();
            }
        }
    }

    /**
     * Bounded queue whose {@code offer}/{@code add} block until space is available,
     * instead of failing immediately when the queue is full.
     */
    private static class BlockingArrayQueue
    extends ArrayBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;

        public BlockingArrayQueue(int capacity) {
            super(capacity);
        }

        /** Blocking variant of {@code offer}; see {@link #add(Runnable)}. */
        @Override
        public boolean offer(Runnable r) {
            return this.add(r);
        }

        /**
         * Inserts {@code r}, waiting for free space if necessary.
         *
         * @return {@code true} if the element was enqueued; {@code false} if the
         *         calling thread was interrupted before it could be enqueued (the
         *         interrupt status is restored). Reporting {@code false} lets the
         *         executor reject the task rather than silently dropping it.
         */
        @Override
        public boolean add(Runnable r) {
            try {
                this.put(r);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Task that performs a single {@code map} call and records any failure on the
     * enclosing runner so the dispatching thread can rethrow it.
     */
    private class MapperInvokeRunable
    implements Runnable {
        private final K1 key;
        private final V1 value;
        private final OutputCollector<K2, V2> output;
        private final Reporter reporter;

        public MapperInvokeRunable(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) {
            this.key = key;
            this.value = value;
            this.output = output;
            this.reporter = reporter;
        }

        @Override
        public void run() {
            try {
                MultithreadedMapRunner.this.mapper.map(this.key, this.value, this.output, this.reporter);
                if (MultithreadedMapRunner.this.incrProcCount) {
                    this.reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
                }
            } catch (IOException ex) {
                MultithreadedMapRunner.this.recordIoFailure(ex);
            } catch (RuntimeException ex) {
                MultithreadedMapRunner.this.recordRuntimeFailure(ex);
            }
        }
    }
}

