/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.obs;

import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.GetObjectRequest;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.obs.OBSCommonUtils;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.obs.OBSIOException;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
class OBSInputStream
extends FSInputStream
implements CanSetReadahead,
ByteBufferReadable {
    /** Logger used for all diagnostics emitted by this stream. */
    public static final Logger LOG = LoggerFactory.getLogger(OBSInputStream.class);
    // Matches the literal 3 used by the read retry loops in this class
    // (presumably the compiler inlined this constant - TODO confirm against the original source).
    private static final int READ_RETRY_TIME = 3;
    // Matches the literal 9 used by the lazySeek() retry loop (same caveat as above).
    private static final int SEEK_RETRY_TIME = 9;
    // Matches the 10 ms pause used between retries (same caveat as above).
    private static final long DELAY_TIME = 10L;
    // Byte counters updated via incrementBytesRead(); may be null, in which case nothing is counted.
    private final FileSystem.Statistics statistics;
    // Client used to issue the ranged GetObjectRequest calls.
    private final ObsClient client;
    // Bucket name and object key of the object being read.
    private final String bucket;
    private final String key;
    // Object length supplied at construction; reads at or beyond this offset return -1.
    private final long contentLength;
    // "obs://<bucket>/<key>", used only in log and exception messages.
    private final String uri;
    // Owning file system; consulted for SSE-C settings and the read-transform switch.
    private OBSFileSystem fs;
    // Offset within the object that wrappedStream is currently positioned at.
    private long streamCurrentPos;
    // Set by close(); checked by checkNotClosed() before every operation.
    private volatile boolean closed;
    // Currently open object content stream, or null when no request is open.
    private InputStream wrappedStream = null;
    // Read-ahead window in bytes; defaults to 0x100000 (1 MiB) until setReadahead() changes it.
    private long readAheadRange = 0x100000L;
    // Offset the next read should start from; seek() only updates this value (the real seek is lazy).
    private long nextReadPos;
    // Range end computed by calculateRequestLimit() for the currently open request.
    private long contentRangeFinish;
    // Range start of the currently open request.
    private long contentRangeStart;

    OBSInputStream(String bucketName, String bucketKey, long fileStatusLength, ObsClient obsClient, FileSystem.Statistics stats, long readaheadRange, OBSFileSystem obsFileSystem) {
        Preconditions.checkArgument((boolean)StringUtils.isNotEmpty((CharSequence)bucketName), (Object)"No Bucket");
        Preconditions.checkArgument((boolean)StringUtils.isNotEmpty((CharSequence)bucketKey), (Object)"No Key");
        Preconditions.checkArgument((fileStatusLength >= 0L ? 1 : 0) != 0, (Object)"Negative content length");
        this.bucket = bucketName;
        this.key = bucketKey;
        this.contentLength = fileStatusLength;
        this.client = obsClient;
        this.statistics = stats;
        this.uri = "obs://" + this.bucket + "/" + this.key;
        this.fs = obsFileSystem;
        this.setReadahead(readaheadRange);
    }

    /**
     * Computes the end offset of the range to request when (re)opening the object.
     *
     * @param targetPos     offset the request starts at
     * @param length        number of bytes the caller wants; a negative value means
     *                      "up to the end of the object"
     * @param contentLength total length of the object
     * @param readahead     read-ahead window in bytes
     * @return {@code contentLength} when {@code length} is negative, otherwise
     *         {@code targetPos + max(readahead, length)} capped at {@code contentLength}
     */
    static long calculateRequestLimit(long targetPos, long length, long contentLength, long readahead) {
        if (length < 0L) {
            // Open-ended request: the limit is simply the end of the object.
            return contentLength;
        }
        final long window = Math.max(readahead, length);
        final long wantedEnd = targetPos + window;
        return Math.min(contentLength, wantedEnd);
    }

    /**
     * Closes any open object stream and opens a new ranged request starting at
     * {@code targetPos}.
     *
     * @param reason    short description used in log and error messages
     * @param targetPos offset the new request starts at
     * @param length    number of bytes the caller intends to read; passed to
     *                  {@link #calculateRequestLimit(long, long, long, long)}
     * @throws IOException if the old stream cannot be closed, the request fails
     *                     (the ObsException is translated), or the service
     *                     returns no content stream
     */
    private synchronized void reopen(String reason, long targetPos, long length) throws IOException {
        final long begin = System.currentTimeMillis();
        final long tid = Thread.currentThread().getId();

        if (this.wrappedStream != null) {
            this.closeStream("reopen(" + reason + ")", this.contentRangeFinish);
        }

        this.contentRangeFinish = OBSInputStream.calculateRequestLimit(targetPos, length, this.contentLength, this.readAheadRange);
        try {
            GetObjectRequest rangedGet = new GetObjectRequest(this.bucket, this.key);
            rangedGet.setRangeStart(Long.valueOf(targetPos));
            rangedGet.setRangeEnd(Long.valueOf(this.contentRangeFinish));
            if (this.fs.getSse().isSseCEnable()) {
                rangedGet.setSseCHeader(this.fs.getSse().getSseCHeader());
            }
            InputStream content = this.client.getObject(rangedGet).getObjectContent();
            // Fields are updated before the null check, exactly as callers have always observed.
            this.wrappedStream = content;
            this.contentRangeStart = targetPos;
            if (content == null) {
                throw new IOException("Null IO stream from reopen of (" + reason + ") " + this.uri);
            }
        }
        catch (ObsException e) {
            throw OBSCommonUtils.translateException("Reopen at position " + targetPos, this.uri, e);
        }

        this.streamCurrentPos = targetPos;
        final long elapsed = System.currentTimeMillis() - begin;
        LOG.debug("reopen({}) for {} range[{}-{}], length={}, streamPosition={}, nextReadPosition={}, thread={}, timeUsedInMilliSec={}", this.uri, reason, targetPos, this.contentRangeFinish, length, this.streamCurrentPos, this.nextReadPos, tid, elapsed);
    }

    /**
     * Returns the offset the next read will start from.
     *
     * @return the pending read position, never less than zero
     */
    public synchronized long getPos() {
        return Math.max(0L, this.nextReadPos);
    }

    /**
     * Records the position for the next read. The seek is lazy: no remote
     * request is made here.
     *
     * @param targetPos offset to read from next
     * @throws EOFException if {@code targetPos} is negative
     * @throws IOException  if the stream is closed
     */
    public synchronized void seek(long targetPos) throws IOException {
        this.checkNotClosed();
        if (targetPos < 0L) {
            throw new EOFException("Cannot seek to a negative offset " + targetPos);
        }
        // For an empty object there is nothing to seek within; the position is left alone.
        if (this.contentLength > 0L) {
            this.nextReadPos = targetPos;
        }
    }

    /**
     * Seeks without propagating failures; an IOException from {@link #seek(long)}
     * is logged at debug level and otherwise ignored.
     *
     * @param positiveTargetPos offset to seek to
     */
    private void seekQuietly(long positiveTargetPos) {
        try {
            this.seek(positiveTargetPos);
        }
        catch (IOException ignored) {
            // Best effort only: used to restore a position after a positioned read.
            LOG.debug("Ignoring IOE on seek of {} to {}", this.uri, positiveTargetPos, ignored);
        }
    }

    /**
     * Moves the open object stream to {@code targetPos} if that can be done by
     * skipping forward inside the current request; otherwise closes the stream
     * so the caller reopens it at the new offset.
     *
     * @param targetPos offset the stream should be positioned at
     * @throws IOException if the stream is closed, or skipping/closing fails
     */
    private void seekInStream(long targetPos) throws IOException {
        this.checkNotClosed();
        if (this.wrappedStream == null) {
            return;
        }

        final long distance = targetPos - this.streamCurrentPos;
        if (distance == 0L && this.remainingInCurrentRequest() > 0L) {
            // Already in place and the current request still has data.
            return;
        }

        if (distance > 0L) {
            final long reach = Math.max(this.readAheadRange, (long) this.wrappedStream.available());
            final long leftInRequest = this.remainingInCurrentRequest();
            final long skipLimit = Math.min(leftInRequest, reach);
            if (leftInRequest > 0L && distance <= skipLimit) {
                LOG.debug("Forward seek on {}, of {} bytes", this.uri, distance);
                long pending = distance;
                // skip() may move fewer bytes than asked, so keep going until done or no progress.
                for (long step = this.wrappedStream.skip(pending); pending > 0L && step > 0L; step = this.wrappedStream.skip(pending)) {
                    this.streamCurrentPos += step;
                    this.incrementBytesRead(step);
                    pending -= step;
                }
                if (this.streamCurrentPos == targetPos) {
                    return;
                }
                LOG.info("Failed to seek on {} to {}. Current position {}", this.uri, targetPos, this.streamCurrentPos);
            }
        }

        // Backward seek, too-long forward seek, exhausted request, or failed skip.
        this.closeStream("seekInStream()", this.contentRangeFinish);
        this.streamCurrentPos = targetPos;
    }

    /**
     * Alternate data sources are not supported by this stream.
     *
     * @param targetPos ignored
     * @return always {@code false}
     */
    public boolean seekToNewSource(long targetPos) {
        final boolean switchedSource = false;
        return switchedSource;
    }

    /**
     * Performs the deferred seek: positions the open stream at {@code targetPos},
     * or (re)opens the object there when no stream is open afterwards.
     *
     * <p>An IOException is retried up to {@code SEEK_RETRY_TIME} attempts with a
     * {@code DELAY_TIME} millisecond pause in between. Failures caused by an
     * ObsException with response code 401, 403, 404, 410 or 416 are rethrown
     * immediately.
     *
     * @param targetPos offset to position the stream at
     * @param len       number of bytes the caller is about to read
     * @throws IOException the last failure once retries are exhausted, a
     *                     non-retryable failure, or the pending failure if the
     *                     thread is interrupted while waiting (the interrupt
     *                     flag is restored in that case)
     */
    private void lazySeek(long targetPos, long len) throws IOException {
        for (int attempt = 0; attempt < SEEK_RETRY_TIME; ++attempt) {
            try {
                this.seekInStream(targetPos);
                if (this.wrappedStream == null) {
                    this.reopen("read from new offset", targetPos, len);
                }
                return;
            }
            catch (IOException e) {
                // Never reuse a stream that just failed.
                if (this.wrappedStream != null) {
                    this.closeStream("lazySeek() seekInStream has exception ", this.contentRangeFinish);
                }
                Throwable cause = e.getCause();
                if (cause instanceof ObsException) {
                    int status = ((ObsException) cause).getResponseCode();
                    switch (status) {
                        case 401:
                        case 403:
                        case 404:
                        case 410:
                        case 416: {
                            // These response codes are treated as permanent; retrying is skipped.
                            throw e;
                        }
                        default: {
                            break;
                        }
                    }
                }
                LOG.warn("IOException occurred in lazySeek, retry: {}", (Object) attempt, (Object) e);
                if (attempt == SEEK_RETRY_TIME - 1) {
                    throw e;
                }
                try {
                    Thread.sleep(DELAY_TIME);
                }
                catch (InterruptedException ie) {
                    // Restore the flag so callers further up can see the interruption.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    /**
     * Adds to the bytes-read statistic when statistics are attached.
     *
     * @param bytesRead number of bytes consumed; values of zero or less are ignored
     */
    private void incrementBytesRead(long bytesRead) {
        if (this.statistics == null) {
            return;
        }
        if (bytesRead <= 0L) {
            return;
        }
        this.statistics.incrementBytesRead(bytesRead);
    }

    /**
     * Pauses for {@code DELAY_TIME} milliseconds by waiting on this object's
     * monitor, looping until the full delay has elapsed. The caller must hold
     * the monitor of this stream, as {@code wait} requires.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void sleepInLock() throws InterruptedException {
        final long begin = System.currentTimeMillis();
        final long deadline = begin + DELAY_TIME;
        // wait() may return early, so re-check the clock each time.
        for (long now = begin; now - begin < DELAY_TIME; now = System.currentTimeMillis()) {
            this.wait(deadline - now);
        }
    }

    /**
     * Reads a single byte from the current position.
     *
     * <p>A failing read is followed by a reopen of the object (see
     * {@code onReadFailure}) and retried up to {@code READ_RETRY_TIME} attempts.
     *
     * @return the byte read (0-255), or -1 at end of object or on an EOFException
     * @throws IOException if the stream is closed, every attempt fails, or the
     *                     thread is interrupted between attempts (the interrupt
     *                     flag is restored before throwing)
     */
    public synchronized int read() throws IOException {
        long startTime = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        this.checkNotClosed();
        if (this.contentLength == 0L || this.nextReadPos >= this.contentLength) {
            return -1;
        }
        try {
            this.lazySeek(this.nextReadPos, 1L);
        }
        catch (EOFException e) {
            this.onReadFailure(e, 1);
            return -1;
        }
        int byteRead = -1;
        IOException exception = null;
        for (int retryTime = 1; retryTime <= READ_RETRY_TIME; ++retryTime) {
            try {
                byteRead = this.wrappedStream.read();
                exception = null;
                break;
            }
            catch (EOFException e) {
                this.onReadFailure(e, 1);
                return -1;
            }
            catch (IOException e) {
                exception = e;
                // Reopen at the current stream position before the next attempt.
                this.onReadFailure(e, 1);
                LOG.warn("read of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{this.uri, retryTime, exception});
                if (retryTime < READ_RETRY_TIME) {
                    try {
                        this.sleepInLock();
                    }
                    catch (InterruptedException ie) {
                        // Keep the interruption visible to the caller.
                        Thread.currentThread().interrupt();
                        LOG.error("read of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{this.uri, retryTime, exception});
                        throw exception;
                    }
                }
            }
        }
        if (exception != null) {
            LOG.error("read of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{this.uri, READ_RETRY_TIME, exception});
            throw exception;
        }
        if (byteRead >= 0) {
            ++this.streamCurrentPos;
            ++this.nextReadPos;
            this.incrementBytesRead(1L);
        }
        long endTime = System.currentTimeMillis();
        LOG.debug("read-0arg uri:{}, contentLength:{}, position:{}, readValue:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, byteRead >= 0 ? this.nextReadPos - 1L : this.nextReadPos, byteRead, threadId, endTime - startTime});
        return byteRead;
    }

    /**
     * Recovers from a failed read by reopening the object at the current stream
     * position. A reopen that fails with an OBSIOException is retried up to
     * {@code READ_RETRY_TIME} attempts with a {@code DELAY_TIME} millisecond
     * pause in between; any other IOException propagates immediately.
     *
     * @param ioe    the failure that triggered the recovery (only logged)
     * @param length number of bytes the failed read wanted
     * @throws IOException if the reopen keeps failing, or the thread is
     *                     interrupted while pausing (the interrupt flag is
     *                     restored before throwing)
     */
    private void onReadFailure(IOException ioe, int length) throws IOException {
        // Pass the exception text as an argument so braces inside it cannot disturb the format string.
        LOG.debug("Got exception while trying to read from stream {} trying to recover: {}", (Object) this.uri, (Object) String.valueOf(ioe));
        for (int attempt = 1; ; ++attempt) {
            try {
                this.reopen("failure recovery", this.streamCurrentPos, length);
                return;
            }
            catch (OBSIOException e) {
                LOG.warn("OBSIOException occurred in reopen for failure recovery, the {} retry time", (Object) attempt, (Object) e);
                if (attempt == READ_RETRY_TIME) {
                    throw e;
                }
                try {
                    Thread.sleep(DELAY_TIME);
                }
                catch (InterruptedException ie) {
                    // Restore the flag rather than silently dropping the interruption.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    /**
     * Reads up to {@code byteBuffer.remaining()} bytes from the current position
     * into the buffer, going through a temporary byte array.
     *
     * <p>A failing read is followed by a reopen of the object and retried up to
     * {@code READ_RETRY_TIME} attempts.
     *
     * @param byteBuffer destination buffer
     * @return number of bytes transferred, 0 if the buffer has no room, or -1 at
     *         end of object or on an EOFException
     * @throws IOException if the stream is closed, every attempt fails, or the
     *                     thread is interrupted between attempts (the interrupt
     *                     flag is restored before throwing)
     */
    public synchronized int read(ByteBuffer byteBuffer) throws IOException {
        long startTime = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // Let the logger render the buffer only when debug logging is enabled.
        LOG.debug("read byteBuffer: {}", (Object) byteBuffer);
        this.checkNotClosed();
        int len = byteBuffer.remaining();
        if (len == 0) {
            return 0;
        }
        if (this.contentLength == 0L || this.nextReadPos >= this.contentLength) {
            return -1;
        }
        try {
            this.lazySeek(this.nextReadPos, len);
        }
        catch (EOFException e) {
            this.onReadFailure(e, len);
            return -1;
        }
        // Allocate the scratch array only once we know a read will be attempted.
        byte[] buf = new byte[len];
        int bytesRead = 0;
        IOException exception = null;
        for (int retryTime = 1; retryTime <= READ_RETRY_TIME; ++retryTime) {
            try {
                bytesRead = this.tryToReadFromInputStream(this.wrappedStream, buf, 0, len);
                if (bytesRead == -1) {
                    return -1;
                }
                exception = null;
                break;
            }
            catch (EOFException e) {
                this.onReadFailure(e, len);
                return -1;
            }
            catch (IOException e) {
                exception = e;
                // Reopen at the current stream position before the next attempt.
                this.onReadFailure(e, len);
                LOG.warn("read len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{len, this.uri, retryTime, exception});
                if (retryTime < READ_RETRY_TIME) {
                    try {
                        this.sleepInLock();
                    }
                    catch (InterruptedException ie) {
                        // Keep the interruption visible to the caller.
                        Thread.currentThread().interrupt();
                        LOG.error("read len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{len, this.uri, retryTime, exception});
                        throw exception;
                    }
                }
            }
        }
        if (exception != null) {
            LOG.error("read len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{len, this.uri, READ_RETRY_TIME, exception});
            throw exception;
        }
        if (bytesRead > 0) {
            this.streamCurrentPos += (long)bytesRead;
            this.nextReadPos += (long)bytesRead;
            byteBuffer.put(buf, 0, bytesRead);
        }
        this.incrementBytesRead(bytesRead);
        long endTime = System.currentTimeMillis();
        // bytesRead is never negative here (-1 returned above), so the start position is simply next - read.
        LOG.debug("Read-ByteBuffer uri:{}, contentLength:{}, destLen:{}, readLen:{}, position:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, len, bytesRead, this.nextReadPos - (long)bytesRead, threadId, endTime - startTime});
        return bytesRead;
    }

    /**
     * Reads from {@code in} until {@code len} bytes have been stored or the
     * stream ends.
     *
     * @param in  stream to read from
     * @param buf destination array
     * @param off index in {@code buf} of the first byte to store
     * @param len maximum number of bytes to read
     * @return number of bytes stored, or -1 if the stream ended before any byte
     *         was read
     * @throws IOException if the underlying read fails
     */
    private int tryToReadFromInputStream(InputStream in, byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            final int got = in.read(buf, off + total, len - total);
            if (got == -1) {
                // End of stream: report what was gathered, or -1 when nothing was.
                return total == 0 ? -1 : total;
            }
            total += got;
        }
        return total;
    }

    /**
     * Reads up to {@code len} bytes from the current position into {@code buf}.
     *
     * <p>A failing read is followed by a reopen of the object and retried up to
     * {@code READ_RETRY_TIME} attempts.
     *
     * @param buf destination array
     * @param off index in {@code buf} of the first byte to store
     * @param len maximum number of bytes to read
     * @return number of bytes read, 0 if {@code len} is 0, or -1 at end of object
     *         or on an EOFException
     * @throws IOException if the stream is closed, the arguments are rejected by
     *                     {@code validatePositionedReadArgs}, every attempt
     *                     fails, or the thread is interrupted between attempts
     *                     (the interrupt flag is restored before throwing)
     */
    public synchronized int read(@NotNull byte[] buf, int off, int len) throws IOException {
        long startTime = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        this.checkNotClosed();
        this.validatePositionedReadArgs(this.nextReadPos, buf, off, len);
        if (len == 0) {
            return 0;
        }
        if (this.contentLength == 0L || this.nextReadPos >= this.contentLength) {
            return -1;
        }
        try {
            this.lazySeek(this.nextReadPos, len);
        }
        catch (EOFException e) {
            this.onReadFailure(e, len);
            return -1;
        }
        int bytesRead = 0;
        IOException exception = null;
        for (int retryTime = 1; retryTime <= READ_RETRY_TIME; ++retryTime) {
            try {
                bytesRead = this.tryToReadFromInputStream(this.wrappedStream, buf, off, len);
                if (bytesRead == -1) {
                    return -1;
                }
                exception = null;
                break;
            }
            catch (EOFException e) {
                this.onReadFailure(e, len);
                return -1;
            }
            catch (IOException e) {
                exception = e;
                // Reopen at the current stream position before the next attempt.
                this.onReadFailure(e, len);
                LOG.warn("read offset[{}] len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{off, len, this.uri, retryTime, exception});
                if (retryTime < READ_RETRY_TIME) {
                    try {
                        this.sleepInLock();
                    }
                    catch (InterruptedException ie) {
                        // Keep the interruption visible to the caller.
                        Thread.currentThread().interrupt();
                        LOG.error("read offset[{}] len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{off, len, this.uri, retryTime, exception});
                        throw exception;
                    }
                }
            }
        }
        if (exception != null) {
            LOG.error("read offset[{}] len[{}] of [{}] failed, retry time[{}], due to exception[{}]", new Object[]{off, len, this.uri, READ_RETRY_TIME, exception});
            throw exception;
        }
        if (bytesRead > 0) {
            this.streamCurrentPos += (long)bytesRead;
            this.nextReadPos += (long)bytesRead;
        }
        this.incrementBytesRead(bytesRead);
        long endTime = System.currentTimeMillis();
        // bytesRead is never negative here (-1 returned above), so the start position is simply next - read.
        LOG.debug("Read-3args uri:{}, contentLength:{}, destLen:{}, readLen:{}, position:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, len, bytesRead, this.nextReadPos - (long)bytesRead, threadId, endTime - startTime});
        return bytesRead;
    }

    /**
     * Guards operations that require an open stream.
     *
     * @throws IOException if {@link #close()} has already been called
     */
    private void checkNotClosed() throws IOException {
        if (!this.closed) {
            return;
        }
        throw new IOException(this.uri + ": Stream is closed!");
    }

    /**
     * Closes the stream and the open object request, if any. Calling it again
     * after the first call has no effect.
     *
     * @throws IOException if closing the underlying object stream fails
     */
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        // Mark closed first so later operations are rejected even if the close below fails.
        this.closed = true;
        this.closeStream("close() operation", this.contentRangeFinish);
        super.close();
    }

    /**
     * Closes the currently open object stream, if any, and forgets it.
     *
     * <p>The reference is dropped even when {@code close()} throws, so a stream
     * that failed to close is never reused and later retries open a fresh
     * request instead of failing on the same stream again.
     *
     * @param reason short description used in log messages
     * @param length value reported as {@code length} in the debug log
     * @throws IOException if closing the underlying stream fails
     */
    private synchronized void closeStream(String reason, long length) throws IOException {
        if (this.wrappedStream == null) {
            return;
        }
        InputStream closing = this.wrappedStream;
        // Detach before closing: whatever happens below, this stream must not be used again.
        this.wrappedStream = null;
        try {
            closing.close();
        }
        catch (IOException e) {
            LOG.debug("When closing {} stream for {}", new Object[]{this.uri, reason, e});
            throw e;
        }
        LOG.debug("Stream {} : {}; streamPos={}, nextReadPos={}, request range {}-{} length={}", new Object[]{this.uri, reason, this.streamCurrentPos, this.nextReadPos, this.contentRangeStart, this.contentRangeFinish, length});
    }

    /**
     * Reports the value of {@link #remainingInFile()}, capped at
     * {@code Integer.MAX_VALUE}.
     *
     * @return remaining byte count as an int
     * @throws IOException if the stream is closed
     */
    public synchronized int available() throws IOException {
        this.checkNotClosed();
        final long left = this.remainingInFile();
        return (int) Math.min(left, (long) Integer.MAX_VALUE);
    }

    /**
     * Bytes between the current stream position and the end of the object.
     *
     * @return {@code contentLength - streamCurrentPos}
     */
    @InterfaceAudience.Private
    @InterfaceStability.Unstable
    public synchronized long remainingInFile() {
        final long consumed = this.streamCurrentPos;
        return this.contentLength - consumed;
    }

    /**
     * Bytes between the current stream position and the end of the range of the
     * current request.
     *
     * @return {@code contentRangeFinish - streamCurrentPos}
     */
    @InterfaceAudience.Private
    @InterfaceStability.Unstable
    public synchronized long remainingInCurrentRequest() {
        final long consumed = this.streamCurrentPos;
        return this.contentRangeFinish - consumed;
    }

    /**
     * Mark/reset is not supported by this stream.
     *
     * @return always {@code false}
     */
    public boolean markSupported() {
        final boolean supportsMark = false;
        return supportsMark;
    }

    /**
     * Describes the stream state for diagnostics. The snapshot is taken while
     * holding this stream's monitor so the reported fields are consistent.
     *
     * @return a single-line description of URI, positions and request range
     */
    @InterfaceStability.Unstable
    public String toString() {
        synchronized (this) {
            final String streamState = this.wrappedStream != null ? "open" : "closed";
            StringBuilder text = new StringBuilder("OBSInputStream{");
            text.append(this.uri);
            text.append(" wrappedStream=").append(streamState);
            text.append(" streamCurrentPos=").append(this.streamCurrentPos);
            text.append(" nextReadPos=").append(this.nextReadPos);
            text.append(" contentLength=").append(this.contentLength);
            text.append(" contentRangeStart=").append(this.contentRangeStart);
            text.append(" contentRangeFinish=").append(this.contentRangeFinish);
            text.append(" remainingInCurrentRequest=").append(this.remainingInCurrentRequest());
            text.append("}");
            return text.toString();
        }
    }

    /**
     * Reads exactly {@code length} bytes starting at {@code position}, then
     * restores the previous read position (best effort).
     *
     * @param position offset in the object to start reading at
     * @param buffer   destination array
     * @param offset   index in {@code buffer} of the first byte to store
     * @param length   number of bytes to read
     * @throws EOFException if the object ends before {@code length} bytes are read
     * @throws IOException  if the stream is closed, the arguments are rejected,
     *                      or a read fails
     */
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
        final long begin = System.currentTimeMillis();
        final long tid = Thread.currentThread().getId();
        this.checkNotClosed();
        this.validatePositionedReadArgs(position, buffer, offset, length);
        if (length == 0) {
            return;
        }

        int total = 0;
        synchronized (this) {
            // Remember where sequential reads were so the position can be put back afterwards.
            final long savedPos = this.getPos();
            try {
                this.seek(position);
                while (total < length) {
                    final int got = this.read(buffer, offset + total, length - total);
                    if (got < 0) {
                        throw new EOFException("End of file reached before reading fully.");
                    }
                    total += got;
                }
            }
            finally {
                this.seekQuietly(savedPos);
            }
        }

        final long elapsed = System.currentTimeMillis() - begin;
        LOG.debug("ReadFully uri:{}, contentLength:{}, destLen:{}, readLen:{}, position:{}, thread:{}, timeUsedMilliSec:{}", this.uri, this.contentLength, length, total, position, tid, elapsed);
    }

    /**
     * Positioned read: reads up to {@code length} bytes starting at
     * {@code position} without going through the sequential stream, unless the
     * file system has read-transform enabled, in which case the superclass
     * implementation is used.
     *
     * @param position offset in the object to start reading at
     * @param buffer   destination array
     * @param offset   index in {@code buffer} of the first byte to store
     * @param length   maximum number of bytes to read; clamped to the bytes
     *                 remaining after {@code position}
     * @return number of bytes read, 0 for a zero-length request inside the
     *         object, or -1 if {@code position} is outside the object
     * @throws IOException if the stream is closed, the arguments are rejected,
     *                     or the read fails
     */
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
        this.checkNotClosed();
        this.validatePositionedReadArgs(position, buffer, offset, length);
        if (position < 0L || position >= this.contentLength) {
            return -1;
        }
        int len = length;
        if (position + (long)len > this.contentLength) {
            len = (int)(this.contentLength - position);
        }
        if (len == 0) {
            // Nothing to transfer: answer locally instead of issuing a remote request.
            return 0;
        }
        if (this.fs.isReadTransformEnabled()) {
            return super.read(position, buffer, offset, len);
        }
        return this.randomReadWithNewInputStream(position, buffer, offset, len);
    }

    /**
     * Serves a positioned read with its own ranged request, leaving the
     * sequential stream and its positions untouched.
     *
     * <p>The request and read are attempted up to {@code READ_RETRY_TIME} times
     * with a {@code DELAY_TIME} millisecond pause in between. The temporary
     * stream is always closed; a failure while closing it is only logged so it
     * cannot hide a successful read or the original failure.
     *
     * @param position offset in the object to start reading at
     * @param buffer   destination array
     * @param offset   index in {@code buffer} of the first byte to store
     * @param length   number of bytes to read
     * @return number of bytes read, or -1 if the returned stream was already at
     *         its end
     * @throws IOException if the service returns no content stream, every
     *                     attempt fails, or the thread is interrupted between
     *                     attempts (the interrupt flag is restored before throwing)
     */
    private int randomReadWithNewInputStream(long position, byte[] buffer, int offset, int length) throws IOException {
        long startTime = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        int bytesRead = 0;
        InputStream inputStream = null;
        IOException exception = null;
        GetObjectRequest request = new GetObjectRequest(this.bucket, this.key);
        request.setRangeStart(Long.valueOf(position));
        // NOTE(review): if the SDK treats the range end as inclusive this asks for one byte more
        // than needed; only `length` bytes are consumed below - confirm against the SDK docs.
        request.setRangeEnd(Long.valueOf(position + (long)length));
        if (this.fs.getSse().isSseCEnable()) {
            request.setSseCHeader(this.fs.getSse().getSseCHeader());
        }
        for (int retryTime = 1; retryTime <= READ_RETRY_TIME; ++retryTime) {
            try {
                inputStream = this.client.getObject(request).getObjectContent();
                if (inputStream == null) {
                    break;
                }
                bytesRead = this.tryToReadFromInputStream(inputStream, buffer, offset, length);
                if (bytesRead == -1) {
                    return -1;
                }
                exception = null;
                break;
            }
            catch (ObsException | IOException e) {
                exception = e instanceof ObsException ? OBSCommonUtils.translateException("Read at position " + position, this.uri, (ObsException)e) : (IOException)e;
                LOG.warn("read position[{}] destLen[{}] offset[{}] readLen[{}] of [{}] failed, retry time[{}], due to exception[{}] e[{}]", new Object[]{position, length, offset, bytesRead, this.uri, retryTime, exception, e});
                if (retryTime < READ_RETRY_TIME) {
                    try {
                        Thread.sleep(DELAY_TIME);
                    }
                    catch (InterruptedException ie) {
                        // Keep the interruption visible to the caller.
                        Thread.currentThread().interrupt();
                        LOG.error("read position[{}] destLen[{}] offset[{}] readLen[{}] of [{}] failed, retry time[{}], due to exception[{}] e[{}]", new Object[]{position, length, offset, bytesRead, this.uri, retryTime, exception, e});
                        throw exception;
                    }
                }
            }
            finally {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    }
                    catch (IOException closeFailure) {
                        // A close failure must not replace the read result or the real error.
                        LOG.debug("Ignoring failure while closing positioned-read stream of {}", (Object) this.uri, (Object) closeFailure);
                    }
                }
            }
        }
        if (inputStream == null || exception != null) {
            LOG.error("read position[{}] destLen[{}] offset[{}] len[{}] failed, retry time[{}], due to exception[{}]", new Object[]{position, length, offset, bytesRead, READ_RETRY_TIME, exception});
            throw new IOException("read failed of " + this.uri + ", inputStream is " + (inputStream == null ? "null" : "not null"), exception);
        }
        long endTime = System.currentTimeMillis();
        LOG.debug("Read-4args uri:{}, contentLength:{}, destLen:{}, readLen:{}, position:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, length, bytesRead, position, threadId, endTime - startTime});
        return bytesRead;
    }

    /**
     * Sets the read-ahead window.
     *
     * @param newReadaheadRange new window in bytes, or {@code null} to restore
     *                          the default of 0x100000 bytes
     * @throws IllegalArgumentException if the value is negative
     */
    public synchronized void setReadahead(Long newReadaheadRange) {
        if (newReadaheadRange != null) {
            final boolean nonNegative = newReadaheadRange >= 0L;
            Preconditions.checkArgument(nonNegative, (Object) "Negative readahead value");
            this.readAheadRange = newReadaheadRange;
            return;
        }
        this.readAheadRange = 0x100000L;
    }
}

