/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage;

import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.core.ApiFuture;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.core.ApiFutures;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.core.SettableApiFuture;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.grpc.GrpcCallContext;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.grpc.GrpcStatusCode;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.rpc.ApiException;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.rpc.ClientStream;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.api.gax.rpc.StreamController;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.ApiFutureUtils;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.AsyncSessionClosedException;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.Crc32cValue;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.GrpcUtils;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.Hasher;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.IOAutoCloseable;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.MaxRedirectsExceededException;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.ObjectReadSessionState;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.ObjectReadSessionStreamRead;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.ResponseContentLifecycleHandle;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.RetryContext;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.StorageDataClient;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.cloud.storage.StorageException;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.common.base.Preconditions;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.protobuf.ByteString;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.rpc.Status;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.BidiReadObjectError;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.BidiReadObjectRedirectedError;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.BidiReadObjectRequest;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.BidiReadObjectResponse;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.ChecksummedData;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.ObjectRangeData;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.ReadRange;
import com.google.cloud.hadoop.repackaged.ossgcs.com.google.storage.v2.ReadRangeError;
import com.google.cloud.hadoop.repackaged.ossgcs.io.grpc.Status;
import com.google.cloud.hadoop.repackaged.ossgcs.io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;

final class ObjectReadSessionStream
implements ClientStream<BidiReadObjectRequest>,
ApiFuture<Void>,
IOAutoCloseable,
StorageDataClient.Borrowable {
    /** Backs every {@link ApiFuture} method of this class; resolved when a response arrives, failed by {@code failAll}. */
    private final SettableApiFuture<Void> objectReadSessionResolveFuture;
    /** Session state: supplies the open arguments, tracks outstanding reads and stores metadata, read handle and routing token. */
    private final ObjectReadSessionState state;
    /** Runs stream restarts after a redirect and per-read completion/failure callbacks; also handed to {@code state.failAll}. */
    private final ScheduledExecutorService executor;
    /** Opens the bidi stream ({@code splitCall}) and provides the response content lifecycle manager. */
    private final GrpcUtils.ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable;
    /** Receives stream-level errors together with a restart callback and a terminal-failure callback. */
    private final RetryContext streamRetryContext;
    /** Number of redirect errors tolerated (counted since the last response) before the error is propagated. */
    private final int maxRedirectsAllowed;
    /** Lease count: starts at 1, incremented by {@code borrow()}, decremented by {@code closeAsync()}, forced to 0 by {@code failAll}. The stream is open while this is positive. */
    private final AtomicInteger openLeases;
    /** Observer created together with {@link #requestStream}; its {@code closeSignal} is awaited by {@code cleanUp()}. */
    private volatile MonitoringResponseObserver monitoringResponseObserver;
    /** Outermost (state-checking) observer passed to {@code callable.splitCall}. */
    private volatile ResponseObserver<BidiReadObjectResponse> responseObserver;
    /** Current outbound stream; created lazily by {@code getRequestStream} and reset to {@code null} by the observers on errors and by {@code cleanUp()}. */
    private volatile ClientStream<BidiReadObjectRequest> requestStream;
    /** Flow-control handle stored in {@code onStart}; used to request further responses. */
    private volatile StreamController controller;
    /** Redirect errors seen since the last response; reset to 0 whenever a response arrives. */
    private final AtomicInteger redirectCounter;

    /**
     * Creates a stream that starts out open, holding exactly one lease, with no gRPC stream
     * started yet (the request stream is created lazily).
     *
     * @param state session state shared with the reads served by this stream
     * @param executor executor used for restarts and read callbacks
     * @param callable callable used to open the underlying bidi stream
     * @param maxRedirectsAllowed redirect errors tolerated before the error is propagated
     * @param backoff retry context that decides whether a stream error restarts or fails the stream
     */
    private ObjectReadSessionStream(
            ObjectReadSessionState state,
            ScheduledExecutorService executor,
            GrpcUtils.ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
            int maxRedirectsAllowed,
            RetryContext backoff) {
        // Collaborators handed in by the factory method.
        this.state = state;
        this.callable = callable;
        this.executor = executor;
        this.maxRedirectsAllowed = maxRedirectsAllowed;
        this.streamRetryContext = backoff;

        // State owned by this instance.
        this.objectReadSessionResolveFuture = SettableApiFuture.create();
        this.redirectCounter = new AtomicInteger();
        this.openLeases = new AtomicInteger(1);
    }

    /**
     * Returns the current request stream, opening a new one when none exists.
     *
     * <p>Opening a stream builds the observer chain (state-checking wrapper, then redirect
     * handling, then monitoring, then response handling) and starts the call via
     * {@code callable.splitCall}.
     *
     * <p>The volatile {@code requestStream} field is cleared by the response observers without
     * holding this object's monitor, so the field is read exactly once per decision and the value
     * that was checked is the value that is returned. Re-reading the field after the null check
     * could hand {@code null} back to the caller.
     *
     * @param context call context used only when a new stream has to be opened; may be null
     * @return a non-null request stream
     */
    private ClientStream<BidiReadObjectRequest> getRequestStream(@Nullable GrpcCallContext context) {
        ClientStream<BidiReadObjectRequest> current = this.requestStream;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            current = this.requestStream;
            if (current == null) {
                this.monitoringResponseObserver = new MonitoringResponseObserver(new BidiReadObjectResponseObserver());
                this.responseObserver = GrpcUtils.decorateAsStateChecking(new RedirectHandlingResponseObserver(this.monitoringResponseObserver));
                current = this.callable.splitCall(this.responseObserver, context);
                this.requestStream = current;
            }
            return current;
        }
    }

    /**
     * Releases one lease via {@link #closeAsync()} and waits for the returned future using
     * {@code ApiFutureUtils.await}.
     */
    @Override
    public void close() {
        ApiFutureUtils.await(this.closeAsync());
    }

    /**
     * Releases one lease on this stream.
     *
     * <p>If the stream is already closed, or other leases remain after this one is released, an
     * already-completed future is returned. When the last lease is released, every outstanding
     * read is failed with a {@link StorageException} ("Parent stream shutdown") and the stream is
     * cleaned up.
     *
     * @return a future that resolves to {@code null} once the shutdown work has finished
     */
    public ApiFuture<Void> closeAsync() {
        // Short-circuit keeps the original order: the lease is only decremented when still open.
        if (!isOpen() || openLeases.decrementAndGet() != 0) {
            return ApiFutures.immediateFuture(null);
        }
        AsyncSessionClosedException cause = new AsyncSessionClosedException("Session already closed");
        ApiFuture<?> failed = failAll(() -> new StorageException(0, "Parent stream shutdown", cause));
        // Discard whatever value failAll's future carries; callers only need completion.
        return ApiFutures.transformAsync(failed, ignore -> ApiFutures.immediateFuture(null), executor);
    }

    /**
     * Cancels the resolve future and, when a request stream exists, half-closes it, waits for the
     * monitoring observer's close signal and then forgets the stream.
     *
     * <p>{@code requestStream} is volatile and is cleared by the response observers on other
     * threads, so it is read once into a local. Checking the field and then dereferencing it
     * again could throw a {@link NullPointerException}.
     */
    private void cleanUp() {
        this.cancel(true);
        ClientStream<BidiReadObjectRequest> stream = this.requestStream;
        if (stream == null) {
            return;
        }
        stream.closeSend();
        // The monitoring observer is always assigned before requestStream, so it is set here.
        MonitoringResponseObserver observer = this.monitoringResponseObserver;
        ApiFutureUtils.await(observer.closeSignal);
        this.requestStream = null;
    }

    /**
     * Sends a request on the stream, opening the stream first when necessary.
     *
     * <p>When no stream is active, the request is merged into the session's open request (with
     * the open request's read ranges cleared) and sent on a stream opened with the session's call
     * context. Otherwise the request is sent as-is on the active stream.
     *
     * <p>The volatile {@code requestStream} field is read once. Re-resolving the stream after the
     * null check could open a new stream with a null context and without the open arguments if an
     * observer cleared the field in between.
     *
     * @param request the request to send
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void send(BidiReadObjectRequest request) {
        this.checkOpen();
        ClientStream<BidiReadObjectRequest> active = this.requestStream;
        if (active != null) {
            active.send(request);
            return;
        }
        ObjectReadSessionState.OpenArguments openArguments = this.state.getOpenArguments();
        BidiReadObjectRequest merged = openArguments.getReq().toBuilder().clearReadRanges().mergeFrom(request).build();
        this.getRequestStream(openArguments.getCtx()).send(merged);
    }

    /**
     * Closes the sending side of the stream with the given error.
     *
     * <p>If no request stream exists yet, {@code getRequestStream(null)} opens one first.
     *
     * @param t the error to close the sending side with
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void closeSendWithError(Throwable t) {
        checkOpen();
        ClientStream<BidiReadObjectRequest> stream = getRequestStream(null);
        stream.closeSendWithError(t);
    }

    /**
     * Half-closes the sending side of the stream.
     *
     * <p>If no request stream exists yet, {@code getRequestStream(null)} opens one first.
     *
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void closeSend() {
        checkOpen();
        ClientStream<BidiReadObjectRequest> stream = getRequestStream(null);
        stream.closeSend();
    }

    /**
     * Reports whether the underlying request stream is ready to accept another message.
     *
     * <p>If no request stream exists yet, {@code getRequestStream(null)} opens one first.
     *
     * @return the readiness reported by the underlying request stream
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public boolean isSendReady() {
        checkOpen();
        ClientStream<BidiReadObjectRequest> stream = getRequestStream(null);
        return stream.isSendReady();
    }

    /**
     * Registers a listener on the session resolve future.
     *
     * @param listener the callback to run once the future completes
     * @param executor the executor the listener is run on (the parameter, not this stream's field)
     */
    @Override
    public void addListener(Runnable listener, Executor executor) {
        SettableApiFuture<Void> resolveFuture = this.objectReadSessionResolveFuture;
        resolveFuture.addListener(listener, executor);
    }

    /**
     * Cancels the session resolve future.
     *
     * @param mayInterruptIfRunning forwarded unchanged to the resolve future
     * @return the result reported by the resolve future's {@code cancel}
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        SettableApiFuture<Void> resolveFuture = this.objectReadSessionResolveFuture;
        return resolveFuture.cancel(mayInterruptIfRunning);
    }

    /**
     * Waits for the session resolve future to complete.
     *
     * @return always {@code null} on success
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException if the resolve future completed exceptionally
     */
    @Override
    public Void get() throws InterruptedException, ExecutionException {
        // The future is already typed as Void, so no cast is needed.
        return objectReadSessionResolveFuture.get();
    }

    /**
     * Waits up to the given timeout for the session resolve future to complete.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return always {@code null} on success
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException if the resolve future completed exceptionally
     * @throws TimeoutException if the wait timed out
     */
    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // The future is already typed as Void, so no cast is needed.
        return objectReadSessionResolveFuture.get(timeout, unit);
    }

    /**
     * Reports whether the session resolve future was cancelled.
     *
     * @return the resolve future's cancelled flag
     */
    @Override
    public boolean isCancelled() {
        SettableApiFuture<Void> resolveFuture = this.objectReadSessionResolveFuture;
        return resolveFuture.isCancelled();
    }

    /**
     * Reports whether the session resolve future has completed.
     *
     * @return the resolve future's done flag
     */
    @Override
    public boolean isDone() {
        SettableApiFuture<Void> resolveFuture = this.objectReadSessionResolveFuture;
        return resolveFuture.isDone();
    }

    /**
     * Reports whether at least one lease on this stream is still held.
     *
     * @return {@code true} while the lease count is positive
     */
    boolean isOpen() {
        int leases = openLeases.get();
        return leases > 0;
    }

    /**
     * Takes an additional lease on this stream.
     *
     * <p>The open check and the increment happen as a single atomic update. With a separate
     * check followed by an increment, a concurrent close could drop the count to zero between the
     * two steps, and the increment would then make a closed stream look open again.
     *
     * @throws IllegalStateException if the stream is closed
     */
    @Override
    public void borrow() {
        // Only bump the count while it is still positive; a closed stream stays closed.
        int previous = this.openLeases.getAndUpdate(leases -> leases > 0 ? leases + 1 : leases);
        Preconditions.checkState(previous > 0, "Stream closed");
    }

    /**
     * Guards operations that require an open stream.
     *
     * @throws IllegalStateException with message "Stream closed" when no lease is held
     */
    private void checkOpen() {
        boolean open = isOpen();
        Preconditions.checkState(open, "Stream closed");
    }

    /**
     * Re-opens the stream by sending the session's current open request.
     *
     * <p>Nothing is sent when the open request carries no read ranges and the session resolve
     * future is already done.
     *
     * @throws IllegalStateException if a request stream is still active
     */
    @VisibleForTesting
    void restart() {
        Preconditions.checkState(requestStream == null, "attempting to restart stream when stream is already active");
        ObjectReadSessionState.OpenArguments args = state.getOpenArguments();
        BidiReadObjectRequest openRequest = args.getReq();
        boolean noRangesPending = openRequest.getReadRangesList().isEmpty();
        // The future is only consulted when there are no ranges, matching the original short-circuit.
        if (noRangesPending && objectReadSessionResolveFuture.isDone()) {
            return;
        }
        getRequestStream(args.getCtx()).send(openRequest);
    }

    /**
     * Terminally fails the stream with a fixed failure: drops all leases, fails the resolve
     * future, fails every read tracked by the session state and cleans up the stream.
     *
     * @param terminalFailure the failure handed to the resolve future and to every read
     */
    private void failAll(Throwable terminalFailure) {
        // The supplier overload performs the same steps in the same order; its future is not needed here.
        Supplier<Throwable> constantFailure = () -> terminalFailure;
        failAll(constantFailure);
    }

    /**
     * Terminally fails the stream: drops all leases, fails the resolve future with one failure
     * obtained from the supplier, passes the supplier on to the session state to fail its reads,
     * and finally cleans up the stream (even when one of the previous steps throws).
     *
     * @param terminalFailure source of the failure; invoked here once and also passed to the state
     * @return the future returned by {@code state.failAll}
     */
    private ApiFuture<?> failAll(Supplier<Throwable> terminalFailure) {
        openLeases.set(0);
        try {
            Throwable failure = terminalFailure.get();
            objectReadSessionResolveFuture.setException(failure);
            return state.failAll(executor, terminalFailure);
        } finally {
            cleanUp();
        }
    }

    /**
     * Factory for a new stream that tolerates at most three redirects.
     *
     * @param executor executor used for restarts and read callbacks
     * @param callable callable used to open the underlying bidi stream
     * @param state session state shared with the reads served by the stream
     * @param retryContext retry context consulted for stream-level errors
     * @return a new, open stream holding a single lease
     */
    static ObjectReadSessionStream create(
            ScheduledExecutorService executor,
            GrpcUtils.ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
            ObjectReadSessionState state,
            RetryContext retryContext) {
        return new ObjectReadSessionStream(state, executor, callable, /* maxRedirectsAllowed= */ 3, retryContext);
    }

    /**
     * Observer that forwards every callback to a delegate and afterwards records stream lifecycle
     * progress in two futures: {@code openSignal} (a response, error or completion was seen) and
     * {@code closeSignal} (the stream ended with an error or completion). The enclosing class
     * awaits {@code closeSignal} while cleaning up.
     */
    private class MonitoringResponseObserver
            implements ResponseObserver<BidiReadObjectResponse> {
        private final ResponseObserver<BidiReadObjectResponse> delegate;
        private final SettableApiFuture<Void> openSignal = SettableApiFuture.create();
        private final SettableApiFuture<Void> closeSignal = SettableApiFuture.create();

        private MonitoringResponseObserver(ResponseObserver<BidiReadObjectResponse> delegate) {
            this.delegate = delegate;
        }

        /** Pure pass-through to the delegate. */
        @Override
        public void onStart(StreamController controller) {
            delegate.onStart(controller);
        }

        /** Forwards the response, then marks the stream as opened and resolves the session future. */
        @Override
        public void onResponse(BidiReadObjectResponse response) {
            delegate.onResponse(response);
            openSignal.set(null);
            objectReadSessionResolveFuture.set(null);
        }

        /** Forwards the error, then fails both signals with it. */
        @Override
        public void onError(Throwable t) {
            delegate.onError(t);
            openSignal.setException(t);
            closeSignal.setException(t);
        }

        /**
         * Forwards completion; a completion seen while the state holds no metadata is reported to
         * the retry context as an UNAVAILABLE error. Both signals are then completed.
         */
        @Override
        public void onComplete() {
            delegate.onComplete();
            if (state.getMetadata() == null) {
                reportCompletionWithoutMetadata();
            }
            openSignal.set(null);
            closeSignal.set(null);
        }

        /**
         * Builds the UNAVAILABLE error for a stream that completed before any metadata was stored
         * and hands it to the retry context with restart / fail-future callbacks.
         */
        private void reportCompletionWithoutMetadata() {
            StatusRuntimeException cause = Status.Code.UNAVAILABLE.toStatus().withDescription("onComplete without prior onNext").asRuntimeException();
            ApiException apiException = ApiExceptionFactory.createException(cause, GrpcStatusCode.of(Status.Code.UNAVAILABLE), false);
            StorageException storageException = new StorageException(0, cause.getMessage(), apiException);
            // NOTE(review): requestStream is not cleared on this path while restart() asserts it is
            // null -- confirm whether a restart triggered from here can actually succeed.
            streamRetryContext.recordError(storageException, ObjectReadSessionStream.this::restart, objectReadSessionResolveFuture::setException);
        }
    }

    /**
     * Innermost observer of the chain: applies the data of each response to the outstanding reads
     * tracked by the session state and routes stream errors into the stream retry context.
     */
    private final class BidiReadObjectResponseObserver
    implements ResponseObserver<BidiReadObjectResponse> {
        private BidiReadObjectResponseObserver() {
        }

        /**
         * Stores the controller on the enclosing stream, switches to manual inbound flow control
         * and requests the first two responses.
         */
        @Override
        public void onStart(StreamController controller) {
            ObjectReadSessionStream.this.controller = controller;
            controller.disableAutoInboundFlowControl();
            controller.request(2);
        }

        /**
         * Handles one response: records metadata and read handle when present, then delivers each
         * returned range to the outstanding read with the matching read id.
         *
         * <p>Per range: the checksum is validated first; the range offset is then compared with
         * the read's current offset. Matching offsets deliver the content as-is, an earlier offset
         * delivers only the part from the read's current offset onward, and a later offset
         * delivers nothing. Checksum mismatches and offset mismatches (both directions) are
         * recorded on the read together with a callback that re-requests it under a new read id.
         */
        @Override
        public void onResponse(BidiReadObjectResponse response) {
            // Ask for the next response before processing this one.
            ObjectReadSessionStream.this.controller.request(1);
            // The handle is closed when the block exits; delivered content is borrowed from it.
            try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle = ObjectReadSessionStream.this.callable.getResponseContentLifecycleManager().get(response);){
                List<ObjectRangeData> rangeData;
                if (response.hasMetadata()) {
                    ObjectReadSessionStream.this.state.setMetadata(response.getMetadata());
                }
                if (response.hasReadHandle()) {
                    ObjectReadSessionStream.this.state.setBidiReadHandle(response.getReadHandle());
                }
                if ((rangeData = response.getObjectDataRangesList()).isEmpty()) {
                    return;
                }
                for (int i = 0; i < rangeData.size(); ++i) {
                    ObjectRangeData d = rangeData.get(i);
                    ReadRange readRange = d.getReadRange();
                    long id = readRange.getReadId();
                    ObjectReadSessionStreamRead<?> read = ObjectReadSessionStream.this.state.getOutstandingRead(id);
                    // Skip data for reads that are no longer tracked or no longer accept bytes.
                    if (read == null || !read.acceptingBytes()) continue;
                    ChecksummedData checksummedData = d.getChecksummedData();
                    ByteString content = checksummedData.getContent();
                    int crc32C = checksummedData.getCrc32C();
                    try {
                        read.hasher().validateUnchecked(Crc32cValue.of(crc32C), content);
                    }
                    catch (Hasher.UncheckedChecksumMismatchException e) {
                        // Checksum mismatch: nothing is delivered; the error is recorded on the read.
                        read.recordError(e, this.restartReadFromCurrentOffset(id), ObjectReadSessionStream.this.state.removeOutstandingReadOnFailure(id, read::fail));
                        continue;
                    }
                    // Copy of the loop index for capture by the lambdas below.
                    int idx = i;
                    long begin = readRange.getReadOffset();
                    long position = read.readOffset();
                    if (begin != position) {
                        if (begin < position) {
                            // Range starts before the read's offset: deliver only the suffix that
                            // begins at the current offset, then still record an OUT_OF_RANGE error.
                            int skip = Math.toIntExact(position - begin);
                            ResponseContentLifecycleHandle.ChildRef childRef = handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent().substring(skip));
                            read.accept(childRef);
                            ApiException apiException = ApiExceptionFactory.createException(String.format("position = %d, readRange.read_offset = %d", position, begin), null, GrpcStatusCode.of(Status.Code.OUT_OF_RANGE), true);
                            read.recordError(apiException, this.restartReadFromCurrentOffset(id), ObjectReadSessionStream.this.state.removeOutstandingReadOnFailure(id, read::fail));
                            continue;
                        }
                        // Range starts after the read's offset: deliver nothing, record the error.
                        ApiException apiException = ApiExceptionFactory.createException(String.format("position = %d, readRange.read_offset = %d", position, begin), null, GrpcStatusCode.of(Status.Code.OUT_OF_RANGE), true);
                        read.recordError(apiException, this.restartReadFromCurrentOffset(id), ObjectReadSessionStream.this.state.removeOutstandingReadOnFailure(id, read::fail));
                        continue;
                    }
                    // Offsets line up: deliver the whole content of this range.
                    ResponseContentLifecycleHandle.ChildRef childRef = handle.borrow(r -> r.getObjectDataRanges(idx).getChecksummedData().getContent());
                    read.accept(childRef);
                    if (!d.getRangeEnd()) continue;
                    // Last data for this read: signal eof and stop tracking it, on the executor
                    // rather than on the calling thread.
                    ObjectReadSessionStream.this.executor.execute(StorageException.liftToRunnable(() -> {
                        read.eof();
                        ObjectReadSessionStream.this.state.removeOutstandingRead(id);
                    }));
                }
            }
            catch (IOException e) {
                // NOTE(review): presumably raised when the lifecycle handle is closed -- confirm.
                // The stream reference is dropped so that a restart can open a new one.
                ObjectReadSessionStream.this.requestStream = null;
                ObjectReadSessionStream.this.streamRetryContext.recordError(e, ObjectReadSessionStream.this::restart, x$0 -> ObjectReadSessionStream.this.failAll(x$0));
            }
        }

        /**
         * Handles a stream error. The stream reference is dropped first. Errors without
         * {@link BidiReadObjectError} details, or with no per-range errors, go to the retry
         * context unchanged. Otherwise every outstanding read named in the details is failed with
         * its own status, and the stream error is reported to the retry context reclassified as
         * ABORTED.
         */
        @Override
        public void onError(Throwable t) {
            ObjectReadSessionStream.this.requestStream = null;
            BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
            if (error == null) {
                ObjectReadSessionStream.this.streamRetryContext.recordError(t, ObjectReadSessionStream.this::restart, x$0 -> ObjectReadSessionStream.this.failAll(x$0));
                return;
            }
            List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
            if (rangeErrors.isEmpty()) {
                ObjectReadSessionStream.this.streamRetryContext.recordError(t, ObjectReadSessionStream.this::restart, x$0 -> ObjectReadSessionStream.this.failAll(x$0));
                return;
            }
            for (ReadRangeError rangeError : rangeErrors) {
                Status status = rangeError.getStatus();
                long id = rangeError.getReadId();
                ObjectReadSessionStreamRead<?> read = ObjectReadSessionStream.this.state.getOutstandingRead(id);
                if (read == null) continue;
                // preFail runs on this thread; the actual failure is delivered on the executor.
                read.preFail();
                ObjectReadSessionStream.this.executor.execute(StorageException.liftToRunnable(() -> ObjectReadSessionStream.this.state.removeOutstandingReadOnFailure(id, read::fail).onFailure(GrpcUtils.statusToApiException(status))));
            }
            ApiException apiException = ApiExceptionFactory.createException("Stream error, reclassifying as ABORTED for reads not specified in BidiReadObjectError", t, GrpcStatusCode.of(Status.Code.ABORTED), true);
            ObjectReadSessionStream.this.streamRetryContext.recordError(apiException, ObjectReadSessionStream.this::restart, x$0 -> ObjectReadSessionStream.this.failAll(x$0));
        }

        /**
         * Builds the callback used when a read's error is recorded: it assigns the read a new
         * read id, builds a request containing the range produced by the read's
         * {@code makeReadRange()}, and sends it on the enclosing stream.
         *
         * @param id the read id currently assigned to the read
         */
        private RetryContext.OnSuccess restartReadFromCurrentOffset(long id) {
            return () -> {
                ObjectReadSessionStreamRead<?> readWithNewId = ObjectReadSessionStream.this.state.assignNewReadId(id);
                BidiReadObjectRequest requestWithNewReadId = BidiReadObjectRequest.newBuilder().addReadRanges(readWithNewId.makeReadRange()).build();
                ObjectReadSessionStream.this.send(requestWithNewReadId);
            };
        }

        /** Takes no action on stream completion. */
        @Override
        public void onComplete() {
        }
    }

    /**
     * Observer that intercepts redirect errors. A redirect within the allowed budget updates the
     * session state with the read handle and routing token from the error and schedules a restart
     * of the stream; everything else is forwarded to the delegate.
     */
    private final class RedirectHandlingResponseObserver
            implements ResponseObserver<BidiReadObjectResponse> {
        private final ResponseObserver<BidiReadObjectResponse> delegate;

        private RedirectHandlingResponseObserver(ResponseObserver<BidiReadObjectResponse> delegate) {
            this.delegate = delegate;
        }

        /** Pure pass-through to the delegate. */
        @Override
        public void onStart(StreamController controller) {
            delegate.onStart(controller);
        }

        /** Resets the redirect counter, then forwards the response. */
        @Override
        public void onResponse(BidiReadObjectResponse response) {
            redirectCounter.set(0);
            delegate.onResponse(response);
        }

        /**
         * Forwards non-redirect errors unchanged. For a redirect error the stream reference is
         * dropped and the redirect is counted. Once the count exceeds the allowed maximum, the
         * error (with a {@link MaxRedirectsExceededException} attached as suppressed) is forwarded
         * and also set on the session resolve future. Otherwise the state is updated from the
         * error and a restart is scheduled on the executor.
         */
        @Override
        public void onError(Throwable t) {
            BidiReadObjectRedirectedError redirect = GrpcUtils.getBidiReadObjectRedirectedError(t);
            if (redirect == null) {
                delegate.onError(t);
                return;
            }

            final ObjectReadSessionStream stream = ObjectReadSessionStream.this;
            stream.requestStream = null;
            int attempt = stream.redirectCounter.incrementAndGet();
            int limit = stream.maxRedirectsAllowed;
            if (attempt > limit) {
                t.addSuppressed(new MaxRedirectsExceededException(limit, attempt));
                delegate.onError(t);
                stream.objectReadSessionResolveFuture.setException(t);
                return;
            }

            // Carry the redirect details over to the next attempt.
            if (redirect.hasReadHandle()) {
                stream.state.setBidiReadHandle(redirect.getReadHandle());
            }
            if (redirect.hasRoutingToken()) {
                stream.state.setRoutingToken(redirect.getRoutingToken());
            }
            stream.executor.execute(stream::restart);
        }

        /** Pure pass-through to the delegate. */
        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }
}

