/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.client.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ECBlockInputStreamProxy
extends BlockExtendedInputStream {
    private static final Logger LOG = LoggerFactory.getLogger(ECBlockInputStreamProxy.class);
    private final ECReplicationConfig repConfig;
    private final boolean verifyChecksum;
    private final XceiverClientFactory xceiverClientFactory;
    private final Function<BlockID, Pipeline> refreshFunction;
    private final BlockLocationInfo blockInfo;
    private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
    private BlockExtendedInputStream blockReader;
    private boolean reconstructionReader = false;
    private List<DatanodeDetails> failedLocations = new ArrayList<DatanodeDetails>();
    private boolean closed = false;

    public static int expectedDataLocations(ECReplicationConfig repConfig, long blockLength) {
        return (int)Math.min(Math.ceil((double)blockLength / (double)repConfig.getEcChunkSize()), (double)repConfig.getData());
    }

    public static int availableDataLocations(Pipeline pipeline, int expectedLocs) {
        HashSet<Integer> locations = new HashSet<Integer>();
        for (DatanodeDetails dn : pipeline.getNodes()) {
            int index = pipeline.getReplicaIndex(dn);
            if (index <= 0 || index > expectedLocs) continue;
            locations.add(index);
        }
        return locations.size();
    }

    public ECBlockInputStreamProxy(ECReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverClientFactory, Function<BlockID, Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
        this.repConfig = repConfig;
        this.verifyChecksum = verifyChecksum;
        this.blockInfo = blockInfo;
        this.ecBlockInputStreamFactory = streamFactory;
        this.xceiverClientFactory = xceiverClientFactory;
        this.refreshFunction = refreshFunction;
        this.setReaderType();
        this.createBlockReader();
    }

    private synchronized void setReaderType() {
        int expected = ECBlockInputStreamProxy.expectedDataLocations(this.repConfig, this.getLength());
        int available = ECBlockInputStreamProxy.availableDataLocations(this.blockInfo.getPipeline(), expected);
        this.reconstructionReader = available < expected;
    }

    private void createBlockReader() {
        this.blockReader = this.ecBlockInputStreamFactory.create(this.reconstructionReader, this.failedLocations, (ReplicationConfig)this.repConfig, this.blockInfo, this.verifyChecksum, this.xceiverClientFactory, this.refreshFunction);
    }

    @Override
    public synchronized BlockID getBlockID() {
        return this.blockInfo.getBlockID();
    }

    @Override
    public synchronized long getRemaining() {
        return this.blockReader.getRemaining();
    }

    @Override
    public synchronized long getLength() {
        return this.blockInfo.getLength();
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        return this.read(ByteBuffer.wrap(b, off, len));
    }

    @Override
    public synchronized int read(ByteBuffer buf) throws IOException {
        this.ensureNotClosed();
        if (this.blockReader.getRemaining() == 0L) {
            return -1;
        }
        int totalRead = 0;
        long lastPosition = 0L;
        try {
            while (buf.hasRemaining() && this.getRemaining() > 0L) {
                buf.mark();
                lastPosition = this.blockReader.getPos();
                totalRead += this.blockReader.read(buf);
            }
        }
        catch (IOException e) {
            if (this.reconstructionReader) {
                throw e;
            }
            if (e instanceof BadDataLocationException) {
                String message = "Failing over to reconstruction read due to an error in ECBlockReader.";
                if (LOG.isDebugEnabled()) {
                    LOG.debug(message, (Throwable)e);
                } else {
                    LOG.warn("{} Exception Class: {} , Exception Message: {}", new Object[]{message, e.getClass().getName(), e.getMessage()});
                }
                this.failoverToReconstructionRead(((BadDataLocationException)e).getFailedLocation(), lastPosition);
                buf.reset();
                totalRead += this.read(buf);
            }
            throw e;
        }
        return totalRead;
    }

    private synchronized void failoverToReconstructionRead(DatanodeDetails badLocation, long lastPosition) throws IOException {
        if (badLocation != null) {
            this.failedLocations.add(badLocation);
        }
        this.blockReader.close();
        this.reconstructionReader = true;
        this.createBlockReader();
        if (lastPosition != 0L) {
            this.blockReader.seek(lastPosition);
        }
    }

    @Override
    protected synchronized int readWithStrategy(ByteReaderStrategy strategy) throws IOException {
        throw new IOException("Not Implemented");
    }

    public synchronized void unbuffer() {
        this.blockReader.unbuffer();
    }

    public synchronized long getPos() throws IOException {
        return this.blockReader != null ? this.blockReader.getPos() : 0L;
    }

    @Override
    public synchronized void seek(long pos) throws IOException {
        this.ensureNotClosed();
        try {
            this.blockReader.seek(pos);
        }
        catch (IOException e) {
            if (this.reconstructionReader) {
                throw e;
            }
            this.failoverToReconstructionRead(null, pos);
        }
    }

    @Override
    public void close() throws IOException {
        if (this.blockReader != null) {
            this.blockReader.close();
        }
        this.closed = true;
    }

    private void ensureNotClosed() throws IOException {
        if (this.closed) {
            throw new IOException("The stream is closed");
        }
    }
}

