/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.FutureIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntryFileIO {
    private static final Logger LOG = LoggerFactory.getLogger(EntryFileIO.class);
    public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;
    public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;
    private final Configuration conf;

    public EntryFileIO(Configuration conf) {
        this.conf = conf;
    }

    public SequenceFile.Writer createWriter(File file) throws IOException {
        return this.createWriter(EntryFileIO.toPath(file));
    }

    public SequenceFile.Writer createWriter(Path path) throws IOException {
        return SequenceFile.createWriter(this.conf, SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(NullWritable.class), SequenceFile.Writer.valueClass(FileEntry.class));
    }

    public SequenceFile.Reader createReader(File file) throws IOException {
        return this.createReader(EntryFileIO.toPath(file));
    }

    public SequenceFile.Reader createReader(Path path) throws IOException {
        return new SequenceFile.Reader(this.conf, SequenceFile.Reader.file(path));
    }

    public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
        return new EntryIterator(reader);
    }

    public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
        EntryWriter ew = new EntryWriter(writer, capacity);
        ew.start();
        return ew;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static int write(SequenceFile.Writer writer, Collection<FileEntry> entries, boolean close) throws IOException {
        try {
            for (FileEntry entry : entries) {
                writer.append(NullWritable.get(), entry);
            }
            writer.flush();
        }
        finally {
            if (close) {
                writer.close();
            }
        }
        return entries.size();
    }

    public static Path toPath(File file) {
        return new Path(file.toURI());
    }

    @VisibleForTesting
    static final class EntryIterator
    implements RemoteIterator<FileEntry>,
    Closeable {
        private final SequenceFile.Reader reader;
        private FileEntry fetched;
        private boolean closed;
        private int count;

        private EntryIterator(SequenceFile.Reader reader) {
            this.reader = Objects.requireNonNull(reader);
        }

        @Override
        public void close() throws IOException {
            if (!this.closed) {
                this.closed = true;
                this.reader.close();
            }
        }

        public String toString() {
            return "EntryIterator{closed=" + this.closed + ", count=" + this.count + ", fetched=" + this.fetched + "}";
        }

        @Override
        public boolean hasNext() throws IOException {
            return this.fetched != null || this.fetchNext();
        }

        private boolean fetchNext() throws IOException {
            FileEntry readBack = new FileEntry();
            if (this.reader.next(NullWritable.get(), readBack)) {
                this.fetched = readBack;
                ++this.count;
                return true;
            }
            this.fetched = null;
            this.close();
            return false;
        }

        @Override
        public FileEntry next() throws IOException {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            FileEntry r = this.fetched;
            this.fetched = null;
            return r;
        }

        public boolean isClosed() {
            return this.closed;
        }

        int getCount() {
            return this.count;
        }
    }

    public static final class EntryWriter
    implements Closeable {
        private final SequenceFile.Writer writer;
        private final BlockingQueue<QueueEntry> queue;
        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final AtomicBoolean active = new AtomicBoolean(false);
        private final int capacity;
        private ExecutorService executor;
        private Future<Integer> future;
        private final AtomicInteger count = new AtomicInteger();
        private final AtomicReference<IOException> failure = new AtomicReference();

        private EntryWriter(SequenceFile.Writer writer, int capacity) {
            Preconditions.checkState(capacity > 0, "invalid queue capacity %s", capacity);
            this.writer = Objects.requireNonNull(writer);
            this.capacity = capacity;
            this.queue = new ArrayBlockingQueue<QueueEntry>(capacity);
        }

        public boolean isActive() {
            return this.active.get();
        }

        public int getCount() {
            return this.count.get();
        }

        public IOException getFailure() {
            return this.failure.get();
        }

        private void start() {
            Preconditions.checkState(this.executor == null, "already started");
            this.active.set(true);
            this.executor = HadoopExecutors.newSingleThreadExecutor();
            this.future = this.executor.submit(this::processor);
            LOG.debug("Started entry writer {}", (Object)this);
        }

        public boolean enqueue(List<FileEntry> entries) {
            if (entries.isEmpty()) {
                LOG.debug("ignoring enqueue of empty list");
                return true;
            }
            if (this.active.get()) {
                try {
                    LOG.debug("Queueing {} entries", (Object)entries.size());
                    boolean enqueued = this.queue.offer(new QueueEntry(Actions.write, entries), 10L, TimeUnit.MINUTES);
                    if (!enqueued) {
                        LOG.warn("Timeout submitting entries to {}", (Object)this);
                    }
                    return enqueued;
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                    return false;
                }
            }
            LOG.warn("EntryFile write queue inactive; discarding {} entries submitted to {}", (Object)entries.size(), (Object)this);
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        private int processor() {
            Thread.currentThread().setName("EntryIOWriter");
            try {
                block9: while (true) {
                    if (this.stop.get() != false) return this.count.get();
                    queueEntry = this.queue.take();
                    switch (1.$SwitchMap$org$apache$hadoop$mapreduce$lib$output$committer$manifest$impl$EntryFileIO$Actions[queueEntry.action.ordinal()]) {
                        case 1: {
                            EntryFileIO.LOG.debug("Stop processing");
                            this.stop.set(true);
                            continue block9;
                        }
                    }
                    entries = queueEntry.entries;
                    EntryFileIO.LOG.debug("Adding block of {} entries", (Object)entries.size());
                    var3_5 = entries.iterator();
                    while (true) {
                        if (var3_5.hasNext()) ** break;
                        continue block9;
                        entry = var3_5.next();
                        this.append(entry);
                    }
                    break;
                }
            }
            catch (IOException e) {
                EntryFileIO.LOG.debug("Write failure", (Throwable)e);
                this.failure.set(e);
                throw new UncheckedIOException(e);
            }
            catch (InterruptedException e) {
                EntryFileIO.LOG.debug("interrupted", (Throwable)e);
                return this.count.get();
            }
            finally {
                this.stop.set(true);
                this.active.set(false);
                this.queue.clear();
            }
        }

        private void append(FileEntry entry) throws IOException {
            this.writer.append(NullWritable.get(), entry);
            int c = this.count.incrementAndGet();
            LOG.trace("Added entry #{}: {}", (Object)c, (Object)entry);
        }

        @Override
        public void close() throws IOException {
            if (!this.active.getAndSet(false)) {
                return;
            }
            LOG.debug("Shutting down writer; entry lists in queue: {}", (Object)(this.capacity - this.queue.remainingCapacity()));
            try {
                this.queue.put(new QueueEntry(Actions.stop));
            }
            catch (InterruptedException e) {
                Thread.interrupted();
            }
            try {
                int total = FutureIO.awaitFuture(this.future, 60L, TimeUnit.SECONDS);
                LOG.debug("Processed {} files", (Object)total);
                this.executor.shutdown();
            }
            catch (TimeoutException e) {
                LOG.warn("Timeout waiting for write thread to finish");
                this.executor.shutdownNow();
            }
            finally {
                this.writer.close();
            }
        }

        public void maybeRaiseWriteException() throws IOException {
            IOException f = this.failure.get();
            if (f != null) {
                throw f;
            }
        }

        public String toString() {
            return "EntryWriter{stop=" + this.stop.get() + ", active=" + this.active.get() + ", count=" + this.count.get() + ", queue depth=" + this.queue.size() + ", failure=" + this.failure + "}";
        }
    }

    private static final class QueueEntry {
        private final Actions action;
        private final List<FileEntry> entries;

        private QueueEntry(Actions action, List<FileEntry> entries) {
            this.action = action;
            this.entries = entries;
        }

        private QueueEntry(Actions action) {
            this(action, null);
        }
    }

    private static enum Actions {
        write,
        stop;

    }
}

