/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.web.rest;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * REST resource that accepts notification messages over HTTP and forwards them
 * to one of the configured notification topics.
 *
 * <p>The set of topics that may be posted to is computed once, at class
 * initialization, from the hook and entities consumer topic names read from
 * {@link AtlasConfiguration}.
 */
@Path("v2/notification")
@Singleton
@Service
@Consumes({"application/json; charset=UTF-8", "application/json"})
@Produces({"application/json; charset=UTF-8", "application/json"})
public class NotificationREST {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationREST.class);

    /** Configured name of the hook topic; also the default hook consumer topic. */
    public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();

    /** Configured name of the entities topic; also the default entities consumer topic. */
    public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();

    private static final String[] ATLAS_HOOK_CONSUMER_TOPICS =
            AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(new String[]{ATLAS_HOOK_TOPIC});

    private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS =
            AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(new String[]{ATLAS_ENTITIES_TOPIC});

    /** Topic names that {@link #handleNotifications} accepts. */
    private static final Set<String> TOPICS = new HashSet<>();

    static {
        // Union of the hook and the entities consumer topic names.
        for (String hookTopic : ATLAS_HOOK_CONSUMER_TOPICS) {
            TOPICS.add(hookTopic);
        }
        for (String entitiesTopic : ATLAS_ENTITIES_CONSUMER_TOPICS) {
            TOPICS.add(entitiesTopic);
        }
    }

    private final NotificationInterface notificationInterface;

    /**
     * Creates the resource.
     *
     * @param notificationInterface notification backend used to send messages;
     *                              {@link #handleNotifications} casts it to
     *                              {@link KafkaNotification}
     */
    @Inject
    public NotificationREST(NotificationInterface notificationInterface) {
        this.notificationInterface = notificationInterface;
    }

    /**
     * Sends the messages contained in the request body to the given topic.
     *
     * <p>Access is verified against {@link AtlasPrivilege#SERVICE_NOTIFICATION_POST}
     * before the topic name is checked and before the request body is read.
     *
     * @param topicName name of the target topic; must be one of the configured
     *                  hook or entities consumer topics
     * @param request   servlet request whose payload carries the messages
     * @throws AtlasBaseException with {@link AtlasErrorCode#INVALID_TOPIC_NAME} when
     *                            {@code topicName} is not an accepted topic, or with
     *                            {@link AtlasErrorCode#NOTIFICATION_EXCEPTION} when
     *                            sending fails (the failed messages are joined with
     *                            newlines and passed as the error parameter)
     * @throws IOException        declared for failures while obtaining the request payload
     */
    @POST
    @Path("/topic/{topicName}")
    @Consumes({"application/json; charset=UTF-8", "application/json"})
    public void handleNotifications(@PathParam("topicName") String topicName, @Context HttpServletRequest request) throws AtlasBaseException, IOException {
        LOG.debug("Handling notifications for topic {}", topicName);

        AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.SERVICE_NOTIFICATION_POST), "post on rest notification service");

        boolean topicAccepted = TOPICS.contains(topicName);
        if (!topicAccepted) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_TOPIC_NAME, topicName);
        }

        List<String> outgoing = getMessagesToNotify(Servlets.getRequestPayload(request));

        // sendInternal is invoked on KafkaNotification, so the injected backend must be
        // of that type; any other implementation fails this cast with a ClassCastException.
        KafkaNotification kafka = (KafkaNotification) notificationInterface;
        try {
            kafka.sendInternal(topicName, outgoing, AtlasHook.isHookMsgsSortEnabled);
        } catch (NotificationException sendFailure) {
            String failedPayloads = StringUtils.join(sendFailure.getFailedMessages(), "\n");
            throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, sendFailure, failedPayloads);
        }
    }

    /**
     * Splits a request payload into the individual messages to send.
     *
     * <p>The payload is parsed as a JSON array and each element is serialized
     * back to a JSON string. If parsing raises an {@link IOException}, the raw
     * payload is appended to the result as a single message instead.
     *
     * @param messagesAsJson request payload
     * @return the messages to send, in payload order
     */
    private List<String> getMessagesToNotify(String messagesAsJson) {
        List<String> collected = new ArrayList<>();
        try {
            ArrayNode parsed = AtlasJson.parseToV1ArrayNode(messagesAsJson);
            for (JsonNode element : parsed) {
                collected.add(AtlasJson.toV1Json(element));
            }
        } catch (IOException notAnArray) {
            // Payload could not be parsed as an array: pass it through untouched.
            collected.add(messagesAsJson);
        }
        return collected;
    }
}

