package org.apache.ambari.server.events;

import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotRegisteredException;
import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.stomp.dto.AckReport;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.utils.ScheduledExecutorCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;

/* loaded from: input_file:org/apache/ambari/server/events/MessageEmitter.class */
public abstract class MessageEmitter {
    /** Source of message ids; static, so the sequence is shared by all emitter instances. */
    protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
    private static final Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
    /** Retry-counter value at which the retry monitor stops rescheduling a message. */
    public final int retryCount;
    /** Delay, in seconds, applied when a retry of a message is scheduled. */
    public final int retryInterval;
    protected final AgentSessionManager agentSessionManager;
    protected final SimpMessagingTemplate simpMessagingTemplate;
    /** Pool of 10 threads on which emit tasks (initial sends and delayed retries) are executed. */
    protected final ScheduledExecutorService emitExecutor = Executors.newScheduledThreadPool(10, new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
    /** Single thread that runs the monitor draining {@link #messagesToEmit}. */
    protected final ExecutorService monitorExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("agent-message-monitor-%d").build());
    /** Single thread that runs the monitor consuming completed emit tasks. */
    protected final ExecutorService retryExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("agent-message-retry-%d").build());
    /** Completion service over {@link #emitExecutor}; finished emit tasks are taken from it by the retry monitor. */
    protected final ScheduledExecutorCompletionService<EmitTaskWrapper> emitCompletionService = new ScheduledExecutorCompletionService<>(this.emitExecutor, new LinkedBlockingQueue<>());
    /** Host id to the message that has been submitted for that host and not yet acknowledged (at most one per host). */
    protected ConcurrentHashMap<Long, EmitTaskWrapper> unconfirmedMessages = new ConcurrentHashMap<>();
    /** Host id to the queue of messages waiting to be emitted to that host. */
    protected ConcurrentHashMap<Long, BlockingQueue<EmitTaskWrapper>> messagesToEmit = new ConcurrentHashMap<>();
    /** Publisher used to announce undelivered messages; assigned once in the constructor. */
    private final AmbariEventPublisher ambariEventPublisher;

    /* loaded from: input_file:org/apache/ambari/server/events/MessageEmitter$EmitMessageTask.class */
    private class EmitMessageTask implements Callable<EmitTaskWrapper> {
        private final EmitTaskWrapper emitTaskWrapper;
        private final boolean checkRelevance;

        public EmitMessageTask(EmitTaskWrapper emitTaskWrapper, boolean z) {
            this.emitTaskWrapper = emitTaskWrapper;
            this.checkRelevance = z;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public EmitTaskWrapper call() throws Exception {
            try {
                if (this.checkRelevance) {
                    EmitTaskWrapper emitTaskWrapper = MessageEmitter.this.unconfirmedMessages.get(this.emitTaskWrapper.getExecutionCommandEvent().getHostId());
                    if (emitTaskWrapper != null && emitTaskWrapper.getMessageId().equals(this.emitTaskWrapper.getMessageId())) {
                        MessageEmitter.this.emitExecutionCommandToHost(this.emitTaskWrapper);
                    }
                } else {
                    MessageEmitter.this.emitExecutionCommandToHost(this.emitTaskWrapper);
                }
            } catch (HostNotRegisteredException e) {
                MessageEmitter.LOG.error("Trying to emit execution command to unregistered host {} on attempt {}", new Object[]{this.emitTaskWrapper.getMessageId(), Integer.valueOf(this.emitTaskWrapper.getRetryCounter()), e});
            }
            return this.emitTaskWrapper;
        }
    }

    /**
     * Pairs an execution command event with the message id it is sent under and a counter of
     * retry attempts.
     *
     * <p>NOTE: the decompiler reports that the access modifier of this class was changed from
     * {@code private}.
     */
    public class EmitTaskWrapper {
        private final AtomicInteger retryCounter;
        private final Long messageId;
        private final ExecutionCommandEvent executionCommandEvent;

        /**
         * @param i initial value of the retry counter
         * @param l id of the message
         * @param executionCommandEvent event to be delivered
         */
        public EmitTaskWrapper(int i, Long l, ExecutionCommandEvent executionCommandEvent) {
            messageId = l;
            this.executionCommandEvent = executionCommandEvent;
            retryCounter = new AtomicInteger(i);
        }

        /** @return id of the message */
        public Long getMessageId() {
            return messageId;
        }

        /** @return the wrapped event */
        public ExecutionCommandEvent getExecutionCommandEvent() {
            return executionCommandEvent;
        }

        /** @return current value of the retry counter */
        public int getRetryCounter() {
            return retryCounter.intValue();
        }

        /** Records one more retry by incrementing the counter. */
        public void retry() {
            retryCounter.getAndIncrement();
        }
    }

    /* loaded from: input_file:org/apache/ambari/server/events/MessageEmitter$MessagesToEmitMonitor.class */
    private class MessagesToEmitMonitor implements Runnable {
        private boolean anyActionPerformed;

        private MessagesToEmitMonitor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                this.anyActionPerformed = false;
                Iterator it = MessageEmitter.this.messagesToEmit.keySet().iterator();
                while (it.hasNext()) {
                    Long l = (Long) it.next();
                    MessageEmitter.this.unconfirmedMessages.computeIfAbsent(l, l2 -> {
                        EmitTaskWrapper poll = MessageEmitter.this.messagesToEmit.get(l).poll();
                        if (poll != null) {
                            MessageEmitter.LOG.info("Schedule execution command emitting, retry: {}, messageId: {}", Integer.valueOf(poll.getRetryCounter()), poll.getMessageId());
                            MessageEmitter.this.emitCompletionService.submit(new EmitMessageTask(poll, false));
                            this.anyActionPerformed = true;
                        }
                        return poll;
                    });
                }
                if (!this.anyActionPerformed) {
                    try {
                        Thread.sleep(200L);
                    } catch (InterruptedException e) {
                        MessageEmitter.LOG.error("Exception during sleep", e);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/ambari/server/events/MessageEmitter$MessagesToRetryMonitor.class */
    private class MessagesToRetryMonitor implements Runnable {
        private MessagesToRetryMonitor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    EmitTaskWrapper emitTaskWrapper = MessageEmitter.this.emitCompletionService.take().get();
                    MessageEmitter.this.unconfirmedMessages.compute(emitTaskWrapper.getExecutionCommandEvent().getHostId(), (l, emitTaskWrapper2) -> {
                        if (emitTaskWrapper2 != null && emitTaskWrapper2.getMessageId().equals(emitTaskWrapper.getMessageId())) {
                            if (emitTaskWrapper.getRetryCounter() >= MessageEmitter.this.retryCount) {
                                ExecutionCommandEvent executionCommandEvent = emitTaskWrapper.getExecutionCommandEvent();
                                MessageEmitter.this.messagesToEmit.remove(executionCommandEvent.getHostId());
                                MessageEmitter.this.ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
                                return null;
                            }
                            emitTaskWrapper.retry();
                            MessageEmitter.LOG.warn("Reschedule execution command emitting, retry: {}, messageId: {}", Integer.valueOf(emitTaskWrapper.getRetryCounter()), emitTaskWrapper.getMessageId());
                            MessageEmitter.this.emitCompletionService.schedule(new EmitMessageTask(emitTaskWrapper, true), MessageEmitter.this.retryInterval, TimeUnit.SECONDS);
                        }
                        return emitTaskWrapper2;
                    });
                } catch (InterruptedException e) {
                    MessageEmitter.LOG.error("Retry message emitting monitor was interrupted", e);
                } catch (ExecutionException e2) {
                    MessageEmitter.LOG.error("Exception during message emitting retry", e2);
                }
            }
        }
    }

    /**
     * Stores the collaborators and retry settings, registers this emitter with the event
     * publisher and starts both monitor loops on their executors.
     *
     * @param agentSessionManager used to look up the session id of a host
     * @param simpMessagingTemplate template the messages are sent through
     * @param ambariEventPublisher publisher this emitter registers with and publishes to
     * @param i value for {@link #retryCount}
     * @param i2 value for {@link #retryInterval}
     */
    public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate, AmbariEventPublisher ambariEventPublisher, int i, int i2) {
        // Retry settings first, then collaborators; all are in place before anything is started.
        this.retryCount = i;
        this.retryInterval = i2;
        this.agentSessionManager = agentSessionManager;
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.ambariEventPublisher = ambariEventPublisher;

        this.ambariEventPublisher.register(this);

        monitorExecutor.execute(new MessagesToEmitMonitor());
        retryExecutor.execute(new MessagesToRetryMonitor());
    }

    /**
     * Emits the given STOMP event; how and to whom is left to the implementation.
     *
     * <p>The method has no access modifier, so only subclasses in this package can implement it.
     *
     * @param sTOMPEvent event to emit
     * @throws AmbariException declared for implementations to signal a failure to emit
     */
    abstract void emitMessage(STOMPEvent sTOMPEvent) throws AmbariException;

    /**
     * Queues an execution command for delivery to its host under a freshly allocated message id.
     *
     * <p>If no queue exists for the host, the command is not queued and an error is logged.
     *
     * @param executionCommandEvent command to deliver
     */
    public void emitMessageRetriable(ExecutionCommandEvent executionCommandEvent) {
        final EmitTaskWrapper pendingTask = new EmitTaskWrapper(0, MESSAGE_ID.getAndIncrement(), executionCommandEvent);
        final Long targetHostId = executionCommandEvent.getHostId();
        messagesToEmit.compute(targetHostId, (key, hostQueue) -> {
            if (hostQueue != null) {
                hostQueue.add(pendingTask);
            } else {
                LOG.error("Trying to emit message to unregistered host with id {}", targetHostId);
            }
            // A null result keeps the host absent from the map.
            return hostQueue;
        });
    }

    /**
     * Handles an acknowledgement report for a host.
     *
     * <p>An OK report whose message id matches the host's unconfirmed message clears that
     * message; an OK report that matches nothing is logged as a warning. Any other status is
     * logged as an error and changes nothing.
     *
     * @param l id of the host the report refers to
     * @param ackReport report to process
     */
    public void processReceiveReport(Long l, AckReport ackReport) {
        final Long reportedMessageId = ackReport.getMessageId();
        if (!AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
            LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}", ackReport.getStatus(), reportedMessageId, ackReport.getReason());
            return;
        }
        unconfirmedMessages.compute(l, (hostId, awaitingAck) -> {
            boolean acknowledged = awaitingAck != null && awaitingAck.getMessageId().equals(ackReport.getMessageId());
            if (!acknowledged) {
                LOG.warn("OK agent report was received again for already complete command with message id {}", reportedMessageId);
            }
            // Returning null removes the entry, i.e. marks the message as confirmed.
            return acknowledged ? null : awaitingAck;
        });
    }

    /**
     * Supplies the destination for an event; the emit methods of this class pass the result to
     * the messaging template as the destination of the message.
     *
     * @param sTOMPEvent event about to be sent
     * @return destination to send the event to
     */
    protected abstract String getDestination(STOMPEvent sTOMPEvent);

    /**
     * Builds message headers for a session without a message id header.
     *
     * @param str session id to set on the headers
     * @return the headers
     */
    protected MessageHeaders createHeaders(String str) {
        final Long noMessageId = null;
        return createHeaders(str, noMessageId);
    }

    /**
     * Builds headers of type {@code MESSAGE} for a session, left mutable, optionally carrying a
     * native {@code messageId} header.
     *
     * @param str session id to set on the headers
     * @param l message id to add as the {@code messageId} native header; {@code null} to omit it
     * @return the headers
     */
    protected MessageHeaders createHeaders(String str, Long l) {
        final SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        accessor.setSessionId(str);
        accessor.setLeaveMutable(true);
        if (l != null) {
            accessor.setNativeHeader("messageId", l.toString());
        }
        return accessor.getMessageHeaders();
    }

    /**
     * Sends an event to the destination returned by {@link #getDestination(STOMPEvent)}, without
     * addressing a particular session.
     *
     * <p>NOTE: the decompiler reports that the access modifier was changed from {@code protected}.
     *
     * @param sTOMPEvent event to send
     */
    public void emitMessageToAll(STOMPEvent sTOMPEvent) {
        LOG.debug("Received status update event {}", sTOMPEvent);
        final String destination = getDestination(sTOMPEvent);
        simpMessagingTemplate.convertAndSend(destination, sTOMPEvent);
    }

    /**
     * Sends an event to the session registered for the event's host.
     *
     * <p>NOTE: the decompiler reports that the access modifier was changed from {@code protected}.
     *
     * @param sTOMPHostEvent event to send; its host id selects the session
     * @throws HostNotRegisteredException declared by the session lookup for the host
     */
    public void emitMessageToHost(STOMPHostEvent sTOMPHostEvent) throws HostNotRegisteredException {
        final Long targetHostId = sTOMPHostEvent.getHostId();
        final String session = agentSessionManager.getSessionId(targetHostId);
        LOG.debug("Received status update event {} for host {} registered with session ID {}", sTOMPHostEvent, targetHostId, session);
        final String destination = getDestination(sTOMPHostEvent);
        final MessageHeaders headers = createHeaders(session);
        simpMessagingTemplate.convertAndSendToUser(session, destination, sTOMPHostEvent, headers);
    }

    /**
     * Sends the wrapped execution command to the session registered for its host, with the
     * wrapper's message id placed in the headers.
     *
     * @param emitTaskWrapper command and message id to send
     * @throws HostNotRegisteredException declared by the session lookup for the host
     */
    protected void emitExecutionCommandToHost(EmitTaskWrapper emitTaskWrapper) throws HostNotRegisteredException {
        final ExecutionCommandEvent command = emitTaskWrapper.getExecutionCommandEvent();
        final Long targetHostId = command.getHostId();
        final Long outgoingMessageId = emitTaskWrapper.getMessageId();
        final String session = agentSessionManager.getSessionId(targetHostId);
        LOG.debug("Received status update event {} for host {} registered with session ID {}", command, targetHostId, session);
        final String destination = getDestination(command);
        final MessageHeaders headers = createHeaders(session, outgoingMessageId);
        simpMessagingTemplate.convertAndSendToUser(session, destination, command, headers);
    }

    /**
     * Event subscriber that makes sure a queue of pending messages exists for a newly registered
     * host. An existing queue for the host is left untouched.
     *
     * @param hostRegisteredEvent registration event carrying the host id
     */
    @Subscribe
    public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
        // Diamond instead of the raw type, so the queue is created as BlockingQueue<EmitTaskWrapper>.
        messagesToEmit.computeIfAbsent(hostRegisteredEvent.getHostId(), hostId -> new LinkedBlockingQueue<>());
    }
}
