package org.opensearch.performanceanalyzer.rca.net.handler;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.grpc.SubscribeMessage;
import org.opensearch.performanceanalyzer.grpc.SubscribeResponse;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import org.opensearch.performanceanalyzer.rca.net.SubscriptionManager;
import org.opensearch.performanceanalyzer.rca.net.requests.CompositeSubscribeRequest;
import org.opensearch.performanceanalyzer.rca.net.tasks.SubscriptionRxTask;

/* loaded from: input_file:org/opensearch/performanceanalyzer/rca/net/handler/SubscribeServerHandler.class */
/**
 * Server-side entry point for incoming subscription requests.
 *
 * <p>Each request is bundled with its response observer and submitted as a
 * {@link SubscriptionRxTask} to whichever executor is currently held in the
 * shared {@link AtomicReference}.
 */
public class SubscribeServerHandler {
    private static final Logger LOG = LogManager.getLogger(SubscribeServerHandler.class);
    private final AtomicReference<ExecutorService> executorServiceAtomicReference;
    private final SubscriptionManager subscriptionManager;

    /**
     * Creates a handler.
     *
     * @param subscriptionManager manager handed to every {@link SubscriptionRxTask} this handler
     *     creates
     * @param atomicReference holder of the executor used to run subscription tasks; its content
     *     is read on every request and may be {@code null}
     */
    public SubscribeServerHandler(SubscriptionManager subscriptionManager, AtomicReference<ExecutorService> atomicReference) {
        this.subscriptionManager = subscriptionManager;
        this.executorServiceAtomicReference = atomicReference;
    }

    /**
     * Submits a subscription request for asynchronous processing.
     *
     * <p>When the reference holds no executor, the request is dropped without logging. When the
     * executor rejects the task, a warning is logged and
     * {@code RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR} is recorded. The inbound-bytes metric is
     * updated only after the task has been accepted.
     *
     * <p>NOTE(review): on neither drop path does this method invoke {@code streamObserver};
     * confirm that callers tolerate a request that receives no response.
     *
     * @param subscribeMessage the received subscription message
     * @param streamObserver observer for the response, passed along inside the composite request
     */
    public void handleSubscriptionRequest(SubscribeMessage subscribeMessage, StreamObserver<SubscribeResponse> streamObserver) {
        final CompositeSubscribeRequest request = new CompositeSubscribeRequest(subscribeMessage, streamObserver);
        final ExecutorService networkPool = executorServiceAtomicReference.get();
        if (networkPool == null) {
            // No executor available right now: nothing to submit to.
            return;
        }

        try {
            networkPool.execute(new SubscriptionRxTask(subscriptionManager, request));
            // Only reached when execute() did not throw, i.e. the task was accepted.
            recordInboundBytes(request);
        } catch (RejectedExecutionException rejected) {
            LOG.warn("Dropped processing subscription request because the network threadpool is full");
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
        }
    }

    /**
     * Updates the {@code NET_BYTES_IN} stat with the serialized size of the request's message,
     * keyed by the requester graph node named in that message.
     */
    private static void recordInboundBytes(CompositeSubscribeRequest request) {
        ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(
                RcaGraphMetrics.NET_BYTES_IN,
                request.getSubscribeMessage().getRequesterGraphNode(),
                Integer.valueOf(request.getSubscribeMessage().getSerializedSize()));
    }
}
