package org.apache.atlas.notification.spool;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput;
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/notification/spool/Publisher.class */
/**
 * Worker that forwards spooled notification messages to a notification handler.
 *
 * <p>{@link #run()} repeatedly asks {@link IndexManagement} for the next {@link IndexRecord},
 * reads the file the record points to line by line, and sends the lines in batches of
 * {@code messageBatchSize} through {@link AbstractNotification#sendInternal}. While the handler
 * reports it is not ready, the loop sleeps for {@code retryDestinationMS} between checks.
 *
 * <p>The loop ends when {@link #setDrain()} has been called or when the running thread is
 * interrupted. The drain and destination-down flags are {@code volatile} because they are
 * written through public setters and polled by the loop (presumably from different threads).
 */
public class Publisher implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);

    private final SpoolConfiguration configuration;
    private final IndexManagement indexManagement;
    private final AbstractNotification notificationHandler;
    private final String notificationHandlerName;
    private final int retryDestinationMS;
    private final int messageBatchSize;

    /** Source name read from the configuration at the start of {@link #run()}; used in log messages. */
    private String source;
    private volatile boolean isDrain;
    private volatile boolean isDestDown;

    /**
     * Creates a publisher; retry interval and batch size are read once from the configuration.
     *
     * @param spoolConfiguration   spool settings (source name, retry interval, batch size, pause)
     * @param indexManagement      supplies the records to publish and tracks their progress
     * @param abstractNotification handler the messages are sent through; must not be {@code null}
     */
    public Publisher(SpoolConfiguration spoolConfiguration, IndexManagement indexManagement, AbstractNotification abstractNotification) {
        this.configuration = spoolConfiguration;
        this.indexManagement = indexManagement;
        this.notificationHandler = abstractNotification;
        this.notificationHandlerName = abstractNotification.getClass().getSimpleName();
        this.retryDestinationMS = spoolConfiguration.getRetryDestinationMS();
        this.messageBatchSize = spoolConfiguration.getMessageBatchSize();
    }

    /**
     * Publishing loop. Runs until drain is requested or the thread is interrupted.
     *
     * <p>A record that was fully processed is removed from the index; otherwise the same record
     * is retried on the next iteration after giving the index a chance to roll over the spool file.
     * Exceptions other than interruption are logged and the loop continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.source = this.configuration.getSourceName();
        LOG.info("Publisher.run(source={}): starting publisher {}", this.source, this.notificationHandlerName);

        IndexRecord record = null;
        while (true) {
            try {
                throwIfInterrupted();
                checkAndWaitIfDestinationDown();
                if (this.isDrain) {
                    break;
                }
                if (this.isDestDown) {
                    continue;
                }

                record = fetchNext(record);
                if (record != null && processAndDispatch(record)) {
                    this.indexManagement.removeAsDone(record);
                    record = null;
                } else {
                    // processAndDispatch() reports an interrupted send as a plain failure but
                    // leaves the interrupt flag set; stop here instead of touching the spool.
                    throwIfInterrupted();
                    this.indexManagement.rolloverSpoolFileIfNeeded();
                }
            } catch (InterruptedException e) {
                LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", this.source, this.notificationHandlerName);
                // Keep the interrupt visible to whoever owns this thread, then stop publishing.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", this.source, this.notificationHandlerName, e);
            }
        }

        LOG.info("Publisher.run(source={}): publisher {} exited!", this.source, this.notificationHandlerName);
    }

    /** Marks the destination as down and records a failed attempt in the index. */
    public void setDestinationDown() {
        this.isDestDown = true;
        this.indexManagement.updateFailedAttempt();
    }

    /** Requests that {@link #run()} leave its loop at the next drain check. */
    public void setDrain() {
        this.isDrain = true;
    }

    /**
     * @return {@code true} if the destination was last seen as down
     */
    public boolean isDestinationDown() {
        return this.isDestDown;
    }

    /**
     * Reads the file referenced by {@code indexRecord} and dispatches its lines in batches.
     *
     * <p>Lines whose 1-based position is below {@code indexRecord.getLine()} are skipped.
     * NOTE(review): how {@code getLine()} relates to the value stored via
     * {@code setCurrentLine()} after each batch is defined in {@code IndexRecord} — confirm there.
     *
     * @param indexRecord record describing the spool file to publish
     * @return {@code true} when the record needs no further work (file fully dispatched, or the
     *         file does not exist); {@code false} when it should be retried (file locked by
     *         someone else, or reading/sending failed)
     */
    @VisibleForTesting
    boolean processAndDispatch(IndexRecord indexRecord) {
        if (!SpoolUtils.fileExists(indexRecord)) {
            LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", this.source, this.notificationHandlerName, indexRecord.getPath());
            return true;
        }

        FileLockedReadWrite fileLockedReadWrite = new FileLockedReadWrite(this.source);
        try {
            DataInput input = fileLockedReadWrite.getInput(new File(indexRecord.getPath()));
            List<String> batch = new ArrayList<>();
            int lineNumber = 0;

            for (String line = input.readLine(); line != null; line = input.readLine()) {
                lineNumber++;
                if (lineNumber < indexRecord.getLine()) {
                    continue;
                }
                batch.add(line);
                if (batch.size() == this.messageBatchSize) {
                    dispatch(indexRecord, lineNumber, batch);
                }
            }

            // Flush the trailing partial batch; nothing to send if the last batch was full.
            if (!batch.isEmpty()) {
                dispatch(indexRecord, lineNumber, batch);
            }

            LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", this.source, this.notificationHandlerName, indexRecord.getPath());
            return true;
        } catch (OverlappingFileLockException e) {
            LOG.error("Publisher.processAndDispatch(source={}): consumer={}: some other process has locked this file {}", this.source, this.notificationHandlerName, indexRecord.getPath(), e);
            return false;
        } catch (FileNotFoundException e) {
            // The file disappeared after the existence check: treat the record as done.
            LOG.error("Publisher.processAndDispatch(source={}): consumer={}: file not found {}", this.source, this.notificationHandlerName, indexRecord.getPath(), e);
            return true;
        } catch (Exception e) {
            LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed for file {}", this.source, this.notificationHandlerName, indexRecord.getPath(), e);
            return false;
        } finally {
            fileLockedReadWrite.close();
        }
    }

    /**
     * Refreshes the destination-down flag from the handler and, if it is down, sleeps for
     * {@code retryDestinationMS} milliseconds.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void checkAndWaitIfDestinationDown() throws InterruptedException {
        this.isDestDown = !this.notificationHandler.isReady(NotificationInterface.NotificationType.HOOK);
        if (this.isDestDown) {
            LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items", this.source, this.notificationHandlerName, this.retryDestinationMS, this.indexManagement.getQueueSize());
            Thread.sleep(this.retryDestinationMS);
        }
    }

    /**
     * Returns {@code current} if it is non-null (a record still being retried); otherwise asks
     * the index for the next record. A failure to fetch is logged and yields {@code null}.
     */
    private IndexRecord fetchNext(IndexRecord current) {
        if (current != null) {
            return current;
        }
        try {
            return this.indexManagement.next();
        } catch (Exception e) {
            LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", this.source, this.notificationHandlerName, e);
            return null;
        }
    }

    /**
     * Sends one batch and, on success, stores {@code lineNumber} as the record's current line
     * in the index and clears the destination-down flag.
     *
     * @throws Exception if sending fails or the thread is interrupted before sending
     */
    private void dispatch(IndexRecord indexRecord, int lineNumber, List<String> batch) throws Exception {
        if (this.notificationHandler == null || batch == null || batch.isEmpty()) {
            LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", this.source, this.notificationHandlerName);
            return;
        }
        dispatch(indexRecord.getPath(), batch);
        indexRecord.setCurrentLine(lineNumber);
        this.indexManagement.update(indexRecord);
        this.isDestDown = false;
    }

    /**
     * Pauses (see {@link #pauseBeforeSend()}) and sends {@code batch} through the handler.
     * The batch is always cleared afterwards, whether or not the send succeeded.
     *
     * @param filePath path of the spool file the batch came from; used for logging only
     * @throws InterruptedException  if interrupted during the pause; the interrupt flag is restored
     * @throws NotificationException if the send fails; the destination is marked down first
     */
    private void dispatch(String filePath, List<String> batch) throws Exception {
        try {
            pauseBeforeSend();
            this.notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, batch);
            if (this.isDestDown) {
                LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", this.source, this.notificationHandlerName, filePath);
            }
        } catch (InterruptedException e) {
            // An interrupt is a shutdown signal, not a destination failure.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            setDestinationDown();
            LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", this.source, this.notificationHandlerName, e);
            throw new NotificationException(e, String.format("%s: %s: Publisher: Destination down!", this.source, this.notificationHandlerName));
        } finally {
            batch.clear();
        }
    }

    /**
     * Sleeps for the configured pause before a send, unless the configuration reports a
     * Hive metastore source, in which case it returns immediately.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void pauseBeforeSend() throws InterruptedException {
        if (this.configuration.isHiveMetaStore()) {
            return;
        }
        // Multiply as long so a large configured value cannot overflow int.
        long pauseMs = this.configuration.getPauseBeforeSendSec() * 1000L;
        LOG.info("Waiting before dispatch: {}", pauseMs);
        Thread.sleep(pauseMs);
    }

    /** Converts a pending interrupt on the current thread into an {@link InterruptedException}. */
    private static void throwIfInterrupted() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    }
}
