/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.shaded.io.opentracing.Scope;
import org.apache.hadoop.ozone.shaded.io.opentracing.Span;
import org.apache.hadoop.ozone.shaded.io.opentracing.util.GlobalTracer;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XceiverClientGrpc
extends XceiverClientSpi {
    /** Class-wide logger; also exposed through {@link #getLogger()}. */
    private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
    /** Pipeline whose datanodes this client sends commands to. */
    private final Pipeline pipeline;
    /** Configuration the client was created with; also consulted for the fallback datanode port. */
    private final ConfigurationSource config;
    /** Asynchronous gRPC stubs, keyed by datanode UUID; populated when a datanode is connected. */
    private final Map<UUID, XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub> asyncStubs;
    /** Metrics sink obtained from {@code XceiverClientManager.getXceiverClientMetrics()}. */
    private final XceiverClientMetrics metrics;
    /** Open gRPC channels, keyed by datanode UUID. */
    private final Map<UUID, ManagedChannel> channels;
    /** Bounds the number of in-flight requests; one permit is acquired per command sent. */
    private final Semaphore semaphore;
    /** Per-call deadline; applied in seconds when a command is sent. */
    private long timeout;
    /** Security settings derived from {@link #config}; decides whether channels use TLS. */
    private final SecurityConfig secConfig;
    /** Value of {@code ozone.network.topology.aware.read} (default {@code true}). */
    private final boolean topologyAwareRead;
    /** Trust manager for TLS channels; may be {@code null}. */
    private final ClientTrustManager trustManager;
    /** Remembers, per block id, the datanode that last answered a GetBlock command successfully. */
    private final Map<ContainerProtos.DatanodeBlockID, DatanodeDetails> getBlockDNcache;
    /** Set by {@link #close()}; read and written only inside synchronized methods. */
    private boolean closed = false;

    public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, ClientTrustManager trustManager) {
        Preconditions.checkNotNull(pipeline);
        Preconditions.checkNotNull(config);
        this.setTimeout(config.getTimeDuration("ozone.client.read.timeout", "30s", TimeUnit.SECONDS));
        this.pipeline = pipeline;
        this.config = config;
        this.secConfig = new SecurityConfig(config);
        this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
        this.metrics = XceiverClientManager.getXceiverClientMetrics();
        this.channels = new HashMap<UUID, ManagedChannel>();
        this.asyncStubs = new HashMap<UUID, XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub>();
        this.topologyAwareRead = config.getBoolean("ozone.network.topology.aware.read", true);
        this.trustManager = trustManager;
        this.getBlockDNcache = new ConcurrentHashMap<ContainerProtos.DatanodeBlockID, DatanodeDetails>();
    }

    /**
     * Creates a client without a trust manager by delegating to the three-argument
     * constructor with a {@code null} trust manager.
     *
     * @param pipeline pipeline whose datanodes will be contacted
     * @param config   configuration source
     */
    public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config) {
        this(pipeline, config, null);
    }

    /**
     * Opens a channel to one datanode of the pipeline: the closest node when topology-aware
     * reads are enabled, otherwise the first node.
     *
     * @throws Exception if the node cannot be resolved or the channel cannot be created
     */
    @Override
    public void connect() throws Exception {
        DatanodeDetails target;
        if (this.topologyAwareRead) {
            target = this.pipeline.getClosestNode();
        } else {
            target = this.pipeline.getFirstNode();
        }
        this.connectToDatanode(target);
    }

    /**
     * Creates a channel and an asynchronous stub for {@code dn} and registers both under the
     * datanode's UUID. Does nothing when a live channel to {@code dn} already exists.
     *
     * @param dn datanode to connect to
     * @throws IOException if the channel cannot be built
     */
    private synchronized void connectToDatanode(DatanodeDetails dn) throws IOException {
        if (this.isConnected(dn)) {
            return;
        }

        // A port value of 0 means "not set"; fall back to the configured container IPC port.
        int advertisedPort = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
        int port = advertisedPort != 0
            ? advertisedPort
            : this.config.getInt("hdds.container.ipc.port", 9859);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Nodes in pipeline : {}", this.pipeline.getNodes());
            LOG.debug("Connecting to server : {}", (Object)dn.getIpAddress());
        }

        NettyChannelBuilder channelBuilder = this.createChannel(dn, port);
        ManagedChannel channel = channelBuilder.build();
        UUID dnId = dn.getUuid();
        this.asyncStubs.put(dnId, XceiverClientProtocolServiceGrpc.newStub(channel));
        this.channels.put(dn.getUuid(), channel);
    }

    /**
     * Builds (but does not open) a Netty channel builder for the given datanode address.
     * The builder starts out as plaintext with a 32 MiB inbound message limit, no proxy
     * detection and the tracing interceptor; it is switched to TLS only when both security
     * and gRPC TLS are enabled.
     *
     * @param dn   datanode whose IP address is used as the channel target
     * @param port port to connect to
     * @return the configured channel builder
     * @throws IOException if the SSL context cannot be built
     */
    protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) throws IOException {
        NettyChannelBuilder channelBuilder = (NettyChannelBuilder)((NettyChannelBuilder)NettyChannelBuilder
            .forAddress(dn.getIpAddress(), port)
            .usePlaintext()
            .maxInboundMessageSize(32 * 1024 * 1024)
            .proxyDetector(uri -> null))
            .intercept(new ClientInterceptor[]{new GrpcClientInterceptor()});

        boolean tlsEnabled = this.secConfig.isSecurityEnabled() && this.secConfig.isGrpcTlsEnabled();
        if (!tlsEnabled) {
            channelBuilder.usePlaintext();
            return channelBuilder;
        }

        SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
        if (this.trustManager != null) {
            sslContextBuilder.trustManager(this.trustManager);
        }
        if (this.secConfig.useTestCert()) {
            // Test certificates are issued for "localhost", so the authority is overridden to match.
            channelBuilder.overrideAuthority("localhost");
        }
        channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
        return channelBuilder;
    }

    /**
     * Reports whether a live channel to the given datanode is registered.
     *
     * @param details datanode to look up by UUID
     * @return {@code true} if a channel exists and is neither shut down nor terminated
     */
    @VisibleForTesting
    public boolean isConnected(DatanodeDetails details) {
        ManagedChannel channel = this.channels.get(details.getUuid());
        return this.isConnected(channel);
    }

    /**
     * Reports whether {@code channel} is usable.
     *
     * @param channel channel to inspect; may be {@code null}
     * @return {@code false} for {@code null}, terminated or shut-down channels, otherwise {@code true}
     */
    private boolean isConnected(ManagedChannel channel) {
        if (channel == null) {
            return false;
        }
        return !(channel.isTerminated() || channel.isShutdown());
    }

    /**
     * Marks the client as closed and shuts down every registered channel, waiting up to
     * 60 minutes per channel for termination. If the wait is interrupted the interrupt flag
     * is restored and the remaining channels are still processed.
     */
    @Override
    public synchronized void close() {
        this.closed = true;
        this.channels.values().forEach(channel -> {
            channel.shutdownNow();
            try {
                channel.awaitTermination(60L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                LOG.error("InterruptedException while waiting for channel termination", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Returns the pipeline this client was created for.
     *
     * @return the pipeline passed to the constructor
     */
    @Override
    public Pipeline getPipeline() {
        return this.pipeline;
    }

    /**
     * Sends {@code request} with tracing and datanode retry, without response validators,
     * and blocks until the response is available.
     *
     * @param request command to execute
     * @return the datanode's response
     * @throws IOException if the command fails on every datanode, or if the calling thread
     *                     is interrupted (reported as {@link InterruptedIOException})
     */
    @Override
    public ContainerProtos.ContainerCommandResponseProto sendCommand(ContainerProtos.ContainerCommandRequestProto request) throws IOException {
        try {
            XceiverClientReply reply = this.sendCommandWithTraceIDAndRetry(request, null);
            return reply.getResponse().get();
        } catch (ExecutionException e) {
            throw XceiverClientGrpc.getIOExceptionForSendCommand(request, e);
        } catch (InterruptedException e) {
            LOG.error("Command execution was interrupted.");
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted =
                new InterruptedIOException("Command " + HddsUtils.processForDebug(request) + " was interrupted.");
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /**
     * Sends {@code request} to every datanode of the pipeline and collects the responses.
     * All commands are submitted first and awaited afterwards. Datanodes whose command
     * fails or whose wait is interrupted are logged and left out of the result.
     *
     * @param request command to execute; the current client version is stamped on it when
     *                it carries none
     * @return responses keyed by the datanode that produced them
     * @throws IOException if submitting the command to a datanode fails
     */
    @Override
    public Map<DatanodeDetails, ContainerProtos.ContainerCommandResponseProto> sendCommandOnAllNodes(ContainerProtos.ContainerCommandRequestProto request) throws IOException {
        Map<DatanodeDetails, ContainerProtos.ContainerCommandResponseProto> responses = new HashMap<>();
        List<DatanodeDetails> nodes = this.pipeline.getNodes();
        Map<DatanodeDetails, CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> inFlight = new HashMap<>();

        if (!request.hasVersion()) {
            request = ContainerProtos.ContainerCommandRequestProto.newBuilder(request)
                .setVersion(ClientVersion.CURRENT.toProtoValue())
                .build();
        }

        // Phase 1: submit to every node without waiting.
        for (DatanodeDetails node : nodes) {
            try {
                request = this.reconstructRequestIfNeeded(request, node);
                XceiverClientReply pending = this.sendCommandAsync(request, node);
                inFlight.put(node, pending.getResponse());
            } catch (InterruptedException e) {
                LOG.error("Command execution was interrupted.");
                Thread.currentThread().interrupt();
            }
        }

        // Phase 2: wait for each submitted command.
        for (Map.Entry<DatanodeDetails, CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> pending : inFlight.entrySet()) {
            DatanodeDetails node = pending.getKey();
            try {
                responses.put(node, pending.getValue().get());
            } catch (InterruptedException e) {
                LOG.error("Command execution was interrupted.");
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                String message = "Failed to execute command {} on datanode " + node.getHostName();
                if (LOG.isDebugEnabled()) {
                    // Full request and stack trace only at debug level.
                    LOG.debug(message, (Object)HddsUtils.processForDebug(request), (Object)e);
                } else {
                    LOG.error(message + " Exception Class: {}, Exception Message: {}", new Object[]{request.getCmdType(), e.getClass().getName(), e.getMessage()});
                }
            }
        }
        return responses;
    }

    /**
     * For GetBlock requests on an EC pipeline, returns a copy of {@code request} whose block id
     * carries the replica index that the pipeline assigns to {@code dn}. Any other request is
     * returned unchanged.
     *
     * @param request request about to be sent
     * @param dn      datanode the request is destined for
     * @return the original or the rewritten request
     */
    private ContainerProtos.ContainerCommandRequestProto reconstructRequestIfNeeded(ContainerProtos.ContainerCommandRequestProto request, DatanodeDetails dn) {
        boolean ecPipeline =
            this.pipeline.getReplicationConfig().getReplicationType() == HddsProtos.ReplicationType.EC;
        if (!request.hasGetBlock() || !ecPipeline) {
            return request;
        }

        ContainerProtos.GetBlockRequestProto getBlock = request.getGetBlock();
        ContainerProtos.DatanodeBlockID indexedBlockId = getBlock.getBlockID().toBuilder()
            .setReplicaIndex(this.pipeline.getReplicaIndex(dn))
            .build();
        ContainerProtos.GetBlockRequestProto indexedGetBlock = getBlock.toBuilder()
            .setBlockID(indexedBlockId)
            .build();
        return request.toBuilder().setGetBlock(indexedGetBlock).build();
    }

    /**
     * Sends {@code request} with tracing and datanode retry, applying {@code validators} to
     * each response, and blocks until a response is available.
     *
     * @param request    command to execute
     * @param validators checks applied to a response before it is accepted; may be {@code null}
     * @return the first response that was obtained and passed validation
     * @throws IOException if the command fails on every datanode, or if the calling thread
     *                     is interrupted (reported as {@link InterruptedIOException})
     */
    @Override
    public ContainerProtos.ContainerCommandResponseProto sendCommand(ContainerProtos.ContainerCommandRequestProto request, List<XceiverClientSpi.Validator> validators) throws IOException {
        try {
            return this.sendCommandWithTraceIDAndRetry(request, validators).getResponse().get();
        } catch (ExecutionException e) {
            throw XceiverClientGrpc.getIOExceptionForSendCommand(request, e);
        } catch (InterruptedException e) {
            LOG.error("Command execution was interrupted.");
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted =
                new InterruptedIOException("Command " + HddsUtils.processForDebug(request) + " was interrupted.");
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /**
     * Runs {@link #sendCommandWithRetry} inside a new tracing span named after the command type.
     * The request is copied with the exported span as its trace id and, when it carries no
     * version, with the current client version.
     *
     * @param request    command to execute
     * @param validators response validators; may be {@code null}
     * @return the reply produced by the retrying send
     * @throws IOException if the command fails on every datanode
     */
    private XceiverClientReply sendCommandWithTraceIDAndRetry(ContainerProtos.ContainerCommandRequestProto request, List<XceiverClientSpi.Validator> validators) throws IOException {
        return TracingUtil.executeInNewSpan("XceiverClientGrpc." + request.getCmdType().name(), () -> {
            // Must run inside the span so that the exported trace id refers to it.
            ContainerProtos.ContainerCommandRequestProto.Builder traced =
                ContainerProtos.ContainerCommandRequestProto.newBuilder(request);
            traced.setTraceID(TracingUtil.exportCurrentSpan());
            if (!request.hasVersion()) {
                traced.setVersion(ClientVersion.CURRENT.toProtoValue());
            }
            return this.sendCommandWithRetry(traced.build(), validators);
        });
    }

    /**
     * Tries {@code request} on the pipeline's datanodes one after another until one of them
     * returns a response that passes every validator.
     *
     * <p>Datanode order:
     * <ul>
     *   <li>For GetBlock and GetSmallFile the pipeline leader is moved to the front.</li>
     *   <li>For GetBlock, ReadChunk and GetSmallFile, when topology-aware reads are disabled
     *       and a datanode is cached for the block, a fresh node list is taken and the cached
     *       datanode is moved to the front.</li>
     *   <li>Otherwise the topology order is used, or a shuffled list when topology-aware
     *       reads are disabled.</li>
     *   <li>If any node is not IN_SERVICE, in-service nodes are tried first.</li>
     * </ul>
     *
     * @param request    command to execute
     * @param validators checks applied to each response; may be {@code null} or empty
     * @return a reply holding the accepted response and the datanodes that were attempted
     * @throws SCMSecurityException if a datanode rejects the call as unauthenticated
     * @throws IOException          the last failure when no datanode produced an accepted
     *                              response; an {@link InterruptedIOException} when the calling
     *                              thread was interrupted
     */
    private XceiverClientReply sendCommandWithRetry(ContainerProtos.ContainerCommandRequestProto request, List<XceiverClientSpi.Validator> validators) throws IOException {
        ContainerProtos.ContainerCommandResponseProto responseProto = null;
        IOException ioException = null;
        XceiverClientReply reply = new XceiverClientReply(null);
        List<DatanodeDetails> datanodeList = null;

        ContainerProtos.DatanodeBlockID blockID = null;
        if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
            blockID = request.getGetBlock().getBlockID();
        } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
            blockID = request.getReadChunk().getBlockID();
        } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
            blockID = request.getGetSmallFile().getBlock().getBlockID();
        }

        if (blockID != null) {
            if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
                // Pull the leader to the head of the list.
                datanodeList = this.pipeline.getNodes();
                int leaderIndex = datanodeList.indexOf(this.pipeline.getLeaderNode());
                if (leaderIndex > 0) {
                    Collections.swap(datanodeList, 0, leaderIndex);
                }
            }
            DatanodeDetails cachedDN = this.getBlockDNcache.get(blockID);
            if (cachedDN != null && !this.topologyAwareRead) {
                // Pull the datanode that served this block before to the head of a fresh list.
                datanodeList = this.pipeline.getNodes();
                int cachedIndex = datanodeList.indexOf(cachedDN);
                if (cachedIndex > 0) {
                    Collections.swap(datanodeList, 0, cachedIndex);
                }
            }
        }

        if (datanodeList == null) {
            if (this.topologyAwareRead) {
                datanodeList = this.pipeline.getNodesInOrder();
            } else {
                datanodeList = this.pipeline.getNodes();
                Collections.shuffle(datanodeList);
            }
        }

        boolean allInService = datanodeList.stream()
            .allMatch(node -> node.getPersistedOpState() == HddsProtos.NodeOperationalState.IN_SERVICE);
        if (!allInService) {
            datanodeList = XceiverClientGrpc.sortDatanodeByOperationalState(datanodeList);
        }

        for (DatanodeDetails dn : datanodeList) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Executing command {} on datanode {}", (Object)HddsUtils.processForDebug(request), (Object)dn);
                }
                reply.addDatanode(dn);
                responseProto = this.sendCommandAsync(request, dn).getResponse().get();
                if (validators != null && !validators.isEmpty()) {
                    for (XceiverClientSpi.Validator validator : validators) {
                        validator.accept(request, responseProto);
                    }
                }
                if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
                    // Remember which datanode served this block.
                    this.getBlockDNcache.put(request.getGetBlock().getBlockID(), dn);
                }
                break;
            } catch (IOException e) {
                // Covers validator rejections too, so discard any response already received.
                ioException = e;
                responseProto = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to execute command {} on datanode {}", new Object[]{HddsUtils.processForDebug(request), dn, e});
                }
            } catch (ExecutionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to execute command {} on datanode {}", new Object[]{HddsUtils.processForDebug(request), dn, e});
                }
                if (Status.fromThrowable(e.getCause()).getCode() == Status.UNAUTHENTICATED.getCode()) {
                    throw new SCMSecurityException("Failed to authenticate with GRPC XceiverServer with Ozone block token.");
                }
                ioException = new IOException(e);
            } catch (InterruptedException e) {
                LOG.error("Command execution was interrupted ", (Throwable)e);
                Thread.currentThread().interrupt();
                // Record the interruption so the failure path below has an exception to throw
                // instead of tripping the null check, and stop trying further datanodes:
                // with the interrupt flag set they would fail immediately anyway.
                ioException = (IOException)new InterruptedIOException("Command " + HddsUtils.processForDebug(request) + " was interrupted.").initCause(e);
                break;
            }
        }

        if (responseProto != null) {
            reply.setResponse(CompletableFuture.completedFuture(responseProto));
            return reply;
        }

        Objects.requireNonNull(ioException);
        String message = "Failed to execute command {}";
        if (LOG.isDebugEnabled()) {
            LOG.debug(message + " on the pipeline {}.", (Object)HddsUtils.processForDebug(request), (Object)this.pipeline);
        } else {
            LOG.warn(message + " on the pipeline {}.", (Object)request.getCmdType(), (Object)this.pipeline);
        }
        throw ioException;
    }

    /**
     * Returns a copy of {@code datanodeList} with all IN_SERVICE datanodes placed before the
     * others. {@link List#sort} is stable, so the relative order within each group is kept.
     * The input list is not modified.
     *
     * @param datanodeList datanodes in their preferred order
     * @return a new list with in-service nodes first
     */
    private static List<DatanodeDetails> sortDatanodeByOperationalState(List<DatanodeDetails> datanodeList) {
        List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
        // Boolean.compare orders false before true, so "not in service == false" sorts first.
        Comparator<DatanodeDetails> byOpStateStable = (first, second) -> Boolean.compare(
            first.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE,
            second.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE);
        sortedDatanodeList.sort(byOpStateStable);
        return sortedDatanodeList;
    }

    /*
     * Loose catch block
     */
    @Override
    public XceiverClientReply sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException {
        Span span = GlobalTracer.get().buildSpan("XceiverClientGrpc." + request.getCmdType().name()).start();
        try {
            try (Scope ignored = GlobalTracer.get().activateSpan(span);){
                ContainerProtos.ContainerCommandRequestProto.Builder builder = ContainerProtos.ContainerCommandRequestProto.newBuilder(request).setTraceID(TracingUtil.exportCurrentSpan());
                if (!request.hasVersion()) {
                    builder.setVersion(ClientVersion.CURRENT.toProtoValue());
                }
                XceiverClientReply asyncReply = this.sendCommandAsync(builder.build(), this.pipeline.getFirstNode());
                if (this.shouldBlockAndWaitAsyncReply(request)) {
                    asyncReply.getResponse().get();
                }
                XceiverClientReply xceiverClientReply = asyncReply;
                return xceiverClientReply;
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            span.finish();
        }
    }

    /**
     * Decides whether {@link #sendCommandAsync(ContainerProtos.ContainerCommandRequestProto)}
     * waits for the response before returning.
     *
     * @param request command being sent
     * @return {@code true} unless {@code HddsUtils.isReadOnly(request)} reports a read-only request
     */
    protected boolean shouldBlockAndWaitAsyncReply(ContainerProtos.ContainerCommandRequestProto request) {
        boolean readOnly = HddsUtils.isReadOnly(request);
        return !readOnly;
    }

    /**
     * Sends {@code request} to the datanode {@code dn} and returns immediately with a reply
     * whose future completes when the datanode answers.
     *
     * <p>One semaphore permit is held and the pending-operations metric stays incremented
     * while the call is outstanding. Both are given back exactly once: when a response or an
     * error arrives, when the stream completes without any reply, or when submitting the
     * request fails.
     *
     * @param request command to execute
     * @param dn      datanode to send the command to; reconnected first if its channel is not live
     * @return the reply wrapping the response future
     * @throws IOException          if the client is closed or the datanode cannot be connected
     * @throws InterruptedException if interrupted while waiting for a semaphore permit
     */
    @VisibleForTesting
    public XceiverClientReply sendCommandAsync(final ContainerProtos.ContainerCommandRequestProto request, final DatanodeDetails dn) throws IOException, InterruptedException {
        this.checkOpen(dn);
        UUID dnId = dn.getUuid();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Send command {} to datanode {}", (Object)request.getCmdType(), (Object)dn.getIpAddress());
        }
        final CompletableFuture<ContainerProtos.ContainerCommandResponseProto> replyFuture = new CompletableFuture<ContainerProtos.ContainerCommandResponseProto>();
        this.semaphore.acquire();
        final long requestTime = System.currentTimeMillis();
        this.metrics.incrPendingContainerOpsMetrics(request.getCmdType());

        // Gives back the permit and the pending-ops count at most once, whichever path ends the call.
        final AtomicBoolean finished = new AtomicBoolean(false);
        final Runnable finishOnce = () -> {
            if (!finished.compareAndSet(false, true)) {
                return;
            }
            try {
                this.metrics.decrPendingContainerOpsMetrics(request.getCmdType());
                long cost = System.currentTimeMillis() - requestTime;
                this.metrics.addContainerOpsLatency(request.getCmdType(), cost);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Executed command {} on datanode {}, cost = {}, cmdType = {}", new Object[]{HddsUtils.processForDebug(request), dn, cost, request.getCmdType()});
                }
            } finally {
                this.semaphore.release();
            }
        };

        boolean submitted = false;
        try {
            StreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver = ((XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub)this.asyncStubs.get(dnId).withDeadlineAfter(this.timeout, TimeUnit.SECONDS)).send(new StreamObserver<ContainerProtos.ContainerCommandResponseProto>(){

                @Override
                public void onNext(ContainerProtos.ContainerCommandResponseProto value) {
                    replyFuture.complete(value);
                    finishOnce.run();
                }

                @Override
                public void onError(Throwable t2) {
                    replyFuture.completeExceptionally(t2);
                    finishOnce.run();
                }

                @Override
                public void onCompleted() {
                    if (!replyFuture.isDone()) {
                        replyFuture.completeExceptionally(new IOException("Stream completed but no reply for request " + HddsUtils.processForDebug(request)));
                    }
                    // No-op after onNext/onError; releases the permit when the stream ended without a reply.
                    finishOnce.run();
                }
            });
            requestObserver.onNext(request);
            requestObserver.onCompleted();
            submitted = true;
        } finally {
            if (!submitted) {
                // Submission failed part-way; no callback is guaranteed to fire, so clean up here.
                finishOnce.run();
            }
        }
        return new XceiverClientReply(replyFuture);
    }

    /**
     * Ensures the client is open and has a live channel to {@code dn}, reconnecting if needed.
     *
     * @param dn datanode about to be contacted
     * @throws IOException if the client has been closed or reconnecting fails
     */
    private synchronized void checkOpen(DatanodeDetails dn) throws IOException {
        if (this.closed) {
            throw new IOException("This channel is not connected.");
        }
        if (this.isConnected(this.channels.get(dn.getUuid()))) {
            return;
        }
        this.reconnect(dn);
    }

    /**
     * Re-establishes the channel to {@code dn} and verifies that it is usable afterwards.
     *
     * @param dn datanode to reconnect to
     * @throws IOException if connecting fails or the resulting channel is not live
     */
    private void reconnect(DatanodeDetails dn) throws IOException {
        final ManagedChannel refreshed;
        try {
            this.connectToDatanode(dn);
            refreshed = this.channels.get(dn.getUuid());
        } catch (Exception cause) {
            throw new IOException("Error while connecting", cause);
        }
        if (this.isConnected(refreshed)) {
            return;
        }
        throw new IOException("This channel is not connected.");
    }

    /**
     * Always returns {@code null}; this client does not implement watch-for-commit.
     * NOTE(review): presumably because a standalone pipeline has no commit index - confirm.
     *
     * @param index commit index to watch for (ignored)
     * @return {@code null}
     */
    @Override
    public XceiverClientReply watchForCommit(long index) throws InterruptedException, ExecutionException, TimeoutException, IOException {
        return null;
    }

    /**
     * Always returns {@code 0}; this client does not track a replicated commit index.
     *
     * @return {@code 0}
     */
    @Override
    public long getReplicatedMinCommitIndex() {
        return 0L;
    }

    /**
     * Returns the replication type this client reports for itself.
     *
     * @return always {@code STAND_ALONE}, regardless of the wrapped pipeline's configuration
     */
    @Override
    public HddsProtos.ReplicationType getPipelineType() {
        return HddsProtos.ReplicationType.STAND_ALONE;
    }

    /**
     * Returns the configuration this client was created with.
     *
     * @return the configuration source passed to the constructor
     */
    public ConfigurationSource getConfig() {
        return this.config;
    }

    /**
     * Exposes the class logger for tests.
     *
     * @return the logger used by this class
     */
    @VisibleForTesting
    public static Logger getLogger() {
        return LOG;
    }

    /**
     * Sets the per-call deadline used for subsequently sent commands.
     *
     * @param timeout2 deadline value; it is applied in seconds when a command is sent
     */
    public void setTimeout(long timeout2) {
        this.timeout = timeout2;
    }
}

