/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.ErrorInjector;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link XceiverClientSpi} implementation that sends container commands to a
 * pipeline through a Ratis {@link RaftClient}.
 *
 * <p>Besides forwarding requests, the client keeps a per-datanode map of the
 * most recently reported commit indexes ({@link #getCommitInfoMap()}), which
 * is used to answer {@link #watchForCommit(long)} without a round trip when
 * possible.
 */
public final class XceiverClientRatis extends XceiverClientSpi {
    public static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);

    /** Suffix of the error raised for a watch type other than ALL_COMMITTED / MAJORITY_COMMITTED. */
    private static final String UNSUPPORTED_WATCH_TYPE_SUFFIX =
            " is not supported. Currently only ALL_COMMITTED or MAJORITY_COMMITTED are supported";

    private final Pipeline pipeline;
    private final RpcType rpcType;
    /** Holds the connected client; {@code null} before {@link #connect()} and after {@link #close()}. */
    private final AtomicReference<RaftClient> client = new AtomicReference<>();
    private final RetryPolicy retryPolicy;
    private final GrpcTlsConfig tlsConfig;
    private final ConfigurationSource ozoneConfiguration;
    /** Datanode id to last commit index reported for that datanode. */
    private final ConcurrentHashMap<UUID, Long> commitInfoMap;
    private final XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics();
    private final RaftProtos.ReplicationLevel watchType;
    /** {@code requiredNodes / 2 + 1} of the pipeline's replication config. */
    private final int majority;
    private final ErrorInjector errorInjector;

    /**
     * Creates a client for the given pipeline without a trust manager or error injector.
     *
     * @param pipeline  pipeline the client talks to
     * @param ozoneConf configuration used to derive RPC type, retry policy and TLS settings
     * @return a new, not yet connected client
     */
    public static XceiverClientRatis newXceiverClientRatis(Pipeline pipeline, ConfigurationSource ozoneConf) {
        return XceiverClientRatis.newXceiverClientRatis(pipeline, ozoneConf, null, null);
    }

    /**
     * Creates a client for the given pipeline.
     *
     * @param pipeline      pipeline the client talks to
     * @param ozoneConf     configuration; {@code hdds.container.ratis.rpc.type} selects the RPC type
     *                      (default {@code GRPC})
     * @param trustManager  trust manager passed to the TLS client config factory, may be {@code null}
     * @param errorInjector optional hook that can short-circuit requests with a canned reply,
     *                      may be {@code null}
     * @return a new, not yet connected client
     */
    public static XceiverClientRatis newXceiverClientRatis(Pipeline pipeline, ConfigurationSource ozoneConf, ClientTrustManager trustManager, ErrorInjector errorInjector) {
        String rpcType = ozoneConf.get("hdds.container.ratis.rpc.type", "GRPC");
        RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
        GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new SecurityConfig(ozoneConf), trustManager);
        return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, tlsConfig, ozoneConf, errorInjector);
    }

    /**
     * Initializes the client state and validates the configured watch type.
     *
     * @throws IllegalArgumentException if the configured watch type is not a known replication
     *                                  level, or is neither ALL_COMMITTED nor MAJORITY_COMMITTED
     */
    private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, ConfigurationSource configuration, ErrorInjector errorInjector) {
        this.pipeline = pipeline;
        this.majority = pipeline.getReplicationConfig().getRequiredNodes() / 2 + 1;
        this.rpcType = rpcType;
        this.retryPolicy = retryPolicy;
        this.commitInfoMap = new ConcurrentHashMap<>();
        this.tlsConfig = tlsConfig;
        this.ozoneConfiguration = configuration;

        // Read the configured value once so the error message reports exactly what was parsed.
        String configuredWatchType = configuration.getObject(RatisClientConfig.class).getWatchType();
        try {
            this.watchType = RaftProtos.ReplicationLevel.valueOf(configuredWatchType);
        }
        catch (IllegalArgumentException | NullPointerException e) {
            // Keep the original failure as the cause instead of discarding it.
            throw new IllegalArgumentException(configuredWatchType + UNSUPPORTED_WATCH_TYPE_SUFFIX, e);
        }
        if (this.watchType != RaftProtos.ReplicationLevel.ALL_COMMITTED && this.watchType != RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) {
            throw new IllegalArgumentException(this.watchType + UNSUPPORTED_WATCH_TYPE_SUFFIX);
        }
        LOG.info("WatchType {}. Majority {}, ", this.watchType, this.majority);
        if (LOG.isTraceEnabled()) {
            // The Throwable is only there to record the construction call site in the trace log.
            LOG.trace("new XceiverClientRatis for pipeline {}", pipeline.getId(), new Throwable("TRACE"));
        }
        this.errorInjector = errorInjector;
    }

    /**
     * Updates the commit info map from a Raft reply.
     *
     * @return the index computed by {@link #updateCommitInfosMap(Collection, RaftProtos.ReplicationLevel)},
     *         or {@code 0} when the reply is {@code null}, unsuccessful, or carries no commit infos
     */
    private long updateCommitInfosMap(RaftClientReply reply, RaftProtos.ReplicationLevel level) {
        return Optional.ofNullable(reply)
                .filter(RaftClientReply::isSuccess)
                .map(RaftClientReply::getCommitInfos)
                .map(commitInfos -> this.updateCommitInfosMap(commitInfos, level))
                .orElse(0L);
    }

    /**
     * Updates the commit info map using this client's configured watch type.
     *
     * @param commitInfoProtos commit infos reported by the servers
     * @return see {@link #updateCommitInfosMap(Collection, RaftProtos.ReplicationLevel)}
     */
    public long updateCommitInfosMap(Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
        return this.updateCommitInfosMap(commitInfoProtos, this.watchType);
    }

    /**
     * Records the reported commit indexes and derives the index committed at the given level.
     *
     * @param commitInfoProtos commit infos reported by the servers
     * @param level            replication level used to derive the returned index
     * @return for ALL_COMMITTED the minimum of the recorded indexes; otherwise the
     *         {@code majority}-th largest recorded index; {@code 0} if no such value exists
     */
    public long updateCommitInfosMap(Collection<RaftProtos.CommitInfoProto> commitInfoProtos, RaftProtos.ReplicationLevel level) {
        final Stream<Long> recordedIndexes;
        if (this.commitInfoMap.isEmpty()) {
            // Nothing tracked yet: record every reported server.
            recordedIndexes = commitInfoProtos.stream().map(this::putCommitInfo);
        } else {
            // Only refresh servers that are still tracked, so that entries removed by
            // handleFailedAllCommit are not re-added by later replies.
            recordedIndexes = commitInfoProtos.stream()
                    .map(proto -> this.commitInfoMap.computeIfPresent(
                            RatisHelper.toDatanodeId(proto.getServer()),
                            (address, index) -> proto.getCommitIndex()))
                    .filter(Objects::nonNull);
        }
        if (level == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
            return recordedIndexes.mapToLong(Long::longValue).min().orElse(0L);
        }
        // Descending order, then pick the element at position (majority - 1).
        return recordedIndexes.sorted(Comparator.reverseOrder())
                .limit(this.majority)
                .skip(this.majority - 1)
                .findFirst()
                .orElse(0L);
    }

    /** Stores the commit index of the proto's server in the map and returns that index. */
    private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
        long index = proto.getCommitIndex();
        this.commitInfoMap.put(RatisHelper.toDatanodeId(proto.getServer()), index);
        return index;
    }

    /** @return always {@link HddsProtos.ReplicationType#RATIS} */
    @Override
    public HddsProtos.ReplicationType getPipelineType() {
        return HddsProtos.ReplicationType.RATIS;
    }

    /** @return the pipeline this client was created for */
    @Override
    public Pipeline getPipeline() {
        return this.pipeline;
    }

    /**
     * Creates the underlying {@link RaftClient} and publishes it.
     *
     * @throws IllegalStateException if a client is already connected
     * @throws Exception             if building the client (or the debug message) fails
     */
    @Override
    public void connect() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to pipeline:{} leaderDatanode:{}, primaryDatanode:{}",
                    this.getPipeline().getId(),
                    RatisHelper.toRaftPeerId(this.pipeline.getLeaderNode()),
                    RatisHelper.toRaftPeerId(this.pipeline.getClosestNode()));
        }
        // Fail fast so that no RaftClient is built when one is already connected.
        if (this.client.get() != null) {
            throw new IllegalStateException("Client is already connected.");
        }
        RaftClient newClient = RatisHelper.newRaftClient(this.rpcType, this.getPipeline(), this.retryPolicy, this.tlsConfig, this.ozoneConfiguration);
        if (!this.client.compareAndSet(null, newClient)) {
            // Lost a race with a concurrent connect(): release the client we just built
            // instead of leaking it.
            IllegalStateException alreadyConnected = new IllegalStateException("Client is already connected.");
            try {
                newClient.close();
            }
            catch (IOException e) {
                alreadyConnected.addSuppressed(e);
            }
            throw alreadyConnected;
        }
    }

    /**
     * Closes the underlying client, if any. Calling this on an unconnected or already
     * closed instance is a no-op.
     *
     * @throws IllegalStateException if closing the Raft client fails with an {@link IOException}
     */
    @Override
    public void close() {
        RaftClient current = this.client.getAndSet(null);
        if (current != null) {
            this.closeRaftClient(current);
        }
    }

    /** Closes the given client, rethrowing an {@link IOException} as {@link IllegalStateException}. */
    private void closeRaftClient(RaftClient raftClient) {
        try {
            raftClient.close();
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * @return the connected client
     * @throws NullPointerException if the client is not connected
     */
    private RaftClient getClient() {
        return Objects.requireNonNull(this.client.get(), "client is null");
    }

    /** @return the live (not copied) datanode-id to commit-index map */
    @VisibleForTesting
    public ConcurrentMap<UUID, Long> getCommitInfoMap() {
        return this.commitInfoMap;
    }

    /**
     * Sends the request through the Raft client inside a new tracing span.
     *
     * <p>If an error injector is configured and returns a non-null reply for the request,
     * that reply is returned directly and nothing is sent. Read-only requests (as decided by
     * {@link HddsUtils#isReadOnly}) use {@code sendReadOnly}; all others use {@code send}.
     */
    private CompletableFuture<RaftClientReply> sendRequestAsync(ContainerProtos.ContainerCommandRequestProto request) {
        if (this.errorInjector != null) {
            RaftClientReply injected = this.errorInjector.getResponse(request, this.getClient().getId(), this.pipeline);
            if (injected != null) {
                return CompletableFuture.completedFuture(injected);
            }
        }
        return TracingUtil.executeInNewSpan("XceiverClientRatis." + request.getCmdType().name(), () -> {
            ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(request, TracingUtil.exportCurrentSpan());
            if (HddsUtils.isReadOnly(request)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("sendCommandAsync ReadOnly {}", message);
                }
                return this.getClient().async().sendReadOnly(message);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("sendCommandAsync {}", message);
            }
            return this.getClient().async().send(message);
        });
    }

    /**
     * @return the smallest commit index currently recorded in the commit info map,
     *         or {@code 0} if the map is empty
     */
    @Override
    public long getReplicatedMinCommitIndex() {
        // A sequential stream is enough here; a parallel stream would only add
        // fork/join overhead for a simple minimum.
        return this.commitInfoMap.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    /** Adds a {@link DatanodeDetails} built from only the given UUID to the reply. */
    private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
        DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
        builder.setUuid(address);
        reply.addDatanode(builder.build());
    }

    /** Builds a reply with no response future whose log index is {@code replyIndex}. */
    private XceiverClientReply newWatchReply(long watchIndex, Object reason, long replyIndex) {
        LOG.debug("watchForCommit({}) returns {} {}", watchIndex, reason, replyIndex);
        XceiverClientReply reply = new XceiverClientReply(null);
        reply.setLogIndex(replyIndex);
        return reply;
    }

    /**
     * Waits until the given log index is committed at the configured watch level.
     *
     * <p>Returns immediately when the locally recorded minimum commit index already covers
     * {@code index}. Otherwise a watch request is issued. If that fails and the watch type
     * is ALL_COMMITTED (and the failure is not a {@link GroupMismatchException}), the commit
     * infos are taken from the {@link NotReplicatedException} if present, or from a second
     * MAJORITY_COMMITTED watch, and lagging servers are reported via the returned reply.
     *
     * @param index log index to wait for
     * @return reply carrying the resulting log index and, on an ALL_COMMITTED fallback,
     *         the datanodes that have not reached {@code index}
     * @throws IllegalStateException if the watch succeeds but the derived index is below {@code index}
     */
    @Override
    public XceiverClientReply watchForCommit(long index) throws InterruptedException, ExecutionException, TimeoutException, IOException {
        long replicatedMin = this.getReplicatedMinCommitIndex();
        if (replicatedMin >= index) {
            return this.newWatchReply(index, "replicatedMin", replicatedMin);
        }
        try {
            CompletableFuture<RaftClientReply> replyFuture = this.getClient().async().watch(index, this.watchType);
            RaftClientReply reply = replyFuture.get();
            long updated = this.updateCommitInfosMap(reply, this.watchType);
            Preconditions.checkState(updated >= index, "Returned index %s is smaller than expected %s", updated, index);
            return this.newWatchReply(index, this.watchType, updated);
        }
        catch (Exception e) {
            // NOTE(review): an InterruptedException from the wait above also lands here and,
            // for ALL_COMMITTED, proceeds to the fallback watch below - confirm this is intended.
            LOG.warn("{} way commit failed on pipeline {}", this.watchType, this.pipeline, e);
            Throwable groupMismatch = HddsClientUtils.containsException(e, GroupMismatchException.class);
            if (groupMismatch != null) {
                // No fallback is attempted for a group mismatch.
                throw e;
            }
            if (this.watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
                Collection<RaftProtos.CommitInfoProto> commitInfoProtoList;
                Throwable nre = HddsClientUtils.containsException(e, NotReplicatedException.class);
                if (nre instanceof NotReplicatedException) {
                    // The exception already carries commit infos; no extra watch round trip needed.
                    commitInfoProtoList = ((NotReplicatedException)nre).getCommitInfos();
                } else {
                    RaftClientReply reply = this.getClient().async().watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED).get();
                    commitInfoProtoList = reply.getCommitInfos();
                }
                return this.handleFailedAllCommit(index, commitInfoProtoList);
            }
            throw e;
        }
    }

    /**
     * Builds the reply for an ALL_COMMITTED watch that fell back to majority.
     *
     * <p>Every server whose commit index is below {@code index} is added to the reply and
     * removed from the commit info map.
     */
    private XceiverClientReply handleFailedAllCommit(long index, Collection<RaftProtos.CommitInfoProto> commitInfoProtoList) {
        XceiverClientReply clientReply = this.newWatchReply(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED, index);
        commitInfoProtoList.stream()
                .filter(info -> info.getCommitIndex() < index)
                .forEach(proto -> {
                    UUID address = RatisHelper.toDatanodeId(proto.getServer());
                    this.addDatanodetoReply(address, clientReply);
                    this.commitInfoMap.remove(address);
                    LOG.info("Could not commit index {} on pipeline {} to all the nodes. Server {} has failed. Committed by majority.", index, this.pipeline, address);
                });
        return clientReply;
    }

    /**
     * Sends a container command asynchronously.
     *
     * <p>The returned reply's response future completes with the parsed container response.
     * It completes exceptionally (wrapped in {@link CompletionException}) when the Raft reply
     * is unsuccessful or its payload cannot be parsed. On completion of the response stage the
     * reply's log index and the replying datanode are set; the commit info map is updated only
     * when the container result is SUCCESS.
     *
     * @param request the container command to send
     * @return reply whose response future tracks the outcome
     */
    @Override
    public XceiverClientReply sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
        XceiverClientReply asyncReply = new XceiverClientReply(null);
        long requestTime = System.currentTimeMillis();
        CompletableFuture<RaftClientReply> raftClientReply = this.sendRequestAsync(request);
        this.metrics.incrPendingContainerOpsMetrics(request.getCmdType());
        CompletableFuture<ContainerProtos.ContainerCommandResponseProto> containerCommandResponse = raftClientReply.whenComplete((reply, e) -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("received reply {} for request: cmdType={} containerID={} pipelineID={} traceID={} exception: {}",
                        reply, request.getCmdType(), request.getContainerID(), request.getPipelineID(), request.getTraceID(), e);
            }
            // Metrics are settled on both success and failure of the Raft call.
            this.metrics.decrPendingContainerOpsMetrics(request.getCmdType());
            this.metrics.addContainerOpsLatency(request.getCmdType(), System.currentTimeMillis() - requestTime);
        }).thenApply(reply -> {
            try {
                if (!reply.isSuccess()) {
                    RaftException exception = reply.getException();
                    Preconditions.checkNotNull(exception, "Raft reply failure but no exception propagated.");
                    throw new CompletionException(exception);
                }
                ContainerProtos.ContainerCommandResponseProto response = ContainerProtos.ContainerCommandResponseProto.parseFrom(reply.getMessage().getContent());
                UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
                if (response.getResult() == ContainerProtos.Result.SUCCESS) {
                    this.updateCommitInfosMap(reply.getCommitInfos(), this.watchType);
                }
                asyncReply.setLogIndex(reply.getLogIndex());
                this.addDatanodetoReply(serverId, asyncReply);
                return response;
            }
            catch (InvalidProtocolBufferException e) {
                throw new CompletionException(e);
            }
        });
        asyncReply.setResponse(containerCommandResponse);
        return asyncReply;
    }

    /**
     * Not supported by the Ratis client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Map<DatanodeDetails, ContainerProtos.ContainerCommandResponseProto> sendCommandOnAllNodes(ContainerProtos.ContainerCommandRequestProto request) {
        throw new UnsupportedOperationException("Operation Not supported for ratis client");
    }

    /**
     * @return the data stream API of the connected Raft client
     * @throws NullPointerException if the client is not connected
     */
    public DataStreamApi getDataStreamApi() {
        return this.getClient().getDataStreamApi();
    }
}

