/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable.grpc.async;

import com.google.api.client.util.BackOff;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.MutateRowsRequest;
import com.google.bigtable.v1.MutateRowsResponse;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable.config.Logger;
import com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable.grpc.async.AsyncExecutor;
import com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.FutureCallback;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Status;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class BulkMutation {
    // Shared exception set on futures whose entry got no status back once retries are exhausted (see Batch.handleExtraFutures).
    private static final StatusRuntimeException MISSING_ENTRY_EXCEPTION = io.grpc.Status.UNKNOWN.withDescription("Mutation does not have a status").asRuntimeException();
    protected static final Logger LOG = new Logger(BulkMutation.class);
    // Batch currently accepting mutations; null when nothing is pending (add() creates it lazily, flush()/add() clear it after running it).
    @VisibleForTesting
    Batch currentBatch = null;
    // String form of the table name, handed to every Batch that is created.
    private final String tableName;
    // Collaborators passed through unchanged to every Batch.
    private final AsyncExecutor asyncExecutor;
    private final RetryOptions retryOptions;
    private final ScheduledExecutorService retryExecutorService;
    // Batch limits: a batch reports itself full once its entry count or its approximate byte size reaches these values.
    private final int maxRowKeyCount;
    private final long maxRequestSize;

    /**
     * Converts the status reported for a single mutation entry into a gRPC exception.
     *
     * <p>The numeric code and message are copied over, and the string form of every detail
     * attached to the status is appended to the description.
     *
     * @param status the per-entry status returned in a bulk mutation response
     * @return a {@link StatusRuntimeException} carrying the equivalent gRPC status
     */
    private static StatusRuntimeException toException(com.google.rpc.Status status) {
        io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode()).withDescription(status.getMessage());
        for (Any detail : status.getDetailsList()) {
            // io.grpc.Status is immutable: augmentDescription returns a derived instance,
            // so the result has to be kept or the detail is silently dropped.
            grpcStatus = grpcStatus.augmentDescription(detail.toString());
        }
        return grpcStatus.asRuntimeException();
    }

    /**
     * Builds the bulk-request entry that corresponds to a single-row mutation request.
     *
     * @param request the single-row request whose row key and mutations are copied
     * @return an entry with the same row key and the same list of mutations
     */
    @VisibleForTesting
    static MutateRowsRequest.Entry convert(MutateRowRequest request) {
        MutateRowsRequest.Entry.Builder entry = MutateRowsRequest.Entry.newBuilder();
        entry.setRowKey(request.getRowKey());
        entry.addAllMutations(request.getMutationsList());
        return entry.build();
    }

    /**
     * Creates a bulk mutation helper for one table.
     *
     * @param tableName the target table; only its string form is retained
     * @param asyncExecutor executor used to send the bulk requests
     * @param retryOptions retry configuration (backoff creation and retryable codes)
     * @param retryExecutorService scheduler on which retries are re-run after a backoff
     * @param maxRowKeyCount entry count at which a batch is considered full
     * @param maxRequestSize approximate byte size at which a batch is considered full
     */
    public BulkMutation(
            BigtableTableName tableName,
            AsyncExecutor asyncExecutor,
            RetryOptions retryOptions,
            ScheduledExecutorService retryExecutorService,
            int maxRowKeyCount,
            long maxRequestSize) {
        // Batch limits.
        this.maxRequestSize = maxRequestSize;
        this.maxRowKeyCount = maxRowKeyCount;
        // Collaborators shared by every batch.
        this.retryExecutorService = retryExecutorService;
        this.retryOptions = retryOptions;
        this.asyncExecutor = asyncExecutor;
        this.tableName = tableName.toString();
    }

    /**
     * Queues a single-row mutation in the current batch, creating the batch if needed,
     * and sends the batch immediately once it reports itself full.
     *
     * @param request the mutation to queue
     * @return a future that is completed when the outcome for this mutation is known
     */
    public synchronized ListenableFuture<Empty> add(MutateRowRequest request) {
        Batch batch = this.currentBatch;
        if (batch == null) {
            batch = new Batch(this.tableName, this.asyncExecutor, this.retryOptions, this.retryExecutorService, this.maxRowKeyCount, this.maxRequestSize);
            this.currentBatch = batch;
        }
        ListenableFuture<Empty> result = batch.add(request);
        if (!batch.isFull()) {
            return result;
        }
        // The batch hit one of its limits: send it and start fresh on the next add.
        batch.run();
        this.currentBatch = null;
        return result;
    }

    /**
     * Sends whatever is queued in the current batch, if there is one, and clears it.
     */
    public synchronized void flush() {
        if (this.currentBatch == null) {
            return;
        }
        this.currentBatch.run();
        this.currentBatch = null;
    }

    /**
     * Reports whether there is no batch currently accepting mutations.
     *
     * @return {@code true} when {@code currentBatch} is {@code null}
     */
    public synchronized boolean isFlushed() {
        boolean nothingPending = null == this.currentBatch;
        return nothingPending;
    }

    @VisibleForTesting
    static class Batch
    implements Runnable {
        // Sends the bulk request and exposes the RPC throttler used for retry bookkeeping.
        private final AsyncExecutor asyncExecutor;
        // Source of the backoff policy and of the "is this code retryable" decision.
        private final RetryOptions retryOptions;
        // Scheduler on which this batch re-runs itself after a backoff delay.
        private final ScheduledExecutorService retryExecutorService;
        // Limits checked by isFull().
        private final int maxRowKeyCount;
        private final long maxRequestSize;
        // Entries and futures still awaiting an outcome; swapped for a retry manager or set to null by completeOrRetry().
        private RequestManager currentRequestManager;
        // Value returned by the throttler's registerRetry(); assigned on the first run() and passed back on completion.
        private Long retryId;
        // Created lazily from retryOptions the first time a backoff delay is needed.
        private BackOff currentBackoff;
        // Incremented every time a retry is logged and scheduled.
        private int failedCount;

        /**
         * Creates an empty batch for the given table.
         *
         * @param tableName table name placed on the underlying bulk request
         * @param asyncExecutor executor used to send the request
         * @param retryOptions retry configuration
         * @param retryExecutorService scheduler used to re-run this batch on retry
         * @param maxRowKeyCount entry count at which the batch is full
         * @param maxRequestSize approximate byte size at which the batch is full
         */
        Batch(
                String tableName,
                AsyncExecutor asyncExecutor,
                RetryOptions retryOptions,
                ScheduledExecutorService retryExecutorService,
                int maxRowKeyCount,
                long maxRequestSize) {
            this.currentRequestManager = new RequestManager(tableName);
            // Limits.
            this.maxRequestSize = maxRequestSize;
            this.maxRowKeyCount = maxRowKeyCount;
            // Collaborators.
            this.retryExecutorService = retryExecutorService;
            this.retryOptions = retryOptions;
            this.asyncExecutor = asyncExecutor;
        }

        /**
         * Adds one mutation to this batch.
         *
         * @param request the single-row mutation to convert and queue
         * @return the future registered for this entry
         */
        ListenableFuture<Empty> add(MutateRowRequest request) {
            SettableFuture<Empty> pending = SettableFuture.create();
            MutateRowsRequest.Entry entry = BulkMutation.convert(request);
            this.currentRequestManager.add(pending, entry);
            return pending;
        }

        /**
         * Tells whether this batch has reached its entry-count limit or its approximate size limit.
         *
         * @return {@code true} when either limit has been reached
         */
        boolean isFull() {
            if (this.getRequestCount() >= this.maxRowKeyCount) {
                return true;
            }
            return this.currentRequestManager.approximateByteSize >= this.maxRequestSize;
        }

        /**
         * Registers the completion handlers for an in-flight bulk request.
         *
         * <p>A successful response is handed to {@code handleResult}; a failed call triggers a
         * retry of the whole current request via {@code perfromFullRetry}.
         *
         * @param bulkFuture the future returned for the bulk request
         */
        void addCallback(ListenableFuture<MutateRowsResponse> bulkFuture) {
            Futures.addCallback(bulkFuture, new FutureCallback<MutateRowsResponse>(){

                @Override
                public void onSuccess(MutateRowsResponse result) {
                    Batch.this.handleResult(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    // A fresh, empty holder makes perfromFullRetry compute the next backoff delay.
                    // Typed explicitly rather than as a raw AtomicReference.
                    Batch.this.perfromFullRetry(new AtomicReference<Long>(), t);
                }
            });
        }

        /**
         * Processes the response of a bulk request: completes the futures of the entries it covers
         * and arranges a retry for the ones that need it.
         *
         * <p>Statuses and futures are walked in parallel, in order. Futures left without a status
         * and statuses left without a future are handled separately afterwards. A {@code null} or
         * status-less response, or any {@link RuntimeException} raised while processing, results in
         * a retry of the current request.
         *
         * @param result the response received for the current request
         */
        synchronized void handleResult(MutateRowsResponse result) {
            // Holder shared with the helpers below; getCurrentBackoff only fills it while it is still empty,
            // so the next backoff delay is requested at most once while handling this response.
            AtomicReference<Long> backoffTime = new AtomicReference<Long>();
            if (this.currentRequestManager == null) {
                // completeOrRetry() already cleared the manager, so this response has nothing left to complete.
                LOG.warn("Got duplicate responses for bulk mutation.", new Object[0]);
                return;
            }
            if (result == null || result.getStatusesList() == null || result.getStatusesList().isEmpty()) {
                this.perfromFullRetry(backoffTime, new IllegalStateException("empty result or statuses"));
                return;
            }
            try {
                // Both iterators are consumed progressively by the helper calls below, so the call order matters.
                Iterator<Status> statuses = result.getStatusesList().iterator();
                Iterator<SettableFuture<Empty>> futures = this.currentRequestManager.futures.iterator();
                String tableName = this.currentRequestManager.request.getTableName();
                // Collects the entries (and their futures) that have to be sent again.
                RequestManager retryRequestManager = new RequestManager(tableName);
                int processedCount = this.handleResponses(backoffTime, statuses, futures, retryRequestManager);
                this.handleExtraFutures(backoffTime, futures, processedCount, retryRequestManager);
                this.handleExtraStatuses(statuses);
                this.completeOrRetry(backoffTime, retryRequestManager);
            }
            catch (RuntimeException e) {
                LOG.error("Unexpected Exception occurred. Treating this issue as a temporary issue and retrying.", e, new Object[0]);
                this.perfromFullRetry(backoffTime, e);
            }
        }

        /**
         * Retries the whole current request after a failure, or fails every pending future when
         * the backoff policy reports that retries are exhausted (a delay of {@code -1}).
         *
         * @param backOffTime holder for the backoff delay; filled in here if still empty
         * @param t the failure that triggered the retry
         */
        private void perfromFullRetry(AtomicReference<Long> backOffTime, Throwable t) {
            this.getCurrentBackoff(backOffTime);
            boolean exhausted = backOffTime.get() == -1L;
            if (!exhausted) {
                LOG.info("Retrying failed call. Failure #%d, got: %s", t, this.failedCount++, io.grpc.Status.fromThrowable((Throwable)t));
                long delayMillis = backOffTime.get();
                this.retryExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                return;
            }
            this.setFailure(new BigtableRetriesExhaustedException("Exhausted retries.", t));
        }

        /**
         * Returns the backoff delay stored in the holder, computing and storing it first if the
         * holder is still empty.
         *
         * <p>The backoff policy is created from {@code retryOptions} on first use. If asking it
         * for the next delay throws an {@link IOException}, {@code -1} is stored instead.
         *
         * @param backOffTime holder for the delay in milliseconds
         * @return the value held by {@code backOffTime} after this call
         */
        private Long getCurrentBackoff(AtomicReference<Long> backOffTime) {
            if (backOffTime.get() != null) {
                return backOffTime.get();
            }
            try {
                if (this.currentBackoff == null) {
                    this.currentBackoff = this.retryOptions.createBackoff();
                }
                long nextDelayMillis = this.currentBackoff.nextBackOffMillis();
                backOffTime.set(nextDelayMillis);
            }
            catch (IOException e) {
                LOG.warn("Could not get the next backoff.", e, new Object[0]);
                backOffTime.set(-1L);
            }
            return backOffTime.get();
        }

        /**
         * Pairs statuses with futures, in order, and settles each pair.
         *
         * <p>An OK status completes the future. A status that is not retryable, or any failure
         * once the backoff is exhausted ({@code -1}), fails the future. Everything else is
         * queued on {@code retryRequestManager} together with its original entry.
         *
         * <p>Both status types are spelled out in full because this method needs
         * {@code com.google.rpc.Status} (the response element) and {@code io.grpc.Status.Code}
         * (the code enum) at the same time; the simple name {@code Status} cannot mean both.
         *
         * @param backoffTime holder for the backoff delay, filled in lazily
         * @param statuses per-entry statuses from the response
         * @param futures futures of the current request, in entry order
         * @param retryRequestManager collector for entries that must be retried
         * @return the number of status/future pairs that were processed
         */
        private int handleResponses(AtomicReference<Long> backoffTime, Iterator<com.google.rpc.Status> statuses, Iterator<SettableFuture<Empty>> futures, RequestManager retryRequestManager) {
            int count = 0;
            while (futures.hasNext() && statuses.hasNext()) {
                SettableFuture<Empty> future = futures.next();
                com.google.rpc.Status status = statuses.next();
                if (status.getCode() == io.grpc.Status.Code.OK.value()) {
                    future.set(Empty.getDefaultInstance());
                } else if (!this.isRetryable(status) || this.getCurrentBackoff(backoffTime) == -1L) {
                    future.setException(BulkMutation.toException(status));
                } else {
                    // count doubles as the index of this entry in the request that was sent.
                    retryRequestManager.add(future, this.currentRequestManager.request.getEntries(count));
                }
                ++count;
            }
            return count;
        }

        /**
         * Deals with futures for which the response carried no status.
         *
         * <p>When the backoff is exhausted ({@code -1}) each such future is failed with the shared
         * missing-entry exception; otherwise the future and its original entry are queued for retry.
         * The number of affected futures is logged.
         *
         * @param backoffTime holder for the backoff delay, filled in here if still empty
         * @param futures iterator positioned after the futures that already had a status
         * @param processedCount index in the sent request of the first entry without a status
         * @param retryRequestManager collector for entries that must be retried
         */
        private void handleExtraFutures(AtomicReference<Long> backoffTime, Iterator<SettableFuture<Empty>> futures, int processedCount, RequestManager retryRequestManager) {
            if (futures.hasNext()) {
                this.getCurrentBackoff(backoffTime);
                final boolean retriesExhausted = backoffTime.get() == -1L;
                long missing = 0L;
                for (int entryIndex = processedCount; futures.hasNext(); ++entryIndex, ++missing) {
                    SettableFuture<Empty> orphan = futures.next();
                    if (retriesExhausted) {
                        orphan.setException(MISSING_ENTRY_EXCEPTION);
                    } else {
                        retryRequestManager.add(orphan, this.currentRequestManager.request.getEntries(entryIndex));
                    }
                }
                String handling;
                if (retriesExhausted) {
                    handling = "Setting exceptions on the futures";
                } else {
                    handling = "Retrying";
                }
                LOG.error("Missing %d responses for bulkWrite. %s.", missing, handling);
            }
        }

        /**
         * Logs every status that is left over after all futures have been matched, followed by
         * the total number of such statuses. Does nothing when there are none.
         *
         * @param statuses iterator positioned after the statuses that were matched to a future
         */
        private void handleExtraStatuses(Iterator<Status> statuses) {
            if (!statuses.hasNext()) {
                return;
            }
            int extraStatusCount = 0;
            while (statuses.hasNext()) {
                ++extraStatusCount;
                // The message used to read "Got %extra status: %s". Under java.util.Formatter rules
                // (which the %d/%s placeholders used throughout this class suggest the Logger follows)
                // "%e" is a floating-point conversion and fails for a status argument.
                LOG.error("Got extra status: %s", statuses.next());
            }
            LOG.error("Got %d extra statuses", extraStatusCount);
        }

        /**
         * Finishes the batch when nothing is left to retry, otherwise makes the retry manager
         * current and schedules this batch to run again after the backoff delay.
         *
         * @param backoffTime holder for the backoff delay, filled in lazily
         * @param retryRequestManager entries collected for retry; may be {@code null} or empty
         */
        private void completeOrRetry(AtomicReference<Long> backoffTime, RequestManager retryRequestManager) {
            boolean nothingToRetry = retryRequestManager == null || retryRequestManager.futures.isEmpty();
            if (nothingToRetry) {
                this.currentRequestManager = null;
                this.setRetryComplete();
                return;
            }
            this.currentRequestManager = retryRequestManager;
            LOG.info("Retrying failed call. Failure #%d, got #%d failures", this.failedCount++, this.currentRequestManager.futures.size());
            long delayMillis = this.getCurrentBackoff(backoffTime);
            this.retryExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Tells whether the code of a per-entry status is one the retry options consider retryable.
         *
         * <p>The two status types are fully qualified because the parameter is a
         * {@code com.google.rpc.Status} while the code enum belongs to {@code io.grpc.Status};
         * the simple name {@code Status} cannot refer to both.
         *
         * @param status the per-entry status from the response
         * @return the answer of {@code retryOptions.isRetryable} for the matching gRPC code
         */
        private boolean isRetryable(com.google.rpc.Status status) {
            io.grpc.Status.Code code = io.grpc.Status.fromCodeValue(status.getCode()).getCode();
            return this.retryOptions.isRetryable(code);
        }

        /**
         * Sends the current request and registers the completion callbacks.
         *
         * <p>On the first call a retry id is obtained from the RPC throttler. If the thread is
         * interrupted while sending, the interrupt flag is restored and the interruption is
         * delivered to the callbacks as a failed future, which takes the normal retry path.
         */
        @Override
        public synchronized void run() {
            ListenableFuture<MutateRowsResponse> future = null;
            try {
                if (this.retryId == null) {
                    this.retryId = this.asyncExecutor.getRpcThrottler().registerRetry();
                }
                future = this.asyncExecutor.mutateRowsAsync(this.currentRequestManager.build());
            }
            catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; restore it so callers can still observe it.
                Thread.currentThread().interrupt();
                future = Futures.immediateFailedFuture(e);
            }
            finally {
                // future is still null when an unchecked exception is propagating; skipping the
                // callback then keeps a NullPointerException from replacing the real failure.
                if (future != null) {
                    this.addCallback(future);
                }
            }
        }

        /**
         * Fails every future of the current request with {@code t} and then reports the retry
         * as complete, even if failing the futures throws.
         *
         * <p>NOTE: this method comes from decompiler output, which warned here:
         * "Removed try catching itself - possible behaviour change."
         *
         * @param t the exception to set on each pending future
         */
        private void setFailure(Throwable t) {
            try {
                Iterator<SettableFuture<Empty>> pending = this.currentRequestManager.futures.iterator();
                while (pending.hasNext()) {
                    pending.next().setException(t);
                }
            }
            finally {
                this.setRetryComplete();
            }
        }

        /**
         * Notifies the RPC throttler that the retry registered under {@code retryId} has finished.
         */
        private void setRetryComplete() {
            Long finishedRetryId = this.retryId;
            this.asyncExecutor.getRpcThrottler().onRetryCompletion(finishedRetryId);
        }

        /**
         * Returns how many entries are still pending in this batch.
         *
         * @return the number of futures held by the current request manager, or {@code 0} when there is none
         */
        @VisibleForTesting
        int getRequestCount() {
            if (this.currentRequestManager == null) {
                return 0;
            }
            return this.currentRequestManager.futures.size();
        }
    }

    /**
     * Accumulates the entries of one bulk request together with the future of each entry,
     * and keeps a running estimate of the request's size.
     */
    @VisibleForTesting
    static class RequestManager {
        // One future per entry, in the order the entries were added.
        private final List<SettableFuture<Empty>> futures;
        private final MutateRowsRequest.Builder builder;
        // The most recently built request; unset until build() is called.
        private MutateRowsRequest request;
        // Running size estimate: table name length plus 2, plus the serialized size of every entry.
        private long approximateByteSize;

        /**
         * Starts an empty request for the given table.
         *
         * @param tableName table name placed on the request
         */
        RequestManager(String tableName) {
            this.futures = new ArrayList<SettableFuture<Empty>>();
            this.builder = MutateRowsRequest.newBuilder().setTableName(tableName);
            this.approximateByteSize = tableName.length() + 2;
        }

        /**
         * Appends an entry and the future that reports its outcome.
         *
         * @param future future to settle once the entry's outcome is known
         * @param entry the entry to add to the request
         */
        void add(SettableFuture<Empty> future, MutateRowsRequest.Entry entry) {
            this.futures.add(future);
            this.builder.addEntries(entry);
            long entryBytes = entry.getSerializedSize();
            this.approximateByteSize = this.approximateByteSize + entryBytes;
        }

        /**
         * Builds the request from the entries added so far and remembers it.
         *
         * @return the newly built request
         */
        MutateRowsRequest build() {
            MutateRowsRequest built = this.builder.build();
            this.request = built;
            return built;
        }
    }
}

