/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckableNameNodeResource;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeResourcePolicy;
import org.apache.hadoop.hdfs.server.namenode.RedundantEditLogInputStream;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class JournalSet
implements JournalManager {
    static final Logger LOG = LoggerFactory.getLogger(FSEditLog.class);
    private static final Comparator<EditLogInputStream> LOCAL_LOG_PREFERENCE_COMPARATOR = Comparator.comparing(EditLogInputStream::isLocalLog).reversed();
    public static final Comparator<EditLogInputStream> EDIT_LOG_INPUT_STREAM_COMPARATOR = Comparator.comparing(EditLogInputStream::getFirstTxId).thenComparing(EditLogInputStream::getLastTxId);
    private final List<JournalAndStream> journals = new CopyOnWriteArrayList<JournalAndStream>();
    final int minimumRedundantJournals;
    private boolean closed;
    private long lastJournalledTxId;

    JournalSet(int minimumRedundantResources) {
        this.minimumRedundantJournals = minimumRedundantResources;
        this.lastJournalledTxId = -12345L;
    }

    @Override
    public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean hasSomeData() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public EditLogOutputStream startLogSegment(final long txId, final int layoutVersion) throws IOException {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.startLogSegment(txId, layoutVersion);
            }
        }, "starting log segment " + txId);
        return new JournalSetOutputStream();
    }

    @Override
    public void finalizeLogSegment(final long firstTxId, final long lastTxId) throws IOException {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                if (jas.isActive()) {
                    jas.closeStream();
                    jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
                }
            }
        }, "finalize log segment " + firstTxId + ", " + lastTxId);
    }

    @Override
    public void close() throws IOException {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.close();
            }
        }, "close journal");
        this.closed = true;
    }

    public boolean isOpen() {
        return !this.closed;
    }

    @Override
    public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
        PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, EDIT_LOG_INPUT_STREAM_COMPARATOR);
        for (JournalAndStream jas : this.journals) {
            if (jas.isDisabled()) {
                LOG.info("Skipping jas " + jas + " since it's disabled");
                continue;
            }
            try {
                jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk, onlyDurableTxns);
            }
            catch (IOException ioe) {
                LOG.warn("Unable to determine input streams from " + jas.getManager() + ". Skipping.", (Throwable)ioe);
            }
        }
        JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
    }

    public static void chainAndMakeRedundantStreams(Collection<EditLogInputStream> outStreams, PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
        EditLogInputStream elis;
        LinkedList<EditLogInputStream> acc = new LinkedList<EditLogInputStream>();
        while ((elis = allStreams.poll()) != null) {
            if (acc.isEmpty()) {
                acc.add(elis);
                continue;
            }
            EditLogInputStream accFirst = (EditLogInputStream)acc.get(0);
            long accFirstTxId = accFirst.getFirstTxId();
            if (accFirstTxId == elis.getFirstTxId()) {
                if (elis.isInProgress()) {
                    if (!accFirst.isInProgress()) continue;
                    acc.add(elis);
                    continue;
                }
                if (accFirst.isInProgress()) {
                    acc.clear();
                }
                acc.add(elis);
                continue;
            }
            if (accFirstTxId < elis.getFirstTxId()) {
                Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
                outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
                acc.clear();
                acc.add(elis);
                continue;
            }
            if (accFirstTxId <= elis.getFirstTxId()) continue;
            throw new RuntimeException("sorted set invariants violated!  Got stream with first txid " + elis.getFirstTxId() + ", but the last firstTxId was " + accFirstTxId);
        }
        if (!acc.isEmpty()) {
            Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
            outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
            acc.clear();
        }
    }

    public boolean isEmpty() {
        return !NameNodeResourcePolicy.areResourcesAvailable(this.journals, this.minimumRedundantJournals);
    }

    private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
        if (badJournals == null || badJournals.isEmpty()) {
            return;
        }
        for (JournalAndStream j : badJournals) {
            LOG.error("Disabling journal " + j);
            j.abort();
            j.setDisabled(true);
        }
    }

    private void mapJournalsAndReportErrors(JournalClosure closure, String status) throws IOException {
        LinkedList badJAS = Lists.newLinkedList();
        for (JournalAndStream jas : this.journals) {
            try {
                closure.apply(jas);
            }
            catch (Throwable t) {
                if (jas.isRequired()) {
                    String msg = "Error: " + status + " failed for required journal (" + jas + ")";
                    LOG.error(msg, t);
                    this.abortAllJournals();
                    ExitUtil.terminate((int)1, (String)msg);
                    continue;
                }
                LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
                badJAS.add(jas);
            }
        }
        this.disableAndReportErrorOnJournals(badJAS);
        if (!NameNodeResourcePolicy.areResourcesAvailable(this.journals, this.minimumRedundantJournals)) {
            String message = status + " failed for too many journals";
            LOG.error("Error: " + message);
            throw new IOException(message);
        }
    }

    private void abortAllJournals() {
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            jas.abort();
        }
    }

    @Override
    public void setOutputBufferCapacity(final int size) {
        try {
            this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    jas.getManager().setOutputBufferCapacity(size);
                }
            }, "setOutputBufferCapacity");
        }
        catch (IOException e) {
            LOG.error("Error in setting outputbuffer capacity");
        }
    }

    List<JournalAndStream> getAllJournalStreams() {
        return this.journals;
    }

    List<JournalManager> getJournalManagers() {
        ArrayList<JournalManager> jList = new ArrayList<JournalManager>();
        for (JournalAndStream j : this.journals) {
            jList.add(j.getManager());
        }
        return jList;
    }

    void add(JournalManager j, boolean required) {
        this.add(j, required, false);
    }

    void add(JournalManager j, boolean required, boolean shared) {
        JournalAndStream jas = new JournalAndStream(j, required, shared);
        this.journals.add(jas);
    }

    void remove(JournalManager j) {
        JournalAndStream jasToRemove = null;
        for (JournalAndStream jas : this.journals) {
            if (!jas.getManager().equals(j)) continue;
            jasToRemove = jas;
            break;
        }
        if (jasToRemove != null) {
            jasToRemove.abort();
            this.journals.remove(jasToRemove);
        }
    }

    @Override
    public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
            }
        }, "purgeLogsOlderThan " + minTxIdToKeep);
    }

    @Override
    public void recoverUnfinalizedSegments() throws IOException {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.getManager().recoverUnfinalizedSegments();
            }
        }, "recoverUnfinalizedSegments");
    }

    public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
        ArrayList<RemoteEditLog> allLogs = new ArrayList<RemoteEditLog>();
        for (JournalAndStream j : this.journals) {
            if (!(j.getManager() instanceof FileJournalManager)) continue;
            FileJournalManager fjm = (FileJournalManager)j.getManager();
            try {
                allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, false));
            }
            catch (Throwable t) {
                LOG.warn("Cannot list edit logs in " + fjm, t);
            }
        }
        HashMap logsByStartTxId = new HashMap();
        allLogs.forEach(input -> {
            long key = RemoteEditLog.GET_START_TXID.apply((RemoteEditLog)input);
            logsByStartTxId.computeIfAbsent(key, k -> new ArrayList()).add(input);
        });
        long curStartTxId = fromTxId;
        ArrayList<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
        while (true) {
            List logGroup;
            if ((logGroup = logsByStartTxId.getOrDefault(curStartTxId, Collections.emptyList())).isEmpty()) {
                SortedSet<Object> startTxIds = new TreeSet(logsByStartTxId.keySet());
                if ((startTxIds = startTxIds.tailSet(curStartTxId)).isEmpty()) break;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Found gap in logs at " + curStartTxId + ": not returning previous logs in manifest.");
                }
                logs.clear();
                curStartTxId = (Long)startTxIds.first();
                continue;
            }
            RemoteEditLog bestLog = (RemoteEditLog)Collections.max(logGroup);
            logs.add(bestLog);
            curStartTxId = bestLog.getEndTxId() + 1L;
        }
        RemoteEditLogManifest ret = new RemoteEditLogManifest(logs, curStartTxId - 1L);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Generated manifest for logs since " + fromTxId + ":" + ret);
        }
        return ret;
    }

    String getSyncTimes() {
        StringBuilder buf = new StringBuilder();
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            buf.append(jas.getCurrentStream().getTotalSyncTime()).append(" ");
        }
        return buf.toString();
    }

    @Override
    public void doPreUpgrade() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void doUpgrade(Storage storage) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void doFinalize() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void doRollback() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void discardSegments(long startTxId) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public long getJournalCTime() throws IOException {
        throw new UnsupportedOperationException();
    }

    private static interface JournalClosure {
        public void apply(JournalAndStream var1) throws IOException;
    }

    private class JournalSetOutputStream
    extends EditLogOutputStream {
        JournalSetOutputStream() throws IOException {
        }

        @Override
        public long getLastJournalledTxId() {
            return JournalSet.this.lastJournalledTxId;
        }

        @Override
        public void write(final FSEditLogOp op) throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().write(op);
                    }
                }
            }, "write op");
            assert (JournalSet.this.lastJournalledTxId < op.txid) : "TxId order violation for op=" + op + ", lastJournalledTxId=" + JournalSet.this.lastJournalledTxId;
            JournalSet.this.lastJournalledTxId = op.txid;
        }

        @Override
        public void writeRaw(final byte[] data, final int offset, final int length) throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().writeRaw(data, offset, length);
                    }
                }
            }, "write bytes");
        }

        @Override
        public void create(final int layoutVersion) throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().create(layoutVersion);
                    }
                }
            }, "create");
        }

        @Override
        public void close() throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    jas.closeStream();
                }
            }, "close");
        }

        @Override
        public void abort() throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    jas.abort();
                }
            }, "abort");
        }

        @Override
        public void setReadyToFlush() throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().setReadyToFlush();
                    }
                }
            }, "setReadyToFlush");
        }

        @Override
        protected void flushAndSync(final boolean durable) throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().flushAndSync(durable);
                    }
                }
            }, "flushAndSync");
        }

        @Override
        public void flush() throws IOException {
            JournalSet.this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (jas.isActive()) {
                        jas.getCurrentStream().flush();
                    }
                }
            }, "flush");
        }

        @Override
        public boolean shouldForceSync() {
            for (JournalAndStream js : JournalSet.this.journals) {
                if (!js.isActive() || !js.getCurrentStream().shouldForceSync()) continue;
                return true;
            }
            return false;
        }

        @Override
        protected long getNumSync() {
            for (JournalAndStream jas : JournalSet.this.journals) {
                if (!jas.isActive()) continue;
                return jas.getCurrentStream().getNumSync();
            }
            return 0L;
        }
    }

    static class JournalAndStream
    implements CheckableNameNodeResource {
        private JournalManager journal;
        private boolean disabled = false;
        private EditLogOutputStream stream;
        private final boolean required;
        private final boolean shared;

        public JournalAndStream(JournalManager manager, boolean required, boolean shared) {
            this.journal = manager;
            this.required = required;
            this.shared = shared;
        }

        public void startLogSegment(long txId, int layoutVersion) throws IOException {
            Preconditions.checkState((this.stream == null ? 1 : 0) != 0);
            this.disabled = false;
            this.stream = this.journal.startLogSegment(txId, layoutVersion);
        }

        public void closeStream() throws IOException {
            if (this.stream == null) {
                return;
            }
            this.stream.close();
            this.stream = null;
        }

        public void close() throws IOException {
            this.closeStream();
            this.journal.close();
        }

        public void abort() {
            if (this.stream == null) {
                return;
            }
            try {
                this.stream.abort();
            }
            catch (IOException ioe) {
                LOG.error("Unable to abort stream " + this.stream, (Throwable)ioe);
            }
            this.stream = null;
        }

        boolean isActive() {
            return this.stream != null;
        }

        EditLogOutputStream getCurrentStream() {
            return this.stream;
        }

        public String toString() {
            return "JournalAndStream(mgr=" + this.journal + ", stream=" + this.stream + ")";
        }

        void setCurrentStreamForTests(EditLogOutputStream stream) {
            this.stream = stream;
        }

        @VisibleForTesting
        void setJournalForTests(JournalManager jm) {
            this.journal = jm;
        }

        JournalManager getManager() {
            return this.journal;
        }

        boolean isDisabled() {
            return this.disabled;
        }

        private void setDisabled(boolean disabled) {
            this.disabled = disabled;
        }

        @Override
        public boolean isResourceAvailable() {
            return !this.isDisabled();
        }

        @Override
        public boolean isRequired() {
            return this.required;
        }

        public boolean isShared() {
            return this.shared;
        }
    }
}

