package org.opensearch.performanceanalyzer.rca.net.tasks;

import com.google.common.collect.ImmutableSet;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.AppContext;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.grpc.PublishResponse;
import org.opensearch.performanceanalyzer.net.NetClient;
import org.opensearch.performanceanalyzer.rca.framework.core.GenericFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.net.SubscriptionManager;

/* loaded from: input_file:org/opensearch/performanceanalyzer/rca/net/tasks/FlowUnitTxTask.class */
/**
 * Task that publishes the flow units carried by one {@link DataMsg} to every instance currently
 * subscribed to the graph node the message originates from.
 */
public class FlowUnitTxTask implements Runnable {
    private static final Logger LOG = LogManager.getLogger(FlowUnitTxTask.class);

    private final NetClient client;
    private final SubscriptionManager subscriptionManager;
    private final DataMsg dataMsg;
    private final AppContext appContext;

    /**
     * @param netClient client used to publish flow unit messages and to flush streams
     * @param subscriptionManager source of the subscribers for a graph node
     * @param dataMsg message holding the source graph node name and its flow units
     * @param appContext used to look up this instance's details and instances by id
     */
    public FlowUnitTxTask(NetClient netClient, SubscriptionManager subscriptionManager, DataMsg dataMsg, AppContext appContext) {
        this.appContext = appContext;
        this.dataMsg = dataMsg;
        this.subscriptionManager = subscriptionManager;
        this.client = netClient;
    }

    /**
     * Publishes every flow unit of the message to each subscriber of the message's source graph
     * node. When the node has no subscribers, only a debug line is logged.
     */
    @Override
    public void run() {
        final String originNode = dataMsg.getSourceGraphNode();
        final InstanceDetails self = appContext.getMyInstanceDetails();

        if (subscriptionManager.isNodeSubscribed(originNode)) {
            final ImmutableSet<InstanceDetails.Id> subscribers = subscriptionManager.getSubscribersFor(originNode);
            LOG.debug("{} has downstream subscribers: {}", originNode, subscribers);
            for (final InstanceDetails.Id subscriber : subscribers) {
                publishAllTo(subscriber, originNode, self);
            }
        } else {
            LOG.debug("No subscribers for {}.", originNode);
        }
    }

    /**
     * Sends each flow unit of the message to one subscriber and bumps the publish counter once
     * per flow unit sent.
     */
    private void publishAllTo(final InstanceDetails.Id subscriber, final String originNode, final InstanceDetails self) {
        for (final GenericFlowUnit unit : dataMsg.getFlowUnits()) {
            LOG.debug("rca: [pub-tx]: {} -> {}", originNode, subscriber);
            client.publish(
                    appContext.getInstanceById(subscriber),
                    unit.buildFlowUnitMessage(originNode, self.getInstanceId()),
                    responseHandlerFor(originNode, subscriber));
            ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.RCA_NODES_FU_PUBLISH_COUNT, originNode, 1);
        }
    }

    /**
     * Builds the observer handed to the client for one publish call. It drops the subscriber when
     * the response status is {@code NODE_SHUTDOWN} or when the call reports an error.
     */
    private StreamObserver<PublishResponse> responseHandlerFor(final String originNode, final InstanceDetails.Id subscriber) {
        return new StreamObserver<PublishResponse>() {
            public void onNext(PublishResponse response) {
                LOG.debug("rca: Received acknowledgement from the server. status: {}", response.getDataStatus());
                if (PublishResponse.PublishResponseStatus.NODE_SHUTDOWN == response.getDataStatus()) {
                    FlowUnitTxTask.this.dropSubscriber(originNode, subscriber);
                }
            }

            public void onError(Throwable failure) {
                // Count the failure against the source node before tearing the subscription down.
                ServiceMetrics.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.RCA_NETWORK_ERROR, originNode, 1);
                LOG.error("rca: Encountered an exception at the server: ", failure);
                FlowUnitTxTask.this.dropSubscriber(originNode, subscriber);
            }

            public void onCompleted() {
                LOG.debug("rca: Server closed the data channel!");
            }
        };
    }

    /**
     * Asks the subscription manager to unsubscribe the given subscriber from the node and
     * terminate its connection, then flushes the client's stream for that subscriber.
     */
    private void dropSubscriber(final String originNode, final InstanceDetails.Id subscriber) {
        subscriptionManager.unsubscribeAndTerminateConnection(originNode, subscriber);
        client.flushStream(subscriber);
    }
}
