package org.opensearch.performanceanalyzer.rca.net;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.AppContext;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.net.NetClient;
import org.opensearch.performanceanalyzer.rca.framework.core.Node;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.messages.IntentMsg;
import org.opensearch.performanceanalyzer.rca.messages.UnicastIntentMsg;
import org.opensearch.performanceanalyzer.rca.net.tasks.BroadcastSubscriptionTxTask;
import org.opensearch.performanceanalyzer.rca.net.tasks.FlowUnitTxTask;
import org.opensearch.performanceanalyzer.rca.net.tasks.UnicastSubscriptionTxTask;
import org.opensearch.performanceanalyzer.rca.util.ClusterUtils;

/* loaded from: input_file:org/opensearch/performanceanalyzer/rca/net/WireHopper.class */
public class WireHopper {
    private static final Logger LOG = LogManager.getLogger(WireHopper.class);
    private static final int MS_IN_S = 1000;
    private final NetClient netClient;
    private final SubscriptionManager subscriptionManager;
    private final NodeStateManager nodeStateManager;
    private final AtomicReference<ExecutorService> executorReference;
    private final ReceivedFlowUnitStore receivedFlowUnitStore;
    private final AppContext appContext;

    public WireHopper(NodeStateManager nodeStateManager, NetClient netClient, SubscriptionManager subscriptionManager, AtomicReference<ExecutorService> atomicReference, ReceivedFlowUnitStore receivedFlowUnitStore, AppContext appContext) {
        this.netClient = netClient;
        this.subscriptionManager = subscriptionManager;
        this.nodeStateManager = nodeStateManager;
        this.executorReference = atomicReference;
        this.receivedFlowUnitStore = receivedFlowUnitStore;
        this.appContext = appContext;
    }

    public void sendIntent(IntentMsg intentMsg) {
        ExecutorService executorService = this.executorReference.get();
        if (executorService != null) {
            try {
                executorService.execute(new BroadcastSubscriptionTxTask(this.netClient, intentMsg, this.subscriptionManager, this.nodeStateManager, this.appContext));
            } catch (RejectedExecutionException e) {
                LOG.warn("Dropped sending subscription because the threadpool queue is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
    }

    public void sendData(DataMsg dataMsg) {
        ExecutorService executorService = this.executorReference.get();
        if (executorService != null) {
            try {
                executorService.execute(new FlowUnitTxTask(this.netClient, this.subscriptionManager, dataMsg, this.appContext));
            } catch (RejectedExecutionException e) {
                LOG.warn("Dropped sending flow unit because the threadpool queue is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
    }

    @VisibleForTesting
    public AppContext getAppContext() {
        return this.appContext;
    }

    public List<FlowUnitMessage> readFromWire(Node<?> node) {
        String name = node.name();
        long evaluationIntervalSeconds = node.getEvaluationIntervalSeconds();
        ImmutableList<FlowUnitMessage> drainNode = this.receivedFlowUnitStore.drainNode(name);
        Set<InstanceDetails.Id> publishersForNode = this.subscriptionManager.getPublishersForNode(name);
        for (InstanceDetails.Id id : publishersForNode) {
            if (!ClusterUtils.isHostIdInCluster(id, this.appContext.getAllClusterInstances())) {
                this.subscriptionManager.unsubscribeAndTerminateConnection(name, id);
            }
        }
        UnmodifiableIterator it = this.nodeStateManager.getStaleOrNotSubscribedNodes(name, 2 * evaluationIntervalSeconds * 1000, publishersForNode).iterator();
        while (it.hasNext()) {
            InstanceDetails instanceDetails = (InstanceDetails) it.next();
            ExecutorService executorService = this.executorReference.get();
            if (executorService != null) {
                try {
                    executorService.execute(new UnicastSubscriptionTxTask(this.netClient, new UnicastIntentMsg("", name, node.getTags(), instanceDetails), this.subscriptionManager, this.nodeStateManager, this.appContext));
                } catch (RejectedExecutionException e) {
                    LOG.warn("Dropped sending subscription request because the threadpool queue is full");
                    StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
                }
            }
        }
        return drainNode;
    }

    @VisibleForTesting
    public void shutdownAll() {
        this.executorReference.get().shutdown();
        this.netClient.stop();
        this.netClient.getConnectionManager().shutdown();
    }

    @VisibleForTesting
    public SubscriptionManager getSubscriptionManager() {
        return this.subscriptionManager;
    }

    @VisibleForTesting
    public NodeStateManager getNodeStateManager() {
        return this.nodeStateManager;
    }

    @VisibleForTesting
    public AtomicReference<ExecutorService> getExecutorReference() {
        return this.executorReference;
    }

    @VisibleForTesting
    public ReceivedFlowUnitStore getReceivedFlowUnitStore() {
        return this.receivedFlowUnitStore;
    }
}
