package org.apache.hadoop.tools.util;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:hadoop-tools-dist-2.10.1-ODI/share/hadoop/tools/lib/hadoop-distcp-2.10.1-ODI.jar:org/apache/hadoop/tools/util/ProducerConsumer.class */
public class ProducerConsumer<T, R> {
    private ExecutorService executor;
    private Log LOG = LogFactory.getLog(ProducerConsumer.class);
    private LinkedBlockingQueue<WorkRequest<T>> inputQueue = new LinkedBlockingQueue<>();
    private LinkedBlockingQueue<WorkReport<R>> outputQueue = new LinkedBlockingQueue<>();
    private AtomicInteger workCnt = new AtomicInteger(0);

    /* loaded from: input_file:hadoop-tools-dist-2.10.1-ODI/share/hadoop/tools/lib/hadoop-distcp-2.10.1-ODI.jar:org/apache/hadoop/tools/util/ProducerConsumer$Worker.class */
    private class Worker implements Runnable {
        private WorkRequestProcessor<T, R> processor;

        public Worker(WorkRequestProcessor<T, R> workRequestProcessor) {
            this.processor = workRequestProcessor;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    WorkRequest<T> workRequest = (WorkRequest) ProducerConsumer.this.inputQueue.take();
                    boolean z = false;
                    while (!z) {
                        try {
                            ProducerConsumer.this.outputQueue.put(this.processor.processItem(workRequest));
                            z = true;
                        } catch (InterruptedException e) {
                            ProducerConsumer.this.LOG.debug("Worker thread was interrupted while processing an item, or putting into outputQueue. Retrying...");
                        }
                    }
                } catch (InterruptedException e2) {
                    ProducerConsumer.this.LOG.debug("Interrupted while waiting for requests from inputQueue.");
                    return;
                }
            }
        }
    }

    public ProducerConsumer(int i) {
        this.executor = Executors.newFixedThreadPool(i);
    }

    public void addWorker(WorkRequestProcessor<T, R> workRequestProcessor) {
        this.executor.execute(new Worker(workRequestProcessor));
    }

    public void shutdown() {
        if (hasWork()) {
            this.LOG.warn("Shutdown() is called but there are still unprocessed work!");
        }
        this.executor.shutdownNow();
    }

    public int getWorkCnt() {
        return this.workCnt.get();
    }

    public boolean hasWork() {
        return this.workCnt.get() > 0;
    }

    public void put(WorkRequest<T> workRequest) {
        boolean z = false;
        while (!z) {
            try {
                this.inputQueue.put(workRequest);
                this.workCnt.incrementAndGet();
                z = true;
            } catch (InterruptedException e) {
                this.LOG.error("Could not put workRequest into inputQueue. Retrying...");
            }
        }
    }

    public WorkReport<R> take() throws InterruptedException {
        WorkReport<R> take = this.outputQueue.take();
        this.workCnt.decrementAndGet();
        return take;
    }

    public WorkReport<R> blockingTake() {
        while (true) {
            try {
                WorkReport<R> take = this.outputQueue.take();
                this.workCnt.decrementAndGet();
                return take;
            } catch (InterruptedException e) {
                this.LOG.debug("Retrying in blockingTake...");
            }
        }
    }
}
