package org.apache.atlas.notification.spool;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataOutput;
import java.util.Iterator;
import java.util.List;
import org.apache.atlas.hook.FailedMessagesLogger;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/notification/spool/Spooler.class */
/**
 * {@link AbstractNotification} implementation that, instead of delivering messages to a remote
 * destination, appends them line-by-line to the spool writer supplied by {@link IndexManagement}.
 *
 * <p>Once {@link #setDrain()} has been called, further writes are rejected and reported as failed.
 * When a {@link FailedMessagesLogger} is configured, messages that could not be spooled are handed
 * to it.
 */
public class Spooler extends AbstractNotification {
    private static final Logger LOG = LoggerFactory.getLogger(Spooler.class);

    private final SpoolConfiguration configuration;
    private final IndexManagement indexManagement;
    private FailedMessagesLogger failedMessagesLogger;

    // volatile so that a flag raised by setDrain() is visible to a concurrent write().
    // NOTE(review): assumes setDrain() is invoked from a different thread than the senders
    // (the "called after stop is called" log message suggests a shutdown path) - confirm.
    private volatile boolean isDrain;

    /**
     * @param spoolConfiguration configuration; only its source name is read here (for log messages)
     * @param indexManagement    provider of the spool writer and of the write-in-progress flag
     */
    public Spooler(SpoolConfiguration spoolConfiguration, IndexManagement indexManagement) {
        this.configuration = spoolConfiguration;
        this.indexManagement = indexManagement;
    }

    /**
     * Sets the optional logger that receives messages which could not be written to the spool.
     *
     * @param failedMessagesLogger logger for failed messages; may be {@code null} to disable
     */
    public void setFailedMessagesLogger(FailedMessagesLogger failedMessagesLogger) {
        this.failedMessagesLogger = failedMessagesLogger;
    }

    /**
     * Switches this spooler into drain mode: every subsequent {@link #write(List)} is rejected.
     * There is no way to leave drain mode again.
     */
    public void setDrain() {
        this.isDrain = true;
    }

    /**
     * Consumers are not supported by the spooler.
     *
     * @return always {@code null}
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i) {
        return null;
    }

    /** No-op: this class holds no resources of its own to release. */
    @Override // org.apache.atlas.notification.NotificationInterface
    public void close() {
    }

    /**
     * @return always {@code true}, regardless of the notification type
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public boolean isReady(NotificationInterface.NotificationType notificationType) {
        return true;
    }

    /**
     * Marks every message as spooled and appends the batch to the spool.
     *
     * <p>Each entry of {@code list} is parsed as an {@link AtlasNotificationMessage}, flagged via
     * {@code setSpooled(true)}, re-serialized and stored back <em>into the same list</em>, so the
     * caller's list is modified in place and must support {@link List#set(int, Object)}.
     * If the write fails and a {@link FailedMessagesLogger} is configured, every (already flagged)
     * message is passed to that logger.
     *
     * @param notificationType not used by this implementation
     * @param list             JSON-serialized notification messages; must not be {@code null}
     */
    @Override // org.apache.atlas.notification.AbstractNotification
    public void sendInternal(NotificationInterface.NotificationType notificationType, List<String> list) {
        for (int idx = 0; idx < list.size(); idx++) {
            AtlasNotificationMessage message = (AtlasNotificationMessage) AtlasType.fromV1Json(list.get(idx), AtlasNotificationMessage.class);
            message.setSpooled(true);
            list.set(idx, AtlasType.toV1Json(message));
        }

        boolean written = write(list);
        if (!written && this.failedMessagesLogger != null) {
            writeToFailedMessages(list);
        }
    }

    /**
     * Topic-based sending is not supported by the spooler.
     *
     * @throws NotImplementedException always
     */
    @Override // org.apache.atlas.notification.AbstractNotification
    public void sendInternal(String str, List<String> list) throws NotificationException {
        throw new NotImplementedException("sendInternal method is not implemented.");
    }

    /**
     * Writes the given messages to the spool unless drain mode is active.
     *
     * <p>The write-in-progress flag on {@link IndexManagement} is set before the write and is
     * always reset afterwards.
     *
     * @param list messages to write; in drain mode a {@code null} list is tolerated (reported as 0)
     * @return {@code true} if all messages were written and flushed; {@code false} if the spooler
     *         is draining or the write failed
     */
    @VisibleForTesting
    boolean write(List<String> list) {
        try {
            if (getDrain()) {
                LOG.error("Spooler.write(source={}): called after stop is called! {} messages will not be written to spool!", this.configuration.getSourceName(), list != null ? list.size() : 0);
                return false;
            }

            this.indexManagement.setSpoolWriteInProgress();
            return writeInternal(list);
        } finally {
            // NOTE(review): the flag is reset on the drain path too, although this call never set
            // it there - kept as-is; confirm this cannot clear a flag owned by a concurrent writer.
            this.indexManagement.resetSpoolWriteInProgress();
        }
    }

    /** Passes each message to the failed-messages logger, if one is configured. */
    private void writeToFailedMessages(List<String> messages) {
        if (this.failedMessagesLogger == null) {
            return;
        }
        for (String message : messages) {
            this.failedMessagesLogger.log(message);
        }
    }

    /**
     * Appends every message, each followed by the line separator, to the spool writer and flushes.
     * Any exception is logged together with the messages and turned into a {@code false} result.
     *
     * @return {@code true} on success, {@code false} if anything went wrong
     */
    private boolean writeInternal(List<String> messages) {
        try {
            byte[] lineSeparator = SpoolUtils.getLineSeparator().getBytes(SpoolUtils.DEFAULT_CHAR_SET);
            DataOutput spoolWriter = this.indexManagement.getSpoolWriter();

            for (String message : messages) {
                spoolWriter.write(message.getBytes(SpoolUtils.DEFAULT_CHAR_SET));
                spoolWriter.write(lineSeparator);
            }

            this.indexManagement.flushSpoolWriter();
            return true;
        } catch (Exception e) {
            // Trailing throwable is passed as the last argument so the logger can record its stack trace.
            LOG.error("Spooler.writeInternal(source={}): error writing to file. messages={}", this.configuration.getSourceName(), messages, e);
            return false;
        }
    }

    /** @return {@code true} once {@link #setDrain()} has been called */
    private boolean getDrain() {
        return this.isDrain;
    }
}
