/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.client.io;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ECBlockInputStream
extends BlockExtendedInputStream {
    private static final Logger LOG = LoggerFactory.getLogger(ECBlockInputStream.class);
    private final ECReplicationConfig repConfig;
    private final int ecChunkSize;
    private final long stripeSize;
    private final BlockInputStreamFactory streamFactory;
    private final boolean verifyChecksum;
    private final XceiverClientFactory xceiverClientFactory;
    private final Function<BlockID, Pipeline> refreshFunction;
    private final BlockLocationInfo blockInfo;
    private final DatanodeDetails[] dataLocations;
    private final BlockExtendedInputStream[] blockStreams;
    private final int maxLocations;
    private long position = 0L;
    private boolean closed = false;
    private boolean seeked = false;

    protected ECReplicationConfig getRepConfig() {
        return this.repConfig;
    }

    protected DatanodeDetails[] getDataLocations() {
        return this.dataLocations;
    }

    protected long getStripeSize() {
        return this.stripeSize;
    }

    protected int availableDataLocations(int expectedLocations) {
        int count = 0;
        for (int i = 0; i < this.repConfig.getData() && i < expectedLocations; ++i) {
            if (this.dataLocations[i] == null) continue;
            ++count;
        }
        return count;
    }

    protected int availableParityLocations() {
        int count = 0;
        for (int i = this.repConfig.getData(); i < this.repConfig.getData() + this.repConfig.getParity(); ++i) {
            if (this.dataLocations[i] == null) continue;
            ++count;
        }
        return count;
    }

    public ECBlockInputStream(ECReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverClientFactory, Function<BlockID, Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
        this.repConfig = repConfig;
        this.ecChunkSize = repConfig.getEcChunkSize();
        this.verifyChecksum = verifyChecksum;
        this.blockInfo = blockInfo;
        this.streamFactory = streamFactory;
        this.xceiverClientFactory = xceiverClientFactory;
        this.refreshFunction = refreshFunction;
        this.maxLocations = repConfig.getData() + repConfig.getParity();
        this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
        this.blockStreams = new BlockExtendedInputStream[repConfig.getRequiredNodes()];
        this.stripeSize = (long)this.ecChunkSize * (long)repConfig.getData();
        this.setBlockLocations(this.blockInfo.getPipeline());
    }

    public synchronized boolean hasSufficientLocations() {
        int expectedDataBlocks = this.calculateExpectedDataBlocks(this.repConfig);
        return expectedDataBlocks == this.availableDataLocations(expectedDataBlocks);
    }

    protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
        return ECBlockInputStreamProxy.expectedDataLocations(rConfig, this.getLength());
    }

    protected int currentStreamIndex() {
        return (int)(this.position / (long)this.ecChunkSize % (long)this.repConfig.getData());
    }

    protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
        BlockExtendedInputStream stream = this.blockStreams[locationIndex];
        if (stream == null) {
            Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig((ReplicationConfig)StandaloneReplicationConfig.getInstance((HddsProtos.ReplicationFactor)HddsProtos.ReplicationFactor.ONE)).setNodes(Arrays.asList(this.dataLocations[locationIndex])).setId(PipelineID.randomId()).setReplicaIndexes((Map)ImmutableMap.of((Object)this.dataLocations[locationIndex], (Object)(locationIndex + 1))).setState(Pipeline.PipelineState.CLOSED).build();
            BlockLocationInfo blkInfo = new BlockLocationInfo.Builder().setBlockID(this.blockInfo.getBlockID()).setLength(this.internalBlockLength(locationIndex + 1)).setPipeline(this.blockInfo.getPipeline()).setToken(this.blockInfo.getToken()).setPartNumber(this.blockInfo.getPartNumber()).build();
            this.blockStreams[locationIndex] = stream = this.streamFactory.create((ReplicationConfig)StandaloneReplicationConfig.getInstance((HddsProtos.ReplicationFactor)HddsProtos.ReplicationFactor.ONE), blkInfo, pipeline, (Token<OzoneBlockTokenIdentifier>)this.blockInfo.getToken(), this.verifyChecksum, this.xceiverClientFactory, this.ecPipelineRefreshFunction(locationIndex + 1, this.refreshFunction));
        }
        return stream;
    }

    protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
        return blockID -> {
            Pipeline ecPipeline = (Pipeline)refreshFunc.apply((BlockID)blockID);
            if (ecPipeline == null) {
                return null;
            }
            DatanodeDetails curIndexNode = ecPipeline.getNodes().stream().filter(dn -> ecPipeline.getReplicaIndex(dn) == replicaIndex).findAny().orElse(null);
            if (curIndexNode == null) {
                return null;
            }
            return Pipeline.newBuilder().setReplicationConfig((ReplicationConfig)StandaloneReplicationConfig.getInstance((HddsProtos.ReplicationFactor)HddsProtos.ReplicationFactor.ONE)).setNodes(Collections.singletonList(curIndexNode)).setId(PipelineID.randomId()).setState(Pipeline.PipelineState.CLOSED).build();
        };
    }

    protected long internalBlockLength(int index) {
        long lastStripe = this.blockInfo.getLength() % this.stripeSize;
        long blockSize = (this.blockInfo.getLength() - lastStripe) / (long)this.repConfig.getData();
        long lastCell = lastStripe / (long)this.ecChunkSize + 1L;
        long lastCellLength = lastStripe % (long)this.ecChunkSize;
        if (index > this.repConfig.getData()) {
            index = 1;
        }
        if ((long)index < lastCell) {
            return blockSize + (long)this.ecChunkSize;
        }
        if ((long)index == lastCell) {
            return blockSize + lastCellLength;
        }
        return blockSize;
    }

    private void setBlockLocations(Pipeline pipeline) {
        for (DatanodeDetails node : pipeline.getNodes()) {
            int index = pipeline.getReplicaIndex(node);
            this.addBlockLocation(index, node);
        }
    }

    private void addBlockLocation(int index, DatanodeDetails location) {
        if (index > this.maxLocations) {
            throw new IndexOutOfBoundsException("The index " + index + " is greater than the EC Replication Config (" + this.repConfig + ")");
        }
        this.dataLocations[index - 1] = location;
    }

    protected long blockLength() {
        return this.blockInfo.getLength();
    }

    protected long remaining() {
        return this.blockLength() - this.position;
    }

    @Override
    protected synchronized int readWithStrategy(ByteReaderStrategy strategy) throws IOException {
        Preconditions.checkArgument((strategy != null ? 1 : 0) != 0);
        this.checkOpen();
        if (this.remaining() == 0L) {
            return -1;
        }
        int totalRead = 0;
        while (strategy.getTargetLength() > 0 && this.remaining() > 0L) {
            try {
                int currentIndex = this.currentStreamIndex();
                BlockExtendedInputStream stream = this.getOrOpenStream(currentIndex);
                int read = this.readFromStream(stream, strategy);
                totalRead += read;
                this.position += (long)read;
            }
            catch (IOException ioe) {
                throw new BadDataLocationException(this.dataLocations[this.currentStreamIndex()], (Throwable)ioe);
            }
        }
        return totalRead;
    }

    @Override
    public synchronized long getRemaining() {
        return this.blockInfo.getLength() - this.position;
    }

    @Override
    public synchronized long getLength() {
        return this.blockInfo.getLength();
    }

    @Override
    public BlockID getBlockID() {
        return this.blockInfo.getBlockID();
    }

    protected void seekStreamIfNecessary(BlockExtendedInputStream stream, long partialChunkSize) throws IOException {
        long basePosition;
        long streamPosition;
        if (this.seeked && (streamPosition = (basePosition = this.position / this.stripeSize * (long)this.ecChunkSize) + partialChunkSize) != stream.getPos()) {
            stream.seek(streamPosition);
        }
    }

    private int readFromStream(BlockExtendedInputStream stream, ByteReaderStrategy strategy) throws IOException {
        long partialPosition = this.position % (long)this.ecChunkSize;
        this.seekStreamIfNecessary(stream, partialPosition);
        long ecLimit = (long)this.ecChunkSize - partialPosition;
        long bufLimit = strategy.getTargetLength();
        int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), this.remaining());
        int actualRead = strategy.readFromBlock(stream, expectedRead);
        if (actualRead == -1) {
            throw new IOException("Expected to read " + expectedRead + " but got EOF from blockGroup " + stream.getBlockID() + " index " + this.currentStreamIndex() + 1);
        }
        return actualRead;
    }

    private void checkOpen() throws IOException {
        if (this.closed) {
            throw new IOException(": Stream is closed! Block: " + this.blockInfo.getBlockID());
        }
    }

    @Override
    public synchronized void close() {
        this.closeStreams();
        this.closed = true;
    }

    protected synchronized void closeStreams() {
        for (int i = 0; i < this.blockStreams.length; ++i) {
            if (this.blockStreams[i] == null) continue;
            try {
                this.blockStreams[i].close();
                this.blockStreams[i] = null;
                continue;
            }
            catch (IOException e) {
                LOG.error("Failed to close stream {}", (Object)this.blockStreams[i], (Object)e);
            }
        }
        this.seeked = true;
    }

    public synchronized void unbuffer() {
        for (BlockExtendedInputStream stream : this.blockStreams) {
            if (stream == null) continue;
            stream.unbuffer();
        }
    }

    @Override
    public synchronized void seek(long pos) throws IOException {
        this.checkOpen();
        if (pos < 0L || pos > this.getLength()) {
            if (pos == 0L) {
                return;
            }
            throw new EOFException("EOF encountered at pos: " + pos + " for block: " + this.blockInfo.getBlockID());
        }
        this.position = pos;
        this.seeked = true;
    }

    public synchronized long getPos() {
        return this.position;
    }

    protected synchronized void setPos(long pos) {
        this.position = pos;
    }

    @Override
    public synchronized boolean seekToNewSource(long l) throws IOException {
        return false;
    }
}

