/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.jdbc;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.ParallelPhoenixContext;
import org.apache.phoenix.jdbc.PhoenixHAGroupMetrics;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.Metric;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelPhoenixUtil {
    public static final String PHOENIX_HA_PARALLEL_OPERATION_TIMEOUT_ATTRIB = "phoenix.ha.parallel.operation.timeout.ms";
    public static ParallelPhoenixUtil INSTANCE = new ParallelPhoenixUtil();
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelPhoenixUtil.class);
    private static final long DEFAULT_INTERNAL_OPERATION_TIMEOUT_MS = 1000L;

    private ParallelPhoenixUtil() {
    }

    public Object getAnyOfNonExceptionally(List<CompletableFuture<? extends Object>> futures, ParallelPhoenixContext context) throws SQLException {
        long timeoutMs = context.getOperationTimeout();
        long endTime = timeoutMs == 0L ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime() + timeoutMs;
        long internalTimeoutMs = timeoutMs == 0L ? 1000L : Math.min(1000L, timeoutMs);
        Object result = null;
        boolean changed = true;
        boolean timedout = false;
        List<CompletableFuture<? extends Object>> originalFutures = futures;
        CompletableFuture<Object> resultFuture = null;
        while (!futures.isEmpty() && result == null) {
            if (EnvironmentEdgeManager.currentTime() > endTime) {
                timedout = true;
                break;
            }
            if (changed) {
                resultFuture = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
            }
            try {
                result = resultFuture.get(internalTimeoutMs, TimeUnit.MILLISECONDS);
                break;
            }
            catch (Exception e) {
                List filteredResults = futures.stream().filter(f -> !f.isCompletedExceptionally()).collect(Collectors.toList());
                if (filteredResults.equals(futures)) {
                    changed = false;
                    continue;
                }
                futures = filteredResults;
                changed = true;
            }
        }
        if (futures.isEmpty()) {
            LOGGER.error("All Futures failed.");
            Throwable futuresException = null;
            int i = 0;
            for (CompletableFuture<? extends Object> failedFuture : originalFutures) {
                try {
                    failedFuture.get();
                }
                catch (Exception e) {
                    LOGGER.error("Future Exception. Cluster " + i + " HAGroup:" + context.getHaGroup(), (Throwable)e);
                    if (futuresException == null) {
                        futuresException = new SQLException("All futures failed. HAGroup:" + context.getHaGroup(), e);
                    }
                    futuresException.addSuppressed(e);
                }
                ++i;
            }
            context.setError();
            throw futuresException;
        }
        if (timedout) {
            GlobalClientMetrics.GLOBAL_HA_PARALLEL_TASK_TIMEOUT_COUNTER.increment();
            if (futures.isEmpty()) {
                LOGGER.warn("Unexpected race between timeout and failure occurred.");
            } else {
                int i = 0;
                LOGGER.error("Parallel Phoenix Timeout occurred");
                for (CompletableFuture<? extends Object> future : originalFutures) {
                    if (future.isCompletedExceptionally()) {
                        try {
                            future.get();
                        }
                        catch (Exception e) {
                            LOGGER.info("For timeout cluster " + i + " completed exceptionally. HAGroup:" + context.getHaGroup(), (Throwable)e);
                        }
                    } else if (future.isDone()) {
                        LOGGER.info("For timeout cluster " + i + " finished post timeout prior to recording. HAGroup:" + context.getHaGroup());
                    } else {
                        LOGGER.info("For timeout cluster " + i + " still running. HAGroup:" + context.getHaGroup());
                    }
                    ++i;
                }
            }
            context.setError();
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage("Operation timedout. Operation timeout ms = " + timeoutMs).build().buildException();
        }
        int i = 0;
        for (CompletableFuture<? extends Object> failedFuture : originalFutures) {
            block25: {
                if (failedFuture.isCompletedExceptionally()) {
                    try {
                        failedFuture.get();
                    }
                    catch (Exception e) {
                        if (!LOGGER.isDebugEnabled()) break block25;
                        LOGGER.debug("Future Exception. Cluster " + i + "HAGroup:" + context.getHaGroup(), (Throwable)e);
                    }
                }
            }
            ++i;
        }
        return result;
    }

    public <T> T getFutureNoRetry(CompletableFuture<T> future, ParallelPhoenixContext context) throws InterruptedException, ExecutionException, TimeoutException {
        long operationTimeoutMs = context.getOperationTimeout();
        long timeout = operationTimeoutMs > 0L ? operationTimeoutMs : Long.MAX_VALUE;
        return future.get(timeout, TimeUnit.MILLISECONDS);
    }

    <T, R> CompletableFuture<T> getFutureAndChainOnContextNoMetrics(Function<R, T> functionToApply, CompletableFuture<R> future1, Function<Supplier<T>, CompletableFuture<T>> chainOnConn) {
        return this.getFutureAndChainOnContext(functionToApply, future1, chainOnConn, null, null, false);
    }

    <T, R> CompletableFuture<T> getFutureAndChainOnContext(Function<R, T> functionToApply, CompletableFuture<R> future1, Function<Supplier<T>, CompletableFuture<T>> chainOnConn, Metric operationCount, Metric failureCount) {
        return this.getFutureAndChainOnContext(functionToApply, future1, chainOnConn, operationCount, failureCount, true);
    }

    private <T, R> CompletableFuture<T> getFutureAndChainOnContext(Function<R, T> functionToApply, CompletableFuture<R> future1, Function<Supplier<T>, CompletableFuture<T>> chainOnConn, Metric operationCount, Metric failureCount, boolean useMetrics) {
        return chainOnConn.apply(() -> {
            try {
                if (useMetrics) {
                    operationCount.increment();
                }
                return functionToApply.apply(future1.get());
            }
            catch (Exception e) {
                if (useMetrics) {
                    failureCount.increment();
                }
                throw new CompletionException(e);
            }
        });
    }

    <T, R> List<CompletableFuture<T>> applyFunctionToFutures(Function<R, T> function, CompletableFuture<R> future1, CompletableFuture<R> future2, ParallelPhoenixContext context, boolean useMetrics) {
        CompletableFuture<T> result1 = this.getFutureAndChainOnContext(function, future1, context::chainOnConn1, context.getParallelPhoenixMetrics().getActiveClusterOperationCount(), context.getParallelPhoenixMetrics().getActiveClusterFailedOperationCount(), useMetrics);
        CompletableFuture<T> result2 = this.getFutureAndChainOnContext(function, future2, context::chainOnConn2, context.getParallelPhoenixMetrics().getStandbyClusterOperationCount(), context.getParallelPhoenixMetrics().getStandbyClusterFailedOperationCount(), useMetrics);
        return ImmutableList.of(result1, result2);
    }

    <T> Object runFutures(List<CompletableFuture<T>> futures, ParallelPhoenixContext context, boolean useMetrics) throws SQLException {
        ArrayList<CompletableFuture<? extends Object>> ranFutures = new ArrayList<CompletableFuture<? extends Object>>();
        int i = 0;
        while (i < futures.size()) {
            CompletableFuture<T> future = futures.get(i);
            int finalI = i++;
            CompletionStage decoratedFuture = future.thenApply(t -> new FutureResult<Object>(t, finalI));
            ranFutures.add((CompletableFuture<? extends Object>)decoratedFuture);
        }
        FutureResult result = (FutureResult)this.getAnyOfNonExceptionally(ranFutures, context);
        if (useMetrics) {
            context.getParallelPhoenixMetrics().get(PhoenixHAGroupMetrics.HAMetricType.HA_PARALLEL_USED_OPERATIONS, result.index).increment();
        }
        return result.t;
    }

    <T, R> Object runFutures(Function<R, T> function, CompletableFuture<R> future1, CompletableFuture<R> future2, ParallelPhoenixContext context, boolean useMetrics) throws SQLException {
        List<CompletableFuture<T>> list = this.applyFunctionToFutures(function, future1, future2, context, useMetrics);
        return this.runFutures(list, context, useMetrics);
    }

    <T, R> PairOfSameType<Object> runOnFuturesGetAll(Function<R, T> function, CompletableFuture<R> future1, CompletableFuture<R> future2, ParallelPhoenixContext context, boolean useMetrics) throws SQLException {
        T value2;
        T value1;
        CompletableFuture<T> result2;
        CompletableFuture<T> result1;
        if (useMetrics) {
            result1 = this.getFutureAndChainOnContext(function, future1, context::chainOnConn1, context.getParallelPhoenixMetrics().getActiveClusterOperationCount(), context.getParallelPhoenixMetrics().getActiveClusterFailedOperationCount());
            result2 = this.getFutureAndChainOnContext(function, future2, context::chainOnConn2, context.getParallelPhoenixMetrics().getStandbyClusterOperationCount(), context.getParallelPhoenixMetrics().getStandbyClusterFailedOperationCount());
        } else {
            result1 = this.getFutureAndChainOnContextNoMetrics(function, future1, context::chainOnConn1);
            result2 = this.getFutureAndChainOnContextNoMetrics(function, future2, context::chainOnConn2);
        }
        try {
            value1 = result1.get();
        }
        catch (Exception e) {
            throw new SQLException(e);
        }
        try {
            value2 = result2.get();
        }
        catch (Exception e) {
            throw new SQLException(e);
        }
        return new PairOfSameType(value1, value2);
    }

    static class FutureResult<T> {
        private final T t;
        private final int index;

        FutureResult(T t, int index) {
            this.t = t;
            this.index = index;
        }

        T getResult() {
            return this.t;
        }

        int getIndex() {
            return this.index;
        }
    }
}

