/*
 * Decompiled with CFR 0.152.
 */
package id.onyx.obdp.server.events;

import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import id.onyx.obdp.server.HostNotRegisteredException;
import id.onyx.obdp.server.OBDPException;
import id.onyx.obdp.server.agent.AgentSessionManager;
import id.onyx.obdp.server.agent.stomp.dto.AckReport;
import id.onyx.obdp.server.events.ExecutionCommandEvent;
import id.onyx.obdp.server.events.HostRegisteredEvent;
import id.onyx.obdp.server.events.MessageNotDelivered;
import id.onyx.obdp.server.events.STOMPEvent;
import id.onyx.obdp.server.events.STOMPHostEvent;
import id.onyx.obdp.server.events.publishers.OBDPEventPublisher;
import id.onyx.obdp.server.utils.ScheduledExecutorCompletionService;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;

/**
 * Base class for components that push STOMP events to agents through a
 * {@link SimpMessagingTemplate}.
 *
 * <p>Besides plain broadcast / per-host sends, the class implements acknowledged
 * delivery of {@link ExecutionCommandEvent}s:
 * <ol>
 *   <li>{@link #onHostRegister(HostRegisteredEvent)} creates a message queue for the host;</li>
 *   <li>{@link #emitMessageRetriable(ExecutionCommandEvent)} appends a command to that queue;</li>
 *   <li>the emit monitor takes the next queued command of every host that has no
 *       unconfirmed command, records it in {@link #unconfirmedMessages} and submits
 *       it for emitting;</li>
 *   <li>the retry monitor inspects every finished emit task: while the command is
 *       still unconfirmed it is re-scheduled after {@link #retryInterval} seconds, at
 *       most {@link #retryCount} times; afterwards the host queue is dropped and a
 *       {@link MessageNotDelivered} event is published;</li>
 *   <li>{@link #processReceiveReport(Long, AckReport)} clears the unconfirmed command
 *       when a matching OK report arrives.</li>
 * </ol>
 */
public abstract class MessageEmitter {
    /** Source of message ids; static, hence shared by every emitter instance. */
    protected static final AtomicLong MESSAGE_ID = new AtomicLong(0L);
    private static final Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);

    /** Maximum number of re-emit attempts for an unconfirmed execution command. */
    public final int retryCount;
    /** Delay between re-emit attempts, in seconds. */
    public final int retryInterval;

    protected final AgentSessionManager agentSessionManager;
    protected final SimpMessagingTemplate simpMessagingTemplate;
    protected final ScheduledExecutorService emitExecutor = Executors.newScheduledThreadPool(10,
            new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
    protected final ExecutorService monitorExecutor = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("agent-message-monitor-%d").build());
    protected final ExecutorService retryExecutor = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("agent-message-retry-%d").build());
    protected final ScheduledExecutorCompletionService<EmitTaskWrapper> emitCompletionService =
            new ScheduledExecutorCompletionService<>(this.emitExecutor, new LinkedBlockingQueue<>());

    /** Host id -> the single execution command currently awaiting an OK report from that host. */
    protected ConcurrentHashMap<Long, EmitTaskWrapper> unconfirmedMessages = new ConcurrentHashMap<>();
    /** Host id -> execution commands waiting to be emitted; an entry exists only for registered hosts. */
    protected ConcurrentHashMap<Long, BlockingQueue<EmitTaskWrapper>> messagesToEmit = new ConcurrentHashMap<>();

    private final OBDPEventPublisher ambariEventPublisher;

    /**
     * Creates the emitter, registers it on the event publisher and starts both
     * background monitors.
     *
     * @param agentSessionManager   resolves the session id of a host
     * @param simpMessagingTemplate template used to send the messages
     * @param ambariEventPublisher  publisher this emitter subscribes to and publishes
     *                              {@link MessageNotDelivered} events on
     * @param retryCount            maximum number of re-emit attempts per command
     * @param retryInterval         delay between re-emit attempts, in seconds
     */
    public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate, OBDPEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
        this.agentSessionManager = agentSessionManager;
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.ambariEventPublisher = ambariEventPublisher;
        this.retryCount = retryCount;
        this.retryInterval = retryInterval;
        ambariEventPublisher.register(this);
        this.monitorExecutor.execute(new MessagesToEmitMonitor());
        this.retryExecutor.execute(new MessagesToRetryMonitor());
    }

    /**
     * Emits the given event; the delivery strategy is defined by the subclass.
     *
     * @param var1 event to emit
     * @throws OBDPException if the subclass fails to emit the event
     */
    abstract void emitMessage(STOMPEvent var1) throws OBDPException;

    /**
     * Queues an execution command for acknowledged delivery to its host.
     *
     * <p>The command is dropped (and an error is logged) when no queue exists for the
     * host, i.e. no {@link HostRegisteredEvent} has been handled for it or its queue
     * was removed after an undelivered message.
     *
     * @param event execution command to deliver
     */
    public void emitMessageRetriable(ExecutionCommandEvent event) {
        EmitTaskWrapper wrapper = new EmitTaskWrapper(0, MESSAGE_ID.getAndIncrement(), event);
        Long hostId = event.getHostId();
        // compute() keeps the "queue exists?" check and the add atomic with respect to
        // a concurrent removal of the host queue.
        this.messagesToEmit.compute(hostId, (id, hostMessages) -> {
            if (hostMessages == null) {
                LOG.error("Trying to emit message to unregistered host with id {}", hostId);
                return null;
            }
            hostMessages.add(wrapper);
            return hostMessages;
        });
    }

    /**
     * Handles a delivery report sent by an agent.
     *
     * <p>An OK report whose message id matches the host's unconfirmed command clears
     * that command, which lets the emit monitor pick the next queued one. Any other
     * OK report only produces a warning; non-OK reports are logged as errors.
     *
     * @param hostId    id of the reporting host
     * @param ackReport report received from the agent
     */
    public void processReceiveReport(Long hostId, AckReport ackReport) {
        Long messageId = ackReport.getMessageId();
        if (AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
            this.unconfirmedMessages.compute(hostId, (id, commandInUse) -> {
                if (commandInUse != null && commandInUse.getMessageId().equals(ackReport.getMessageId())) {
                    return null;
                }
                LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
                return commandInUse;
            });
        } else {
            LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}",
                    ackReport.getStatus(), messageId, ackReport.getReason());
        }
    }

    /**
     * Returns the destination the given event is sent to.
     *
     * @param var1 event about to be emitted
     * @return destination passed to the messaging template
     */
    protected abstract String getDestination(STOMPEvent var1);

    /**
     * Builds message headers bound to a session, without a message id.
     *
     * @param sessionId session the message is addressed to
     * @return headers for the message
     */
    protected MessageHeaders createHeaders(String sessionId) {
        return this.createHeaders(sessionId, null);
    }

    /**
     * Builds message headers bound to a session.
     *
     * @param sessionId session the message is addressed to
     * @param messageId value of the {@code messageId} native header, or {@code null}
     *                  to omit the header
     * @return headers for the message
     */
    protected MessageHeaders createHeaders(String sessionId, Long messageId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
        if (messageId != null) {
            headerAccessor.setNativeHeader("messageId", Long.toString(messageId));
        }
        return headerAccessor.getMessageHeaders();
    }

    /**
     * Sends the event to its destination without addressing a particular session.
     *
     * @param event event to send
     */
    protected void emitMessageToAll(STOMPEvent event) {
        LOG.debug("Received status update event {}", event);
        this.simpMessagingTemplate.convertAndSend(this.getDestination(event), event);
    }

    /**
     * Sends the event to the session of the host it belongs to.
     *
     * @param event host-bound event to send
     * @throws HostNotRegisteredException if no session can be resolved for the host
     */
    protected void emitMessageToHost(STOMPHostEvent event) throws HostNotRegisteredException {
        Long hostId = event.getHostId();
        String sessionId = this.agentSessionManager.getSessionId(hostId);
        LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
        MessageHeaders headers = this.createHeaders(sessionId);
        this.simpMessagingTemplate.convertAndSendToUser(sessionId, this.getDestination(event), event, headers);
    }

    /**
     * Sends a wrapped execution command to its host, tagging the message with the
     * wrapper's message id so the agent can acknowledge it.
     *
     * @param eventWrapper command together with its message id and retry counter
     * @throws HostNotRegisteredException if no session can be resolved for the host
     */
    protected void emitExecutionCommandToHost(EmitTaskWrapper eventWrapper) throws HostNotRegisteredException {
        ExecutionCommandEvent event = eventWrapper.getExecutionCommandEvent();
        Long hostId = event.getHostId();
        Long messageId = eventWrapper.getMessageId();
        String sessionId = this.agentSessionManager.getSessionId(hostId);
        LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
        MessageHeaders headers = this.createHeaders(sessionId, messageId);
        this.simpMessagingTemplate.convertAndSendToUser(sessionId, this.getDestination(event), event, headers);
    }

    /**
     * Creates the message queue of a newly registered host; an existing queue is kept.
     *
     * @param hostRegisteredEvent registration event carrying the host id
     */
    @Subscribe
    public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
        Long hostId = hostRegisteredEvent.getHostId();
        this.messagesToEmit.computeIfAbsent(hostId, id -> new LinkedBlockingQueue<>());
    }

    /**
     * Background loop that starts delivery of the next queued command for every host
     * which currently has no unconfirmed command. Sleeps 200 ms after a pass that
     * scheduled nothing and stops when its thread is interrupted.
     */
    private class MessagesToEmitMonitor
    implements Runnable {
        /** Set during a pass when at least one command was scheduled. */
        private boolean anyActionPerformed;

        private MessagesToEmitMonitor() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                this.anyActionPerformed = false;
                for (Long hostId : MessageEmitter.this.messagesToEmit.keySet()) {
                    // The mapping function only runs when the host has no unconfirmed
                    // command, so at most one command per host is in flight.
                    MessageEmitter.this.unconfirmedMessages.computeIfAbsent(hostId, id -> this.scheduleNextMessage(hostId));
                }
                if (this.anyActionPerformed) {
                    continue;
                }
                try {
                    Thread.sleep(200L);
                }
                catch (InterruptedException e) {
                    LOG.error("Exception during sleep", e);
                    // Restore the flag so the loop condition ends the monitor.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Polls the next command of the host and submits it for emitting.
         *
         * @return the scheduled command, or {@code null} when the host has nothing queued
         */
        private EmitTaskWrapper scheduleNextMessage(Long hostId) {
            BlockingQueue<EmitTaskWrapper> hostMessages = MessageEmitter.this.messagesToEmit.get(hostId);
            if (hostMessages == null) {
                // The retry monitor may have removed the queue after the key set was read.
                return null;
            }
            EmitTaskWrapper next = hostMessages.poll();
            if (next != null) {
                LOG.info("Schedule execution command emitting, retry: {}, messageId: {}", next.getRetryCounter(), next.getMessageId());
                MessageEmitter.this.emitCompletionService.submit(new EmitMessageTask(next, false));
                this.anyActionPerformed = true;
            }
            return next;
        }
    }

    /**
     * Background loop that consumes finished emit tasks and, while the emitted command
     * is still unconfirmed, either re-schedules it or gives up on the host. Stops when
     * its thread is interrupted.
     */
    private class MessagesToRetryMonitor
    implements Runnable {
        private MessagesToRetryMonitor() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Future<EmitTaskWrapper> future = MessageEmitter.this.emitCompletionService.take();
                    EmitTaskWrapper result = future.get();
                    Long hostId = result.getExecutionCommandEvent().getHostId();
                    MessageEmitter.this.unconfirmedMessages.compute(hostId, (id, commandInUse) -> this.retryOrGiveUp(result, commandInUse));
                }
                catch (InterruptedException e) {
                    LOG.error("Retry message emitting monitor was interrupted", e);
                    // Restore the flag so the loop condition ends the monitor.
                    Thread.currentThread().interrupt();
                }
                catch (ExecutionException e) {
                    LOG.error("Exception during message emitting retry", e);
                }
            }
        }

        /**
         * Decides what happens to a host's unconfirmed command after an emit attempt.
         *
         * @param result       command whose emit task just finished
         * @param commandInUse command currently unconfirmed for the host, may be {@code null}
         * @return the new unconfirmed command for the host; {@code null} clears it
         */
        private EmitTaskWrapper retryOrGiveUp(EmitTaskWrapper result, EmitTaskWrapper commandInUse) {
            boolean stillUnconfirmed = commandInUse != null && commandInUse.getMessageId().equals(result.getMessageId());
            if (!stillUnconfirmed) {
                // Already acknowledged (or superseded): nothing to retry.
                return commandInUse;
            }
            if (result.getRetryCounter() < MessageEmitter.this.retryCount) {
                result.retry();
                LOG.warn("Reschedule execution command emitting, retry: {}, messageId: {}", result.getRetryCounter(), result.getMessageId());
                MessageEmitter.this.emitCompletionService.schedule(new EmitMessageTask(result, true), MessageEmitter.this.retryInterval, TimeUnit.SECONDS);
                return commandInUse;
            }
            // Retries exhausted: drop the host's queue and announce the failure.
            ExecutionCommandEvent event = result.getExecutionCommandEvent();
            MessageEmitter.this.messagesToEmit.remove(event.getHostId());
            MessageEmitter.this.ambariEventPublisher.publish(new MessageNotDelivered(event.getHostId()));
            return null;
        }
    }

    /** An execution command paired with its message id and the number of retries made so far. */
    private static final class EmitTaskWrapper {
        private final Long messageId;
        private final ExecutionCommandEvent executionCommandEvent;
        private final AtomicInteger retryCounter;

        public EmitTaskWrapper(int retryCounter, Long messageId, ExecutionCommandEvent executionCommandEvent) {
            this.retryCounter = new AtomicInteger(retryCounter);
            this.messageId = messageId;
            this.executionCommandEvent = executionCommandEvent;
        }

        /** @return number of retries recorded so far */
        public int getRetryCounter() {
            return this.retryCounter.get();
        }

        /** @return the wrapped execution command */
        public ExecutionCommandEvent getExecutionCommandEvent() {
            return this.executionCommandEvent;
        }

        /** @return id sent in the {@code messageId} header of the command */
        public Long getMessageId() {
            return this.messageId;
        }

        /** Records one more retry. */
        public void retry() {
            this.retryCounter.incrementAndGet();
        }
    }

    /**
     * Task that emits one wrapped execution command and returns the wrapper so the
     * retry monitor can follow up on it.
     */
    private class EmitMessageTask
    implements Callable<EmitTaskWrapper> {
        private final EmitTaskWrapper emitTaskWrapper;
        /** When set, the command is only emitted if it is still the host's unconfirmed command. */
        private final boolean checkRelevance;

        public EmitMessageTask(EmitTaskWrapper emitTaskWrapper, boolean checkRelevance) {
            this.emitTaskWrapper = emitTaskWrapper;
            this.checkRelevance = checkRelevance;
        }

        @Override
        public EmitTaskWrapper call() throws Exception {
            Long hostId = this.emitTaskWrapper.getExecutionCommandEvent().getHostId();
            try {
                if (!this.checkRelevance || this.isStillUnconfirmed(hostId)) {
                    MessageEmitter.this.emitExecutionCommandToHost(this.emitTaskWrapper);
                }
            }
            catch (HostNotRegisteredException e) {
                // Not rethrown: the wrapper is still returned, so the retry monitor
                // applies its normal retry / give-up handling.
                LOG.error("Trying to emit execution command to unregistered host {} on attempt {}", hostId, this.emitTaskWrapper.getRetryCounter(), e);
            }
            return this.emitTaskWrapper;
        }

        /** @return whether this task's command is the one currently unconfirmed for the host */
        private boolean isStillUnconfirmed(Long hostId) {
            EmitTaskWrapper commandInUse = MessageEmitter.this.unconfirmedMessages.get(hostId);
            return commandInUse != null && commandInUse.getMessageId().equals(this.emitTaskWrapper.getMessageId());
        }
    }
}

