/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.grpc.async;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Clock;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.config.Logger;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
import com.google.cloud.bigtable.grpc.async.OperationAccountant;
import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class BulkMutation {
    private static final StatusRuntimeException MISSING_ENTRY_EXCEPTION = Status.UNKNOWN.withDescription("Mutation does not have a status").asRuntimeException();
    protected static final Logger LOG = new Logger(BulkMutation.class);
    public static final long MAX_RPC_WAIT_TIME = TimeUnit.MINUTES.toMillis(5L);
    @VisibleForTesting
    static Clock clock = Clock.SYSTEM;
    @VisibleForTesting
    Batch currentBatch = null;
    private ScheduledFuture<?> scheduledFlush = null;
    private final String tableName;
    private final AsyncExecutor asyncExecutor;
    private final RetryOptions retryOptions;
    private final ScheduledExecutorService retryExecutorService;
    private final int maxRowKeyCount;
    private final long maxRequestSize;
    private final long autoflushMs;
    private final Meter batchMeter = BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "bulk-mutator.batch.meter");

    private static StatusRuntimeException toException(com.google.rpc.Status status) {
        Status grpcStatus = Status.fromCodeValue((int)status.getCode()).withDescription(status.getMessage());
        for (Any detail : status.getDetailsList()) {
            grpcStatus = grpcStatus.augmentDescription(detail.toString());
        }
        return grpcStatus.asRuntimeException();
    }

    @VisibleForTesting
    static MutateRowsRequest.Entry convert(MutateRowRequest request) {
        if (request == null) {
            return null;
        }
        return MutateRowsRequest.Entry.newBuilder().setRowKey(request.getRowKey()).addAllMutations((Iterable)request.getMutationsList()).build();
    }

    public BulkMutation(BigtableTableName tableName, AsyncExecutor asyncExecutor, RetryOptions retryOptions, ScheduledExecutorService retryExecutorService, int maxRowKeyCount, long maxRequestSize) {
        this(tableName, asyncExecutor, retryOptions, retryExecutorService, maxRowKeyCount, maxRequestSize, 0L);
    }

    public BulkMutation(BigtableTableName tableName, AsyncExecutor asyncExecutor, RetryOptions retryOptions, ScheduledExecutorService retryExecutorService, int maxRowKeyCount, long maxRequestSize, long autoflushMs) {
        this.tableName = tableName.toString();
        this.asyncExecutor = asyncExecutor;
        this.retryOptions = retryOptions;
        this.retryExecutorService = retryExecutorService;
        this.maxRowKeyCount = maxRowKeyCount;
        this.maxRequestSize = maxRequestSize;
        this.autoflushMs = autoflushMs;
    }

    public ListenableFuture<MutateRowResponse> add(MutateRowRequest request) {
        return this.add(BulkMutation.convert(request));
    }

    public synchronized ListenableFuture<MutateRowResponse> add(MutateRowsRequest.Entry entry) {
        Preconditions.checkNotNull((Object)entry, (Object)"Request null");
        Preconditions.checkArgument((!entry.getRowKey().isEmpty() ? 1 : 0) != 0, (Object)"Request has an empty rowkey");
        if (this.currentBatch == null) {
            this.batchMeter.mark();
            this.currentBatch = new Batch();
        }
        ListenableFuture future = this.currentBatch.add(entry);
        if (this.currentBatch.isFull()) {
            this.flush();
        }
        if (this.autoflushMs > 0L && this.currentBatch != null && this.scheduledFlush == null) {
            this.scheduledFlush = this.retryExecutorService.schedule(new Runnable(){

                @Override
                public void run() {
                    BulkMutation.this.scheduledFlush = null;
                    BulkMutation.this.flush();
                }
            }, this.autoflushMs, TimeUnit.MILLISECONDS);
        }
        return future;
    }

    public synchronized void flush() {
        if (this.currentBatch != null) {
            this.currentBatch.run();
            this.currentBatch = null;
        }
    }

    public boolean isFlushed() {
        return this.currentBatch == null;
    }

    @VisibleForTesting
    class Batch
    implements Runnable {
        private final Meter mutationMeter = BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "bulk-mutator.mutations.added");
        private final Meter mutationRetryMeter = BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "bulk-mutator.mutations.retried");
        @VisibleForTesting
        Long retryId;
        private RequestManager currentRequestManager;
        private BackOff currentBackoff;
        private int failedCount;
        private ListenableFuture<List<MutateRowsResponse>> mutateRowsFuture;

        private Batch() {
            this.currentRequestManager = new RequestManager(BulkMutation.this.tableName, this.mutationMeter);
        }

        private ListenableFuture<MutateRowResponse> add(MutateRowsRequest.Entry entry) {
            Preconditions.checkNotNull((Object)entry);
            SettableFuture future = SettableFuture.create();
            this.currentRequestManager.add((SettableFuture<MutateRowResponse>)future, entry);
            return future;
        }

        private boolean isFull() {
            Preconditions.checkNotNull((Object)this.currentRequestManager);
            return this.getRequestCount() >= BulkMutation.this.maxRowKeyCount || this.currentRequestManager.approximateByteSize >= BulkMutation.this.maxRequestSize;
        }

        @VisibleForTesting
        void addCallback(ListenableFuture<List<MutateRowsResponse>> bulkFuture) {
            Futures.addCallback(bulkFuture, (FutureCallback)new FutureCallback<List<MutateRowsResponse>>(){

                public void onSuccess(List<MutateRowsResponse> result) {
                    Batch.this.mutateRowsFuture = null;
                    Batch.this.handleResult(result);
                }

                public void onFailure(Throwable t) {
                    Batch.this.mutateRowsFuture = null;
                    Batch.this.performFullRetry(new AtomicReference(), t);
                }
            });
        }

        private synchronized void handleResult(List<MutateRowsResponse> results) {
            AtomicReference<Long> backoffTime = new AtomicReference<Long>();
            try {
                if (this.operationsAreComplete()) {
                    LOG.warn("Got duplicate responses for bulk mutation.", new Object[0]);
                    this.setRetryComplete();
                    return;
                }
                if (results == null || results.isEmpty()) {
                    this.performFullRetry(backoffTime, new IllegalStateException("No MutateRowResponses were found."));
                    return;
                }
                ArrayList<MutateRowsResponse.Entry> entries = new ArrayList<MutateRowsResponse.Entry>();
                for (MutateRowsResponse response : results) {
                    entries.addAll(response.getEntriesList());
                }
                if (entries.isEmpty()) {
                    this.performFullRetry(backoffTime, new IllegalStateException("No MutateRowsResponses entries were found."));
                    return;
                }
                String tableName = this.currentRequestManager.request.getTableName();
                RequestManager retryRequestManager = new RequestManager(tableName, this.mutationRetryMeter);
                this.handleResponses(backoffTime, entries, retryRequestManager);
                this.handleExtraFutures(backoffTime, retryRequestManager, entries);
                this.completeOrRetry(backoffTime, retryRequestManager);
            }
            catch (Throwable e) {
                LOG.error("Unexpected Exception occurred. Treating this issue as a temporary issue and retrying.", e, new Object[0]);
                this.performFullRetry(backoffTime, e);
            }
        }

        private void performFullRetry(AtomicReference<Long> backoff, Throwable t) {
            if (this.currentRequestManager == null) {
                this.setRetryComplete();
                return;
            }
            Long backoffMs = this.getCurrentBackoff(backoff);
            ++this.failedCount;
            if (backoffMs == -1L) {
                this.setFailure(new BigtableRetriesExhaustedException("Batch #" + this.retryId + " Exhausted retries.", t));
            } else {
                LOG.info("Retrying failed call for batch #%d. Failure #%d, got: %s", t, this.retryId, this.failedCount, Status.fromThrowable((Throwable)t));
                this.mutationRetryMeter.mark(this.getRequestCount());
                BulkMutation.this.retryExecutorService.schedule(this, (long)backoffMs, TimeUnit.MILLISECONDS);
            }
        }

        private Long getCurrentBackoff(AtomicReference<Long> backOffTime) {
            if (backOffTime.get() == null) {
                try {
                    if (this.currentBackoff == null) {
                        this.currentBackoff = BulkMutation.this.retryOptions.createBackoff();
                    }
                    backOffTime.set(this.currentBackoff.nextBackOffMillis());
                }
                catch (IOException e) {
                    LOG.warn("Could not get the next backoff.", e, new Object[0]);
                    backOffTime.set(-1L);
                }
            }
            return backOffTime.get();
        }

        private void handleResponses(AtomicReference<Long> backoffTime, Iterable<MutateRowsResponse.Entry> entries, RequestManager retryRequestManager) {
            for (MutateRowsResponse.Entry entry : entries) {
                int index = (int)entry.getIndex();
                if (index >= this.getRequestCount()) {
                    LOG.error("Got extra status: %s", entry);
                    break;
                }
                SettableFuture future = (SettableFuture)this.currentRequestManager.futures.get(index);
                if (future == null) {
                    LOG.warn("Could not find a future for index %d.", index);
                    return;
                }
                com.google.rpc.Status status = entry.getStatus();
                int statusCode = status.getCode();
                if (statusCode == Status.Code.OK.value()) {
                    future.set((Object)MutateRowResponse.getDefaultInstance());
                    continue;
                }
                if (!this.isRetryable(statusCode) || this.getCurrentBackoff(backoffTime) == -1L) {
                    future.setException((Throwable)BulkMutation.toException(status));
                    continue;
                }
                retryRequestManager.add((SettableFuture<MutateRowResponse>)future, this.currentRequestManager.request.getEntries(index));
            }
        }

        private void handleExtraFutures(AtomicReference<Long> backoffTime, RequestManager retryRequestManager, List<MutateRowsResponse.Entry> entries) {
            Set<Integer> indexes = this.getIndexes(entries);
            long missingEntriesCount = 0L;
            this.getCurrentBackoff(backoffTime);
            for (int i = 0; i < this.getRequestCount(); ++i) {
                if (indexes.remove(i)) continue;
                ++missingEntriesCount;
                if (backoffTime.get() == -1L) {
                    ((SettableFuture)this.currentRequestManager.futures.get(i)).setException((Throwable)MISSING_ENTRY_EXCEPTION);
                    continue;
                }
                retryRequestManager.add((SettableFuture<MutateRowResponse>)((SettableFuture)this.currentRequestManager.futures.get(i)), this.currentRequestManager.request.getEntries(i));
            }
            if (missingEntriesCount > 0L) {
                String handling = backoffTime.get() == -1L ? "Setting exceptions on the futures" : "Retrying";
                LOG.error("Missing %d responses for bulkWrite. %s.", missingEntriesCount, handling);
            }
        }

        private Set<Integer> getIndexes(List<MutateRowsResponse.Entry> entries) {
            HashSet<Integer> indexes = new HashSet<Integer>(entries.size());
            for (MutateRowsResponse.Entry entry : entries) {
                indexes.add((int)entry.getIndex());
            }
            return indexes;
        }

        private void completeOrRetry(AtomicReference<Long> backoffTime, RequestManager retryRequestManager) {
            if (retryRequestManager == null || retryRequestManager.isEmpty()) {
                this.setRetryComplete();
            } else {
                this.currentRequestManager = retryRequestManager;
                ++this.failedCount;
                this.mutationRetryMeter.mark(this.getRequestCount());
                LOG.info("Retrying failed call. Failure #%d, got #%d failures", this.failedCount, this.getRequestCount());
                BulkMutation.this.retryExecutorService.schedule(this, (long)this.getCurrentBackoff(backoffTime), TimeUnit.MILLISECONDS);
            }
        }

        private boolean isRetryable(int codeId) {
            Status.Code code = Status.fromCodeValue((int)codeId).getCode();
            return BulkMutation.this.retryOptions.isRetryable(code);
        }

        @Override
        public synchronized void run() {
            if (this.operationsAreComplete()) {
                this.setRetryComplete();
                return;
            }
            try {
                if (this.retryId == null) {
                    this.retryId = BulkMutation.this.asyncExecutor.getOperationAccountant().registerComplexOperation(this.createRetryHandler());
                }
                this.mutateRowsFuture = BulkMutation.this.asyncExecutor.mutateRowsAsync(this.currentRequestManager.build());
                this.currentRequestManager.lastRpcSentTime = clock.currentTimeMillis();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.mutateRowsFuture = Futures.immediateFailedFuture((Throwable)e);
            }
            catch (Throwable e) {
                this.mutateRowsFuture = Futures.immediateFailedFuture((Throwable)e);
            }
            finally {
                this.addCallback(this.mutateRowsFuture);
            }
        }

        private OperationAccountant.ComplexOperationStalenessHandler createRetryHandler() {
            return new OperationAccountant.ComplexOperationStalenessHandler(){

                @Override
                public void performRetryIfStale() {
                    if (Batch.this.retryId == null || Batch.this.operationsAreComplete() || Batch.this.currentRequestManager.isStale()) {
                        Batch.this.setRetryComplete();
                    }
                }
            };
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void setFailure(Throwable t) {
            try {
                if (this.currentRequestManager != null) {
                    for (SettableFuture future : this.currentRequestManager.futures) {
                        future.setException(t);
                    }
                }
            }
            finally {
                this.setRetryComplete();
            }
        }

        private synchronized void setRetryComplete() {
            if (this.retryId != null) {
                if (this.mutateRowsFuture != null) {
                    this.mutateRowsFuture.cancel(true);
                }
                this.mutateRowsFuture = null;
                BulkMutation.this.asyncExecutor.getOperationAccountant().onComplexOperationCompletion(this.retryId);
                if (this.failedCount > 0) {
                    LOG.info("Batch #%d recovered from the failure and completed.", this.retryId);
                }
                this.retryId = null;
            }
            this.currentRequestManager = null;
        }

        @VisibleForTesting
        int getRequestCount() {
            return this.currentRequestManager == null ? 0 : this.currentRequestManager.getRequestCount();
        }

        boolean operationsAreComplete() {
            return this.currentRequestManager == null || this.currentRequestManager.isEmpty();
        }
    }

    @VisibleForTesting
    static class RequestManager {
        private final List<SettableFuture<MutateRowResponse>> futures = new ArrayList<SettableFuture<MutateRowResponse>>();
        private final MutateRowsRequest.Builder builder;
        private final Meter addMeter;
        private MutateRowsRequest request;
        private long approximateByteSize = 0L;
        @VisibleForTesting
        Long lastRpcSentTime;

        RequestManager(String tableName, Meter addMeter) {
            this.builder = MutateRowsRequest.newBuilder().setTableName(tableName);
            this.approximateByteSize = tableName.length() + 2;
            this.addMeter = addMeter;
        }

        void add(SettableFuture<MutateRowResponse> future, MutateRowsRequest.Entry entry) {
            this.addMeter.mark();
            this.futures.add(future);
            this.builder.addEntries(entry);
            this.approximateByteSize += (long)entry.getSerializedSize();
        }

        MutateRowsRequest build() {
            this.request = this.builder.build();
            return this.request;
        }

        public boolean isEmpty() {
            return this.futures.isEmpty();
        }

        public int getRequestCount() {
            return this.futures.size();
        }

        public boolean isStale() {
            return this.lastRpcSentTime != null && this.lastRpcSentTime < clock.currentTimeMillis() - MAX_RPC_WAIT_TIME;
        }

        public boolean wasSent() {
            return this.lastRpcSentTime != null;
        }
    }
}

