package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.class */
/**
 * Tracks stream buffers by the commit index they were registered under and
 * releases them once {@link XceiverClientSpi#watchForCommit(long)} reports
 * that index (or a higher one) in its reply.
 *
 * <p>Concurrent watches on the same index share a single pending future kept
 * in {@code replies}: the first caller performs the watch, later callers wait
 * for its outcome.
 */
public class StreamCommitWatcher {
    private static final Logger LOG = LoggerFactory.getLogger(StreamCommitWatcher.class);

    /** Buffer list supplied by the owner; released buffers are removed from it. */
    private final List<StreamBuffer> bufferList;

    private final XceiverClientSpi xceiverClient;

    /** In-flight watches, keyed by the commit index being watched. */
    private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>> replies =
            new ConcurrentHashMap<>();

    /**
     * Buffers still awaiting acknowledgement, keyed by commit index. Set to
     * {@code null} by {@link #cleanup()}.
     */
    private ConcurrentSkipListMap<Long, List<StreamBuffer>> commitIndexMap =
            new ConcurrentSkipListMap<>();

    /** Sum of {@code position()} over every buffer released so far. */
    private long totalAckDataLength = 0;

    /**
     * @param xceiverClientSpi client used to watch commit indexes
     * @param list buffer list from which released buffers are removed; the
     *     reference is stored as-is, not copied
     */
    public StreamCommitWatcher(XceiverClientSpi xceiverClientSpi, List<StreamBuffer> list) {
        this.xceiverClient = xceiverClientSpi;
        this.bufferList = list;
    }

    /**
     * Registers {@code buffers} under {@code commitIndex}, appending to any
     * buffers already registered for that index.
     *
     * @param commitIndex commit index the buffers belong to
     * @param buffers buffers to associate with the index
     */
    public void updateCommitInfoMap(long commitIndex, List<StreamBuffer> buffers) {
        commitIndexMap.computeIfAbsent(commitIndex, index -> new LinkedList<>()).addAll(buffers);
    }

    /**
     * Returns the number of commit indexes that still have buffers registered.
     *
     * <p>NOTE(review): a decompiler note on the original indicated this method
     * was package-private before decompilation; it is kept {@code public} here.
     *
     * @return number of entries in the commit index map
     */
    public int getCommitInfoMapSize() {
        return commitIndexMap.size();
    }

    /**
     * Watches the lowest registered commit index.
     *
     * @return the watch reply, or {@code null} if no index is registered
     * @throws IOException if the watch fails
     */
    public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
        if (commitIndexMap.isEmpty()) {
            return null;
        }
        // The map is sorted, so the lowest index is simply its first key.
        final long firstIndex = commitIndexMap.firstKey();
        LOG.debug("waiting for first index {} to catch up", firstIndex);
        return streamWatchForCommit(firstIndex);
    }

    /**
     * Watches the highest registered commit index.
     *
     * @return the watch reply, or {@code null} if no index is registered
     * @throws IOException if the watch fails
     */
    public XceiverClientReply streamWatchOnLastIndex() throws IOException {
        if (commitIndexMap.isEmpty()) {
            return null;
        }
        final long lastIndex = commitIndexMap.lastKey();
        LOG.debug("waiting for last flush Index {} to catch up", lastIndex);
        return streamWatchForCommit(lastIndex);
    }

    /**
     * Watches {@code commitIndex} and releases every buffer registered at or
     * below the log index carried by the reply.
     *
     * <p>If another caller is already watching the same index, this call waits
     * for that caller's outcome instead of issuing a second watch. When the
     * watching caller fails, the shared future is completed exceptionally and
     * removed so that waiters are woken with an {@link IOException} and later
     * calls can retry.
     *
     * @param commitIndex commit index to watch
     * @return the reply returned by the client
     * @throws IOException if the watch is interrupted, times out or fails
     */
    public XceiverClientReply streamWatchForCommit(long commitIndex) throws IOException {
        final Long key = commitIndex;
        final MemoizedSupplier<CompletableFuture<XceiverClientReply>> newFuture =
                JavaUtils.memoize(CompletableFuture::new);
        final CompletableFuture<XceiverClientReply> future = replies.compute(key,
                (index, existing) -> existing != null ? existing : newFuture.get());
        if (!newFuture.isInitialized()) {
            // The supplier was never invoked, so another caller owns this watch.
            return awaitExistingWatch(commitIndex, future);
        }

        try {
            final XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex);
            future.complete(reply);
            Preconditions.checkState(replies.remove(key) == future);
            adjustBuffers(reply.getLogIndex());
            return reply;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw failPendingWatch(key, future, e);
        } catch (ExecutionException | TimeoutException e) {
            throw failPendingWatch(key, future, e);
        } finally {
            if (!future.isDone()) {
                // An exception not handled above is escaping; never leave the
                // shared future pending, or waiters would block forever.
                future.completeExceptionally(
                        new IOException("watchForCommit aborted for index " + commitIndex));
                replies.remove(key, future);
            }
        }
    }

    /** Waits for a watch owned by another caller, rethrowing its failure as an IOException. */
    private XceiverClientReply awaitExistingWatch(long commitIndex,
            CompletableFuture<XceiverClientReply> future) throws IOException {
        try {
            return future.join();
        } catch (CompletionException e) {
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IOException("watchForCommit failed for index " + commitIndex + ": " + cause, cause);
        }
    }

    /** Builds the failure IOException, fails the shared future with it and drops the future from the map. */
    private IOException failPendingWatch(Long key, CompletableFuture<XceiverClientReply> future,
            Exception cause) {
        final IOException failure = getIOExceptionForWatchForCommit(key, cause);
        future.completeExceptionally(failure);
        // Conditional remove: only drop the entry if it is still this future.
        replies.remove(key, future);
        return failure;
    }

    /**
     * Releases the buffers registered at or below the client's replicated
     * minimum commit index.
     */
    void releaseBuffersOnException() {
        adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
    }

    /** Releases the buffers of every registered index that is {@code <= commitIndex}. */
    private void adjustBuffers(long commitIndex) {
        // Snapshot the matching keys first; releaseBuffers mutates the map.
        final List<Long> acknowledged = commitIndexMap.keySet().stream()
                .filter(index -> index <= commitIndex)
                .collect(Collectors.toList());
        if (!acknowledged.isEmpty()) {
            releaseBuffers(acknowledged);
        }
    }

    /**
     * Removes the given indexes from the commit index map, adds the position
     * of each released buffer to the acknowledged total and removes the
     * buffers from the buffer list.
     *
     * @param indexes commit indexes to release; each must be registered
     * @return the acknowledged data length after the release
     * @throws IllegalArgumentException if no index is registered at all
     * @throws IllegalStateException if one of the indexes is not registered
     */
    private long releaseBuffers(List<Long> indexes) {
        Preconditions.checkArgument(!commitIndexMap.isEmpty());
        for (Long index : indexes) {
            // Remove and validate in one step rather than containsKey + remove.
            final List<StreamBuffer> released = commitIndexMap.remove(index);
            Preconditions.checkState(released != null);
            totalAckDataLength += released.stream()
                    .mapToLong(buffer -> buffer.position())
                    .sum();
            for (StreamBuffer buffer : released) {
                bufferList.remove(buffer);
            }
        }
        return totalAckDataLength;
    }

    /**
     * @return sum of {@code position()} over every buffer released so far
     */
    public long getTotalAckDataLength() {
        return totalAckDataLength;
    }

    /**
     * Logs a failed watch, wraps the cause in an {@link IOException} and
     * releases whatever buffers the client's replicated minimum commit index
     * already covers.
     *
     * @param commitIndex index whose watch failed
     * @param exc the failure
     * @return the exception for the caller to throw
     */
    private IOException getIOExceptionForWatchForCommit(long commitIndex, Exception exc) {
        LOG.warn("watchForCommit failed for index {}", commitIndex, exc);
        final IOException ioException =
                new IOException("Unexpected Storage Container Exception: " + exc.toString(), exc);
        releaseBuffersOnException();
        return ioException;
    }

    /**
     * Clears and drops the commit index map. Safe to call more than once.
     * Because the map reference becomes {@code null}, methods that read the
     * map throw {@link NullPointerException} if called afterwards.
     */
    public void cleanup() {
        if (commitIndexMap != null) {
            commitIndexMap.clear();
        }
        commitIndexMap = null;
    }
}
