/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.grpc.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.grpc.server.GrpcAdminProtocolService;
import org.apache.ratis.grpc.server.GrpcClientProtocolService;
import org.apache.ratis.grpc.server.GrpcServerProtocolClient;
import org.apache.ratis.grpc.server.GrpcServerProtocolService;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.thirdparty.io.grpc.BindableService;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GrpcService
extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>> {
    static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
    public static final String GRPC_SEND_SERVER_REQUEST = JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest";
    private final Map<String, Server> servers = new HashMap<String, Server>();
    private final Supplier<InetSocketAddress> addressSupplier;
    private final Supplier<InetSocketAddress> clientServerAddressSupplier;
    private final Supplier<InetSocketAddress> adminServerAddressSupplier;
    private final AsyncService asyncService = new AsyncService();
    private final ExecutorService executor;
    private final GrpcClientProtocolService clientProtocolService;
    private final MetricServerInterceptor serverInterceptor;

    public static Builder newBuilder() {
        return new Builder();
    }

    public MetricServerInterceptor getServerInterceptor() {
        return this.serverInterceptor;
    }

    private GrpcService(RaftServer server, GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
        this(server, server::getId, GrpcConfigKeys.Admin.host(server.getProperties()), GrpcConfigKeys.Admin.port(server.getProperties()), adminTlsConfig, GrpcConfigKeys.Client.host(server.getProperties()), GrpcConfigKeys.Client.port(server.getProperties()), clientTlsConfig, GrpcConfigKeys.Server.host(server.getProperties()), GrpcConfigKeys.Server.port(server.getProperties()), serverTlsConfig, GrpcConfigKeys.messageSizeMax(server.getProperties(), arg_0 -> ((Logger)LOG).info(arg_0)), RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()), GrpcConfigKeys.flowControlWindow(server.getProperties(), arg_0 -> ((Logger)LOG).info(arg_0)), RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()), GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
    }

    private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig, String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig, String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig, SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration, boolean useSeparateHBChannel) {
        super(idSupplier, id -> new PeerProxyMap<GrpcServerProtocolClient>(id.toString(), p -> new GrpcServerProtocolClient((RaftPeer)p, flowControlWindow.getSizeInt(), requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
        NettyServerBuilder builder;
        SizeInBytes gap = SizeInBytes.ONE_MB;
        long diff = grpcMessageSizeMax.getSize() - appenderBufferSize.getSize();
        if (diff < gap.getSize()) {
            throw new IllegalArgumentException("Illegal configuration: raft.grpc.message.size.max(= " + grpcMessageSizeMax + ") must be " + gap + " larger than " + "raft.server.log.appender.buffer.byte-limit" + "(= " + appenderBufferSize + ").");
        }
        RaftProperties properties = raftServer.getProperties();
        this.executor = ConcurrentUtils.newThreadPoolWithMax(GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties), GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties), this.getId() + "-request-");
        this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, this.executor);
        this.serverInterceptor = new MetricServerInterceptor(idSupplier, JavaUtils.getClassSimpleName(this.getClass()) + "_" + serverPort);
        boolean separateAdminServer = adminPort != serverPort && adminPort > 0;
        boolean separateClientServer = clientPort != serverPort && clientPort > 0;
        NettyServerBuilder serverBuilder = GrpcService.startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
        serverBuilder.addService(ServerInterceptors.intercept((BindableService)new GrpcServerProtocolService(idSupplier, raftServer), this.serverInterceptor));
        if (!separateAdminServer) {
            this.addAdminService(raftServer, serverBuilder);
        }
        if (!separateClientServer) {
            this.addClientService(serverBuilder);
        }
        Server server = serverBuilder.build();
        this.servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
        this.addressSupplier = this.newAddressSupplier(serverPort, server);
        if (separateAdminServer) {
            builder = GrpcService.startBuildingNettyServer(adminHost, adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow);
            this.addAdminService(raftServer, builder);
            Server adminServer = builder.build();
            this.servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
            this.adminServerAddressSupplier = this.newAddressSupplier(adminPort, adminServer);
        } else {
            this.adminServerAddressSupplier = this.addressSupplier;
        }
        if (separateClientServer) {
            builder = GrpcService.startBuildingNettyServer(clientHost, clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow);
            this.addClientService(builder);
            Server clientServer = builder.build();
            this.servers.put(GrpcClientProtocolService.class.getName(), clientServer);
            this.clientServerAddressSupplier = this.newAddressSupplier(clientPort, clientServer);
        } else {
            this.clientServerAddressSupplier = this.addressSupplier;
        }
    }

    private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, Server server) {
        return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port : server.getPort()));
    }

    private void addClientService(NettyServerBuilder builder) {
        builder.addService(ServerInterceptors.intercept((BindableService)this.clientProtocolService, this.serverInterceptor));
    }

    private void addAdminService(RaftServer raftServer, NettyServerBuilder nettyServerBuilder) {
        nettyServerBuilder.addService(ServerInterceptors.intercept((BindableService)new GrpcAdminProtocolService(raftServer), this.serverInterceptor));
    }

    private static NettyServerBuilder startBuildingNettyServer(String hostname, int port, GrpcTlsConfig tlsConfig, SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) {
        InetSocketAddress address = hostname == null || hostname.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
        NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address).withChildOption(ChannelOption.SO_REUSEADDR, true).maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()).flowControlWindow(flowControlWindow.getSizeInt());
        if (tlsConfig != null) {
            SslContextBuilder sslContextBuilder = GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
            if (tlsConfig.getMtlsEnabled()) {
                sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
                GrpcUtil.setTrustManager(sslContextBuilder, tlsConfig.getTrustManager());
            }
            sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL);
            try {
                nettyServerBuilder.sslContext(sslContextBuilder.build());
            }
            catch (Exception ex) {
                throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex);
            }
        }
        return nettyServerBuilder;
    }

    @Override
    public SupportedRpcType getRpcType() {
        return SupportedRpcType.GRPC;
    }

    @Override
    public void startImpl() {
        for (Server server : this.servers.values()) {
            try {
                server.start();
            }
            catch (IOException e) {
                ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
            }
            LOG.info("{}: {} started, listening on {}", new Object[]{this.getId(), JavaUtils.getClassSimpleName(this.getClass()), server.getPort()});
        }
    }

    @Override
    public void closeImpl() throws IOException {
        for (Map.Entry<String, Server> server : this.servers.entrySet()) {
            String name = this.getId() + ": shutdown server " + server.getKey();
            LOG.info("{} now", (Object)name);
            Server s2 = server.getValue().shutdownNow();
            super.closeImpl();
            try {
                s2.awaitTermination();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw IOUtils.toInterruptedIOException(name + " failed", e);
            }
            LOG.info("{} successfully", (Object)name);
        }
        this.serverInterceptor.close();
        ConcurrentUtils.shutdownAndWait(this.executor);
    }

    @Override
    public void notifyNotLeader(RaftGroupId groupId) {
        this.clientProtocolService.closeAllOrderedRequestStreamObservers(groupId);
    }

    @Override
    public InetSocketAddress getInetSocketAddress() {
        return this.addressSupplier.get();
    }

    @Override
    public InetSocketAddress getClientServerAddress() {
        return this.clientServerAddressSupplier.get();
    }

    @Override
    public InetSocketAddress getAdminServerAddress() {
        return this.adminServerAddressSupplier.get();
    }

    @Override
    public RaftServerAsynchronousProtocol async() {
        return this.asyncService;
    }

    @Override
    public RaftProtos.AppendEntriesReplyProto appendEntries(RaftProtos.AppendEntriesRequestProto request) {
        throw new UnsupportedOperationException("Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
    }

    @Override
    public RaftProtos.InstallSnapshotReplyProto installSnapshot(RaftProtos.InstallSnapshotRequestProto request) {
        throw new UnsupportedOperationException("Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
    }

    @Override
    public RaftProtos.RequestVoteReplyProto requestVote(RaftProtos.RequestVoteRequestProto request) throws IOException {
        CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, this.getId(), null, request);
        RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
        return ((GrpcServerProtocolClient)((PeerProxyMap)this.getProxies()).getProxy(target)).requestVote(request);
    }

    @Override
    public RaftProtos.StartLeaderElectionReplyProto startLeaderElection(RaftProtos.StartLeaderElectionRequestProto request) throws IOException {
        CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, this.getId(), null, request);
        RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
        return ((GrpcServerProtocolClient)((PeerProxyMap)this.getProxies()).getProxy(target)).startLeaderElection(request);
    }

    public static final class Builder {
        private RaftServer server;
        private GrpcTlsConfig adminTlsConfig;
        private GrpcTlsConfig clientTlsConfig;
        private GrpcTlsConfig serverTlsConfig;

        private Builder() {
        }

        public Builder setServer(RaftServer raftServer) {
            this.server = raftServer;
            return this;
        }

        public GrpcService build() {
            return new GrpcService(this.server, this.adminTlsConfig, this.clientTlsConfig, this.serverTlsConfig);
        }

        public Builder setAdminTlsConfig(GrpcTlsConfig config) {
            this.adminTlsConfig = config;
            return this;
        }

        public Builder setClientTlsConfig(GrpcTlsConfig config) {
            this.clientTlsConfig = config;
            return this;
        }

        public Builder setServerTlsConfig(GrpcTlsConfig config) {
            this.serverTlsConfig = config;
            return this;
        }
    }

    class AsyncService
    implements RaftServerAsynchronousProtocol {
        AsyncService() {
        }

        @Override
        public CompletableFuture<RaftProtos.AppendEntriesReplyProto> appendEntriesAsync(RaftProtos.AppendEntriesRequestProto request) {
            throw new UnsupportedOperationException("This method is not supported");
        }

        @Override
        public CompletableFuture<RaftProtos.ReadIndexReplyProto> readIndexAsync(RaftProtos.ReadIndexRequestProto request) throws IOException {
            CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, GrpcService.this.getId(), null, request);
            final CompletableFuture<RaftProtos.ReadIndexReplyProto> f = new CompletableFuture<RaftProtos.ReadIndexReplyProto>();
            StreamObserver<RaftProtos.ReadIndexReplyProto> s2 = new StreamObserver<RaftProtos.ReadIndexReplyProto>(){

                @Override
                public void onNext(RaftProtos.ReadIndexReplyProto reply) {
                    f.complete(reply);
                }

                @Override
                public void onError(Throwable throwable) {
                    f.completeExceptionally(throwable);
                }

                @Override
                public void onCompleted() {
                }
            };
            RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
            ((GrpcServerProtocolClient)((PeerProxyMap)GrpcService.this.getProxies()).getProxy(target)).readIndex(request, s2);
            return f;
        }
    }
}

