package org.apache.atlas.notification.rest;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/notification/rest/RestNotification.class */
public class RestNotification extends AbstractNotification {
    private static final String ATLAS_ENDPOINT = "atlas.rest.address";
    private static final String BASIC_AUTH_USERNAME = "atlas.rest.basic.auth.username";
    private static final String BASIC_AUTH_PASSWORD = "atlas.rest.basic.auth.password";
    private static final String DEFAULT_ATLAS_URL = "http://localhost:31000/";

    @VisibleForTesting
    public AtlasClientV2 atlasClientV2;
    private static final Logger LOG = LoggerFactory.getLogger(RestNotification.class);
    private static final int BATCH_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES.getInt();
    private static final Map<NotificationInterface.NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap();

    public RestNotification(Configuration configuration) throws AtlasException {
        setupAtlasClientV2(configuration);
    }

    @Override // org.apache.atlas.notification.AbstractNotification
    public void sendInternal(NotificationInterface.NotificationType notificationType, List<String> list) throws NotificationException {
        String str = PRODUCER_TOPIC_MAP.get(notificationType);
        int i = 0;
        try {
            Iterator<List<String>> it = getBatches(list).iterator();
            while (it.hasNext()) {
                i++;
                this.atlasClientV2.postNotificationToTopic(str, it.next());
            }
        } catch (AtlasServiceException e) {
            if (!e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())) {
                throw new RuntimeException((Throwable) e);
            }
            LOG.error("Sending notifications through REST interface failed starting from batch# {}", Integer.valueOf(i));
            throw new NotificationException(e);
        }
    }

    @Override // org.apache.atlas.notification.AbstractNotification
    public void sendInternal(String str, List<String> list) throws NotificationException {
        throw new NotImplementedException("sendInternal method is not implemented.");
    }

    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i) {
        return null;
    }

    @Override // org.apache.atlas.notification.NotificationInterface
    public void close() {
    }

    @Override // org.apache.atlas.notification.NotificationInterface
    public boolean isReady(NotificationInterface.NotificationType notificationType) {
        return true;
    }

    private AtlasClientV2 setupAtlasClientV2(Configuration configuration) throws AtlasException {
        if (this.atlasClientV2 != null) {
            return this.atlasClientV2;
        }
        try {
            String[] stringArray = configuration.getStringArray(ATLAS_ENDPOINT);
            if (stringArray == null || stringArray.length == 0) {
                stringArray = new String[]{DEFAULT_ATLAS_URL};
            }
            if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                this.atlasClientV2 = new AtlasClientV2(stringArray);
            } else {
                String string = configuration.getString(BASIC_AUTH_USERNAME, "admin");
                String string2 = configuration.getString(BASIC_AUTH_PASSWORD, "admin123");
                this.atlasClientV2 = new AtlasClientV2(stringArray, (string == null || string2 == null) ? AuthenticationUtil.getBasicAuthenticationInput() : new String[]{string, string2});
            }
            return this.atlasClientV2;
        } catch (AtlasException e) {
            throw new AtlasException(e);
        }
    }

    private List<List<String>> getBatches(List<String> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        for (String str : list) {
            byte[] bytesUtf8 = AtlasNotificationBaseMessage.getBytesUtf8(str);
            if (i > 0 && i + bytesUtf8.length > BATCH_MAX_LENGTH_BYTES) {
                arrayList.add(arrayList2);
                arrayList2 = new ArrayList();
                i = 0;
            }
            arrayList2.add(str);
            i += bytesUtf8.length;
        }
        arrayList.add(arrayList2);
        return arrayList;
    }

    static {
        PRODUCER_TOPIC_MAP.put(NotificationInterface.NotificationType.HOOK, KafkaNotification.ATLAS_HOOK_TOPIC);
        PRODUCER_TOPIC_MAP.put(NotificationInterface.NotificationType.ENTITIES, KafkaNotification.ATLAS_ENTITIES_TOPIC);
    }
}
