/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.client.io;

import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECBlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECBlockOutputStreamEntryPool;
import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.om.protocol.S3Auth;
import org.apache.hadoop.ozone.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.shaded.org.apache.commons.lang3.NotImplementedException;
import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ECKeyOutputStream
extends KeyOutputStream
implements KeyMetadataAware {
    private OzoneClientConfig config;
    private ECChunkBuffers ecChunkBufferCache;
    private final BlockingQueue<ECChunkBuffers> ecStripeQueue;
    private int chunkIndex;
    private int ecChunkSize;
    private final int numDataBlks;
    private final int numParityBlks;
    private final ByteBufferPool bufferPool;
    private final RawErasureEncoder encoder;
    private final Future<Boolean> flushFuture;
    private final AtomicLong flushCheckpoint;
    private boolean atomicKeyCreation;
    public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class);
    private volatile boolean closed;
    private volatile boolean closing;
    private long offset;
    private long writeOffset;

    @VisibleForTesting
    public void insertFlushCheckpoint(long version) throws IOException {
        this.addStripeToQueue(new CheckpointDummyStripe(version));
    }

    @VisibleForTesting
    public long getFlushCheckpoint() {
        return this.flushCheckpoint.get();
    }

    private ECKeyOutputStream(Builder builder) {
        super(builder.getReplicationConfig(), new ECBlockOutputStreamEntryPool(builder));
        this.config = builder.getClientConfig();
        this.bufferPool = builder.getByteBufferPool();
        this.ecChunkSize = builder.getReplicationConfig().getEcChunkSize();
        this.numDataBlks = builder.getReplicationConfig().getData();
        this.numParityBlks = builder.getReplicationConfig().getParity();
        this.ecChunkBufferCache = new ECChunkBuffers(this.ecChunkSize, this.numDataBlks, this.numParityBlks, this.bufferPool);
        this.chunkIndex = 0;
        this.ecStripeQueue = new ArrayBlockingQueue<ECChunkBuffers>(this.config.getEcStripeQueueSize());
        this.writeOffset = 0L;
        this.encoder = CodecUtil.createRawEncoderWithFallback(builder.getReplicationConfig());
        S3Auth s3Auth = builder.getS3CredentialsProvider().get();
        ThreadLocal<S3Auth> s3CredentialsProvider = builder.getS3CredentialsProvider();
        this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
            s3CredentialsProvider.set(s3Auth);
            return this.flushStripeFromQueue();
        });
        this.flushCheckpoint = new AtomicLong(0L);
        this.atomicKeyCreation = builder.getAtomicKeyCreation();
    }

    @Override
    protected ECBlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
        return (ECBlockOutputStreamEntryPool)super.getBlockOutputStreamEntryPool();
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.checkNotClosed();
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) {
            throw new IndexOutOfBoundsException();
        }
        try {
            for (int writtenLen = 0; writtenLen < len; writtenLen += this.handleWrite(b, off + writtenLen, len - writtenLen)) {
            }
        }
        catch (Exception e) {
            this.markStreamClosed();
            throw e;
        }
        this.writeOffset += (long)len;
    }

    private void rollbackAndReset(ECChunkBuffers stripe) throws IOException {
        ByteBuffer[] dataBuffers = stripe.getDataBuffers();
        this.offset -= (long)Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        ECBlockOutputStreamEntry failedStreamEntry = blockOutputStreamEntryPool.getCurrentStreamEntry();
        failedStreamEntry.resetToFirstEntry();
        failedStreamEntry.resetToAckedPosition();
        blockOutputStreamEntryPool.discardPreallocatedBlocks(-1L, failedStreamEntry.getPipeline().getId());
        failedStreamEntry.close();
    }

    private void logStreamError(List<ECBlockOutputStream> failedStreams, String operation) {
        if (!LOG.isWarnEnabled()) {
            return;
        }
        Set failedStreamIndexSet = failedStreams.stream().map(BlockOutputStream::getReplicationIndex).collect(Collectors.toSet());
        String failedStreamsString = IntStream.range(1, this.numDataBlks + this.numParityBlks + 1).mapToObj(index -> failedStreamIndexSet.contains(index) ? "F" : "S").collect(Collectors.joining(" "));
        LOG.warn("{} failed: {}", (Object)operation, (Object)failedStreamsString);
        for (ECBlockOutputStream stream : failedStreams) {
            LOG.warn("Failure for replica index: {}, DatanodeDetails: {}", new Object[]{stream.getReplicationIndex(), stream.getDatanodeDetails(), stream.getIoException()});
        }
    }

    private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe) throws IOException {
        ECBlockOutputStreamEntry streamEntry = this.getBlockOutputStreamEntryPool().getCurrentStreamEntry();
        List<ECBlockOutputStream> failedStreams = streamEntry.streamsWithWriteFailure();
        if (!failedStreams.isEmpty()) {
            this.logStreamError(failedStreams, "EC stripe write");
            this.excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
            return StripeWriteStatus.FAILED;
        }
        boolean isLastStripe = streamEntry.getRemaining() <= 0L || stripe.getLastDataCell().limit() < this.ecChunkSize;
        ByteString checksum = streamEntry.calculateChecksum();
        streamEntry.executePutBlock(isLastStripe, streamEntry.getCurrentPosition(), checksum);
        failedStreams = streamEntry.streamsWithPutBlockFailure();
        if (!failedStreams.isEmpty()) {
            this.logStreamError(failedStreams, "Put block");
            this.excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
            return StripeWriteStatus.FAILED;
        }
        streamEntry.updateBlockGroupToAckedPosition(streamEntry.getCurrentPosition());
        stripe.clear();
        if (streamEntry.getRemaining() <= 0L) {
            streamEntry.close();
        } else {
            streamEntry.resetToFirstEntry();
        }
        return StripeWriteStatus.SUCCESS;
    }

    private void excludePipelineAndFailedDN(Pipeline pipeline, List<ECBlockOutputStream> failedStreams) {
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
        failedStreams.stream().filter(s2 -> !this.checkIfContainerToExclude(HddsClientUtils.checkForException(s2.getIoException()))).forEach(s2 -> blockOutputStreamEntryPool.getExcludeList().addDatanode(s2.getDatanodeDetails()));
    }

    @Override
    protected boolean checkIfContainerToExclude(Throwable t2) {
        return super.checkIfContainerToExclude(t2) && t2 instanceof ContainerNotOpenException;
    }

    private void generateParityCells() throws IOException {
        int i;
        ByteBuffer[] dataBuffers = this.ecChunkBufferCache.getDataBuffers();
        ByteBuffer[] parityBuffers = this.ecChunkBufferCache.getParityBuffers();
        int parityCellSize = dataBuffers[0].position();
        int firstNonFullIndex = dataBuffers.length;
        int firstNonFullLength = 0;
        for (i = 0; i < dataBuffers.length; ++i) {
            if (dataBuffers[i].position() == this.ecChunkSize) continue;
            firstNonFullIndex = i;
            firstNonFullLength = dataBuffers[i].position();
            break;
        }
        for (i = firstNonFullIndex + 1; i < dataBuffers.length; ++i) {
            Preconditions.checkState(dataBuffers[i].position() == 0, "Illegal stripe state: cell {} is not full while cell {} has data", firstNonFullIndex, i);
        }
        for (i = firstNonFullIndex; i < dataBuffers.length; ++i) {
            ECKeyOutputStream.padBufferToLimit(dataBuffers[i], parityCellSize);
        }
        for (ByteBuffer b : parityBuffers) {
            b.limit(parityCellSize);
        }
        for (ByteBuffer b : dataBuffers) {
            b.flip();
        }
        this.encoder.encode(dataBuffers, parityBuffers);
        if (firstNonFullIndex < dataBuffers.length) {
            dataBuffers[firstNonFullIndex].limit(firstNonFullLength);
        }
        for (int i2 = firstNonFullIndex + 1; i2 < dataBuffers.length; ++i2) {
            dataBuffers[i2].limit(0);
        }
    }

    private void writeDataCells(ECChunkBuffers stripe) throws IOException {
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        blockOutputStreamEntryPool.allocateBlockIfNeeded(false);
        ByteBuffer[] dataCells = stripe.getDataBuffers();
        for (int i = 0; i < this.numDataBlks; ++i) {
            if (dataCells[i].limit() > 0) {
                this.handleOutputStreamWrite(dataCells[i], false);
            }
            blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
        }
    }

    private void writeParityCells(ECChunkBuffers stripe) {
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        blockOutputStreamEntryPool.getCurrentStreamEntry().forceToFirstParityBlock();
        ByteBuffer[] parityCells = stripe.getParityBuffers();
        for (int i = 0; i < this.numParityBlks; ++i) {
            this.handleOutputStreamWrite(parityCells[i], true);
            blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
        }
    }

    private int handleWrite(byte[] b, int off, int len) throws IOException {
        int bufferRem = this.ecChunkBufferCache.dataBuffers[this.chunkIndex].remaining();
        int writeLen = Math.min(len, Math.min(bufferRem, this.ecChunkSize));
        int pos = this.ecChunkBufferCache.addToDataBuffer(this.chunkIndex, b, off, writeLen);
        if (pos == this.ecChunkSize) {
            ++this.chunkIndex;
            if (this.chunkIndex == this.numDataBlks) {
                this.generateParityCells();
                this.addStripeToQueue(this.ecChunkBufferCache);
                this.ecChunkBufferCache = new ECChunkBuffers(this.ecChunkSize, this.numDataBlks, this.numParityBlks, this.bufferPool);
                this.chunkIndex = 0;
            }
        }
        return writeLen;
    }

    private void handleOutputStreamWrite(ByteBuffer buffer, boolean isParity) {
        try {
            assert (buffer.limit() <= this.ecChunkSize) : "The buffer size: " + buffer.limit() + " should not exceed EC chunk size: " + this.ecChunkSize;
            this.writeToOutputStream(this.getBlockOutputStreamEntryPool().getCurrentStreamEntry(), buffer.array(), buffer.limit(), 0, isParity);
        }
        catch (Exception e) {
            this.markStreamAsFailed(e);
        }
    }

    private void writeToOutputStream(ECBlockOutputStreamEntry current, byte[] b, int writeLen, int off, boolean isParity) throws IOException {
        if (this.closing) {
            throw new IOException("Stream is closing, avoid re-opening streams");
        }
        try {
            if (!isParity) {
                this.offset += (long)writeLen;
            }
            current.write(b, off, writeLen);
        }
        catch (IOException ioe) {
            LOG.debug("Exception while writing the cell buffers. The writeLen: " + writeLen + ". The block internal index is: " + current.getCurrentStreamIdx(), (Throwable)ioe);
            this.handleException(current, ioe);
        }
    }

    private void handleException(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException {
        Throwable t2 = HddsClientUtils.checkForException(exception);
        Preconditions.checkNotNull(t2);
        boolean containerExclusionException = this.checkIfContainerToExclude(t2);
        if (containerExclusionException) {
            this.getBlockOutputStreamEntryPool().getExcludeList().addPipeline(streamEntry.getPipeline().getId());
        }
        this.markStreamAsFailed(exception);
    }

    private void markStreamClosed() {
        this.closing = true;
    }

    private void markStreamAsFailed(Exception e) {
        this.getBlockOutputStreamEntryPool().getCurrentStreamEntry().markFailed(e);
    }

    @Override
    public void flush() {
        LOG.debug("ECKeyOutputStream does not support flush.");
    }

    @Override
    public void hflush() {
        throw new NotImplementedException("ECKeyOutputStream does not support hflush.");
    }

    @Override
    public void hsync() {
        throw new NotImplementedException("ECKeyOutputStream does not support hsync.");
    }

    private void closeCurrentStreamEntry() throws IOException {
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        if (!blockOutputStreamEntryPool.isEmpty()) {
            try {
                ECBlockOutputStreamEntry entry;
                while ((entry = blockOutputStreamEntryPool.getCurrentStreamEntry()) != null) {
                    try {
                        ((BlockOutputStreamEntry)entry).close();
                        break;
                    }
                    catch (IOException ioe) {
                        this.handleException(entry, ioe);
                    }
                }
                return;
            }
            catch (Exception e) {
                this.markStreamClosed();
                throw e;
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = this.getBlockOutputStreamEntryPool();
        try {
            if (!this.closing) {
                if (this.ecChunkBufferCache.getFirstDataCell().position() > 0) {
                    this.generateParityCells();
                    this.addStripeToQueue(this.ecChunkBufferCache);
                }
                this.addStripeToQueue(new EOFDummyStripe());
                this.flushFuture.get();
                Preconditions.checkArgument(this.writeOffset == this.offset, "Expected writeOffset= " + this.writeOffset + " Expected offset=" + this.offset);
                if (this.atomicKeyCreation) {
                    long expectedSize = blockOutputStreamEntryPool.getDataSize();
                    Preconditions.checkState(expectedSize == this.offset, String.format("Expected: %d and actual %d write sizes do not match", expectedSize, this.offset));
                }
                blockOutputStreamEntryPool.commitKey(this.offset);
            }
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException)cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            throw new IOException(cause);
        }
        catch (InterruptedException e) {
            throw new IOException("Flushing thread was interrupted", e);
        }
        finally {
            this.closeCurrentStreamEntry();
            blockOutputStreamEntryPool.cleanup();
        }
    }

    private void addStripeToQueue(ECChunkBuffers stripe) throws IOException {
        try {
            do {
                if (!this.flushFuture.isDone()) continue;
                this.flushFuture.get();
                throw new IOException("Flush thread has ended before stream close");
            } while (!this.ecStripeQueue.offer(stripe, 1L, TimeUnit.SECONDS));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while adding stripe to queue", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            if (e.getCause() instanceof RuntimeException) {
                throw (RuntimeException)e.getCause();
            }
            throw new IOException(e.getCause());
        }
    }

    private boolean flushStripeFromQueue() throws IOException {
        try {
            ECChunkBuffers stripe = this.ecStripeQueue.take();
            while (!this.closing && !(stripe instanceof EOFDummyStripe)) {
                if (stripe instanceof CheckpointDummyStripe) {
                    this.flushCheckpoint.set(((CheckpointDummyStripe)stripe).version);
                } else {
                    this.flushStripeToDatanodes(stripe);
                    stripe.release();
                }
                stripe = this.ecStripeQueue.take();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while polling stripe from queue", e);
        }
        return true;
    }

    private void flushStripeToDatanodes(ECChunkBuffers stripe) throws IOException {
        int maxRetry = this.config.getMaxECStripeWriteRetries();
        for (int i = 0; i <= maxRetry; ++i) {
            this.writeDataCells(stripe);
            this.writeParityCells(stripe);
            if (this.commitStripeWrite(stripe) == StripeWriteStatus.SUCCESS) {
                return;
            }
            this.rollbackAndReset(stripe);
        }
        throw new IOException("Completed max allowed retries " + maxRetry + " on stripe failures.");
    }

    public static void padBufferToLimit(ByteBuffer buf, int limit) {
        int pos = buf.position();
        if (pos >= limit) {
            return;
        }
        Arrays.fill(buf.array(), pos, limit, (byte)0);
        buf.position(limit);
    }

    private void checkNotClosed() throws IOException {
        if (this.closing || this.closed) {
            throw new IOException("Stream is closed! Key: " + this.getBlockOutputStreamEntryPool().getKeyName());
        }
    }

    private static class ECChunkBuffers {
        private final ByteBuffer[] dataBuffers;
        private final ByteBuffer[] parityBuffers;
        private int cellSize;
        private ByteBufferPool byteBufferPool;

        ECChunkBuffers() {
            this.dataBuffers = null;
            this.parityBuffers = null;
        }

        ECChunkBuffers(int cellSize, int numData, int numParity, ByteBufferPool byteBufferPool) {
            this.cellSize = cellSize;
            this.dataBuffers = new ByteBuffer[numData];
            this.parityBuffers = new ByteBuffer[numParity];
            this.byteBufferPool = byteBufferPool;
            this.allocateBuffers(this.dataBuffers, this.cellSize);
            this.allocateBuffers(this.parityBuffers, this.cellSize);
        }

        private ByteBuffer[] getDataBuffers() {
            return this.dataBuffers;
        }

        private ByteBuffer[] getParityBuffers() {
            return this.parityBuffers;
        }

        private ByteBuffer getFirstDataCell() {
            return this.dataBuffers[0];
        }

        private ByteBuffer getLastDataCell() {
            return this.dataBuffers[this.dataBuffers.length - 1];
        }

        private int addToDataBuffer(int i, byte[] b, int off, int len) {
            ByteBuffer buf = this.dataBuffers[i];
            int pos = buf.position() + len;
            Preconditions.checkState(pos <= this.cellSize, "Position(" + pos + ") is greater than the cellSize(" + this.cellSize + ").");
            buf.put(b, off, len);
            return pos;
        }

        private void clear() {
            this.clearBuffers(this.dataBuffers);
            this.clearBuffers(this.parityBuffers);
        }

        private void release() {
            this.releaseBuffers(this.dataBuffers);
            this.releaseBuffers(this.parityBuffers);
        }

        private void allocateBuffers(ByteBuffer[] buffers, int bufferSize) {
            for (int i = 0; i < buffers.length; ++i) {
                buffers[i] = this.byteBufferPool.getBuffer(false, this.cellSize);
                buffers[i].limit(bufferSize);
            }
        }

        private void clearBuffers(ByteBuffer[] buffers) {
            for (int i = 0; i < buffers.length; ++i) {
                buffers[i].clear();
                buffers[i].limit(this.cellSize);
            }
        }

        private void releaseBuffers(ByteBuffer[] buffers) {
            for (int i = 0; i < buffers.length; ++i) {
                if (buffers[i] == null) continue;
                this.byteBufferPool.putBuffer(buffers[i]);
                buffers[i] = null;
            }
        }
    }

    private static class CheckpointDummyStripe
    extends ECChunkBuffers {
        private final long version;

        CheckpointDummyStripe(long version) {
            this.version = version;
        }
    }

    private static class EOFDummyStripe
    extends ECChunkBuffers {
        EOFDummyStripe() {
        }
    }

    public static class Builder
    extends KeyOutputStream.Builder {
        private ECReplicationConfig replicationConfig;
        private ByteBufferPool byteBufferPool;
        private ThreadLocal<S3Auth> s3CredentialsProvider;

        @Override
        public ECReplicationConfig getReplicationConfig() {
            return this.replicationConfig;
        }

        public Builder setReplicationConfig(ECReplicationConfig replConfig) {
            this.replicationConfig = replConfig;
            return this;
        }

        public ByteBufferPool getByteBufferPool() {
            return this.byteBufferPool;
        }

        public Builder setByteBufferPool(ByteBufferPool bufferPool) {
            this.byteBufferPool = bufferPool;
            return this;
        }

        public Builder setS3CredentialsProvider(ThreadLocal<S3Auth> s3CredentialsThreadLocal) {
            this.s3CredentialsProvider = s3CredentialsThreadLocal;
            return this;
        }

        public ThreadLocal<S3Auth> getS3CredentialsProvider() {
            return this.s3CredentialsProvider;
        }

        @Override
        public ECKeyOutputStream build() {
            return new ECKeyOutputStream(this);
        }
    }

    private static enum StripeWriteStatus {
        SUCCESS,
        FAILED;

    }
}

