package org.opensearch.performanceanalyzer.net;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.grpc.MetricsRequest;
import org.opensearch.performanceanalyzer.grpc.MetricsResponse;
import org.opensearch.performanceanalyzer.grpc.PublishResponse;
import org.opensearch.performanceanalyzer.grpc.SubscribeMessage;
import org.opensearch.performanceanalyzer.grpc.SubscribeResponse;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;

/* loaded from: input_file:org/opensearch/performanceanalyzer/net/NetClient.class */
/**
 * Client-side entry point for the gRPC calls made through a {@link GRPCConnectionManager}:
 * subscribe, publish and getMetrics.
 *
 * <p>For publishing, one client stream is kept open per (remote instance id, graph node) pair and
 * reused for subsequent flow units. The streams are tracked in a two-level concurrent map; new
 * streams are only ever opened inside a {@code synchronized} method of this class.
 */
public class NetClient {
    private static final Logger LOG = LogManager.getLogger(NetClient.class);
    private final GRPCConnectionManager connectionManager;

    /** Open publish streams, keyed first by remote instance id and then by graph node name. */
    private final ConcurrentMap<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>> perHostAndNodeOpenDataStreamMap = new ConcurrentHashMap<>();

    /**
     * @param gRPCConnectionManager the manager used to obtain client stubs for remote hosts.
     */
    public NetClient(GRPCConnectionManager gRPCConnectionManager) {
        this.connectionManager = gRPCConnectionManager;
    }

    /**
     * @return the connection manager this client was constructed with.
     */
    public GRPCConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    /**
     * @return the live (not copied) map of open publish streams, keyed by instance id and then by
     *     graph node name.
     */
    protected ConcurrentMap<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>> getPerHostAndNodeOpenDataStreamMap() {
        return this.perHostAndNodeOpenDataStreamMap;
    }

    /**
     * Sends a subscription request to a remote host and records the outgoing byte count.
     *
     * <p>A {@link StatusRuntimeException} is logged and counted as a network error rather than
     * propagated to the caller.
     *
     * @param instanceDetails the remote host to subscribe to.
     * @param subscribeMessage the subscription request to send.
     * @param streamObserver the observer handed to the stub to receive the response.
     */
    public void subscribe(InstanceDetails instanceDetails, SubscribeMessage subscribeMessage, StreamObserver<SubscribeResponse> streamObserver) {
        LOG.debug("Trying to send intent message to {}", instanceDetails);
        try {
            this.connectionManager.getClientStubForHost(instanceDetails).subscribe(subscribeMessage, streamObserver);
            ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, subscribeMessage.getRequesterGraphNode(), Integer.valueOf(subscribeMessage.getSerializedSize()));
        } catch (StatusRuntimeException e) {
            LOG.error("Encountered an error trying to subscribe. Status: {}", e.getStatus(), e);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    /**
     * Sends a flow unit to a remote host over the stream kept for that host and graph node,
     * opening the stream first if none is registered, and records the outgoing byte count.
     *
     * <p>A {@link StatusRuntimeException} is logged and counted as a network error rather than
     * propagated to the caller.
     *
     * @param instanceDetails the remote host to publish to.
     * @param flowUnitMessage the flow unit to send; its graph node selects the stream.
     * @param streamObserver the response observer used only when a new stream has to be opened.
     */
    public void publish(InstanceDetails instanceDetails, FlowUnitMessage flowUnitMessage, StreamObserver<PublishResponse> streamObserver) {
        LOG.debug("Publishing {} data to {}", flowUnitMessage.getGraphNode(), instanceDetails);
        try {
            getDataStreamForHost(instanceDetails, flowUnitMessage.getGraphNode(), streamObserver).onNext(flowUnitMessage);
            ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, flowUnitMessage.getGraphNode(), Integer.valueOf(flowUnitMessage.getSerializedSize()));
        } catch (StatusRuntimeException e) {
            LOG.error("rca: Encountered an error trying to publish a flow unit. Status: {}", e.getStatus(), e);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    /**
     * Forwards a metrics request to a remote host. Unlike {@link #subscribe} and {@link #publish},
     * exceptions thrown by the stub are not caught here.
     *
     * @param instanceDetails the remote host to query.
     * @param metricsRequest the request to send.
     * @param streamObserver the observer handed to the stub to receive the response.
     */
    public void getMetrics(InstanceDetails instanceDetails, MetricsRequest metricsRequest, StreamObserver<MetricsResponse> streamObserver) {
        this.connectionManager.getClientStubForHost(instanceDetails).getMetrics(metricsRequest, streamObserver);
    }

    /** Completes and forgets every open publish stream, then shuts the connection manager down. */
    public void stop() {
        LOG.debug("Shutting down client streaming connections..");
        closeAllDataStreams();
        this.connectionManager.shutdown();
    }

    /**
     * Drops all publish streams registered for the given instance without completing them; the
     * next publish to that instance opens fresh streams.
     *
     * @param id the instance whose streams should be forgotten.
     */
    public void flushStream(InstanceDetails.Id id) {
        LOG.debug("removing data streams for {} as we are no publishing to it.", id);
        this.perHostAndNodeOpenDataStreamMap.remove(id);
    }

    /**
     * Calls {@code onCompleted()} on every registered stream and removes each host's entry.
     *
     * <p>A failure on one stream is logged and does not prevent the remaining streams from being
     * closed, so that {@link #stop()} always reaches the connection manager shutdown.
     */
    private void closeAllDataStreams() {
        for (Map.Entry<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>> hostEntry : this.perHostAndNodeOpenDataStreamMap.entrySet()) {
            LOG.debug("Closing stream for host: {}", hostEntry.getKey());
            for (AtomicReference<StreamObserver<FlowUnitMessage>> streamRef : hostEntry.getValue().values()) {
                StreamObserver<FlowUnitMessage> stream = streamRef.get();
                if (stream == null) {
                    // Nothing was ever stored in this reference; there is no stream to complete.
                    continue;
                }
                try {
                    stream.onCompleted();
                } catch (RuntimeException e) {
                    // Best effort: one stream failing to complete must not block the remaining
                    // streams or the subsequent shutdown.
                    LOG.warn("Failed to complete data stream for host: {}", hostEntry.getKey(), e);
                }
            }
            // Removing while iterating is safe here: ConcurrentHashMap iterators are weakly
            // consistent and never throw ConcurrentModificationException.
            this.perHostAndNodeOpenDataStreamMap.remove(hostEntry.getKey());
        }
    }

    /**
     * Returns the stream registered for the host and graph node, opening one if none exists.
     *
     * @param instanceDetails the remote host.
     * @param graphNode the graph node name the stream is keyed by.
     * @param streamObserver the response observer used only if a new stream has to be opened.
     * @return a non-null stream to send flow units on.
     */
    private StreamObserver<FlowUnitMessage> getDataStreamForHost(InstanceDetails instanceDetails, String graphNode, StreamObserver<PublishResponse> streamObserver) {
        // Lock-free fast path; the synchronized slow path is only taken when nothing is registered.
        StreamObserver<FlowUnitMessage> openStream = findOpenDataStream(instanceDetails.getInstanceId(), graphNode);
        return openStream != null ? openStream : addOrUpdateDataStreamForHost(instanceDetails, graphNode, streamObserver);
    }

    /**
     * Looks up the registered stream with a single read of each map level.
     *
     * @param hostId the remote instance id.
     * @param graphNode the graph node name.
     * @return the registered stream, or {@code null} if there is no entry or the entry is empty.
     */
    private StreamObserver<FlowUnitMessage> findOpenDataStream(InstanceDetails.Id hostId, String graphNode) {
        ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>> streamsForHost = this.perHostAndNodeOpenDataStreamMap.get(hostId);
        if (streamsForHost == null) {
            return null;
        }
        AtomicReference<StreamObserver<FlowUnitMessage>> streamRef = streamsForHost.get(graphNode);
        return streamRef == null ? null : streamRef.get();
    }

    /**
     * Opens a publish stream for the host and graph node and registers it, unless another thread
     * registered one while this thread was waiting for the lock.
     *
     * @param instanceDetails the remote host.
     * @param graphNode the graph node name the stream is keyed by.
     * @param streamObserver the response observer handed to the stub for the new stream.
     * @return the registered stream for the host and graph node.
     */
    private synchronized StreamObserver<FlowUnitMessage> addOrUpdateDataStreamForHost(InstanceDetails instanceDetails, String graphNode, StreamObserver<PublishResponse> streamObserver) {
        InstanceDetails.Id hostId = instanceDetails.getInstanceId();

        // Re-check under the lock: the caller's check ran unlocked, so a racing publisher may
        // already have opened the stream. Opening a second one would orphan the first.
        StreamObserver<FlowUnitMessage> alreadyOpen = findOpenDataStream(hostId, graphNode);
        if (alreadyOpen != null) {
            return alreadyOpen;
        }

        // Open the stream before touching the map so a failure here leaves no partial entry.
        StreamObserver<FlowUnitMessage> newStream = this.connectionManager.getClientStubForHost(instanceDetails).publish(streamObserver);

        // Use computeIfAbsent's return value instead of re-reading the outer map, which a
        // concurrent flushStream could have emptied in the meantime.
        ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>> streamsForHost = this.perHostAndNodeOpenDataStreamMap.computeIfAbsent(hostId, id -> new ConcurrentHashMap<>());

        // Insert an already-populated reference so readers never observe an empty one.
        AtomicReference<StreamObserver<FlowUnitMessage>> existingRef = streamsForHost.putIfAbsent(graphNode, new AtomicReference<>(newStream));
        if (existingRef != null) {
            existingRef.set(newStream);
        }
        return newStream;
    }
}
