/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.notification.NotificationHookConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.repository.impexp.AsyncImportService;
import org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Order(value=8)
@DependsOn(value={"notificationHookConsumer"})
public class ImportTaskListenerImpl
implements Service,
ActiveStateChangeHandler,
ImportTaskListener {
    private static final Logger LOG = LoggerFactory.getLogger(ImportTaskListenerImpl.class);
    private static final String THREADNAME_PREFIX = ImportTaskListener.class.getSimpleName();
    private static final int ASYNC_IMPORT_PERMITS = 1;
    private final BlockingQueue<String> requestQueue;
    private final ExecutorService executorService;
    private final AsyncImportService asyncImportService;
    private final NotificationHookConsumer notificationHookConsumer;
    private final Semaphore asyncImportSemaphore;
    private final Configuration applicationProperties;

    @Inject
    public ImportTaskListenerImpl(AsyncImportService asyncImportService, NotificationHookConsumer notificationHookConsumer) throws AtlasException {
        this(asyncImportService, notificationHookConsumer, new LinkedBlockingQueue<String>());
    }

    public ImportTaskListenerImpl(AsyncImportService asyncImportService, NotificationHookConsumer notificationHookConsumer, BlockingQueue<String> requestQueue) throws AtlasException {
        this.asyncImportService = asyncImportService;
        this.notificationHookConsumer = notificationHookConsumer;
        this.requestQueue = requestQueue;
        this.asyncImportSemaphore = new Semaphore(1);
        this.applicationProperties = ApplicationProperties.get();
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").setUncaughtExceptionHandler((thread, throwable) -> LOG.error("Uncaught exception in thread {}: {}", new Object[]{thread.getName(), throwable.getMessage(), throwable})).build());
    }

    public void start() throws AtlasException {
        if (HAConfiguration.isHAEnabled((Configuration)this.applicationProperties)) {
            LOG.info("HA is enabled, not starting import consumers inline.");
            return;
        }
        this.startInternal();
    }

    private void startInternal() {
        CompletionStage populateTask = CompletableFuture.runAsync(this::populateRequestQueue).exceptionally(ex -> {
            LOG.error("Failed to populate request queue", ex);
            return null;
        });
        CompletionStage resumeTask = CompletableFuture.runAsync(this::resumeInProgressImports).exceptionally(ex -> {
            LOG.error("Failed to resume in-progress imports", ex);
            return null;
        });
        ((CompletableFuture)((CompletableFuture)CompletableFuture.allOf(new CompletableFuture[]{populateTask, resumeTask}).thenRun(this::startNextImportInQueue)).exceptionally(ex -> {
            LOG.error("Failed to start next import in queue", ex);
            return null;
        })).join();
    }

    public void onReceiveImportRequest(AtlasAsyncImportRequest importRequest) throws AtlasBaseException {
        try {
            LOG.info("==> onReceiveImportRequest(atlasAsyncImportRequest={})", (Object)importRequest);
            importRequest.setStatus(AtlasAsyncImportRequest.ImportStatus.WAITING);
            this.asyncImportService.updateImportRequest(importRequest);
            this.requestQueue.put(importRequest.getImportId());
            this.startNextImportInQueue();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Failed to add import request: {} to the queue", (Object)importRequest.getImportId());
            throw new AtlasBaseException(AtlasErrorCode.IMPORT_QUEUEING_FAILED, (Throwable)e, new String[]{importRequest.getImportId()});
        }
        finally {
            LOG.info("<== onReceiveImportRequest(atlasAsyncImportRequest={})", (Object)importRequest);
        }
    }

    public void onCompleteImportRequest(String importId) {
        LOG.info("==> onCompleteImportRequest(importId={})", (Object)importId);
        try {
            this.notificationHookConsumer.closeImportConsumer(importId, AtlasConfiguration.ASYNC_IMPORT_TOPIC_PREFIX.getString() + importId);
        }
        finally {
            this.releaseAsyncImportSemaphore();
            this.startNextImportInQueue();
            LOG.info("<== onCompleteImportRequest(importId={})", (Object)importId);
        }
    }

    private void startNextImportInQueue() {
        LOG.info("==> startNextImportInQueue()");
        this.startAsyncImportIfAvailable(null);
        LOG.info("<== startNextImportInQueue()");
    }

    @VisibleForTesting
    void startAsyncImportIfAvailable(String importId) {
        LOG.info("==> startAsyncImportIfAvailable()");
        try {
            AtlasAsyncImportRequest nextImport;
            if (!this.asyncImportSemaphore.tryAcquire()) {
                LOG.info("An async import is in progress, import request is queued");
                return;
            }
            AtlasAsyncImportRequest atlasAsyncImportRequest = nextImport = importId != null ? this.asyncImportService.fetchImportRequestByImportId(importId) : this.getNextImportFromQueue();
            if (this.isNotValidImportRequest(nextImport)) {
                this.releaseAsyncImportSemaphore();
                return;
            }
            this.executorService.submit(() -> this.startImportConsumer(nextImport));
        }
        catch (Exception e) {
            LOG.error("Error while starting the next import, releasing the lock if held", (Throwable)e);
            this.releaseAsyncImportSemaphore();
        }
        finally {
            LOG.info("<== startAsyncImportIfAvailable()");
        }
    }

    private void startImportConsumer(AtlasAsyncImportRequest importRequest) {
        try {
            LOG.info("==> startImportConsumer(atlasAsyncImportRequest={})", (Object)importRequest);
            this.notificationHookConsumer.startAsyncImportConsumer(NotificationInterface.NotificationType.ASYNC_IMPORT, importRequest.getImportId(), importRequest.getTopicName());
            importRequest.setStatus(AtlasAsyncImportRequest.ImportStatus.PROCESSING);
            importRequest.setProcessingStartTime(System.currentTimeMillis());
        }
        catch (Exception e) {
            LOG.error("Failed to start consumer for import: {}, marking import as failed", (Object)importRequest, (Object)e);
            importRequest.setStatus(AtlasAsyncImportRequest.ImportStatus.FAILED);
        }
        finally {
            this.asyncImportService.updateImportRequest(importRequest);
            if (ObjectUtils.equals((Object)importRequest.getStatus(), (Object)AtlasAsyncImportRequest.ImportStatus.FAILED)) {
                this.onCompleteImportRequest(importRequest.getImportId());
            }
            LOG.info("<== startImportConsumer(atlasAsyncImportRequest={})", (Object)importRequest);
        }
    }

    @VisibleForTesting
    AtlasAsyncImportRequest getNextImportFromQueue() {
        LOG.info("==> getNextImportFromQueue()");
        int maxRetries = 5;
        int retryCount = 0;
        Object nextImport = null;
        while (retryCount < 5) {
            try {
                String importId = this.requestQueue.poll(10L, TimeUnit.SECONDS);
                if (importId == null) {
                    LOG.warn("Still waiting for import request... (attempt {} of {})", (Object)(++retryCount), (Object)5);
                    continue;
                }
                retryCount = 0;
                AtlasAsyncImportRequest importRequest = this.asyncImportService.fetchImportRequestByImportId(importId);
                if (this.isNotValidImportRequest(importRequest)) {
                    LOG.info("Import request {}, is not in a valid status to start import, hence skipping..", (Object)importRequest);
                    continue;
                }
                LOG.info("<== getImportIdFromQueue(nextImportId={})", (Object)importRequest.getImportId());
                return importRequest;
            }
            catch (InterruptedException e) {
                LOG.error("Thread interrupted while waiting for importId from the queue", (Throwable)e);
                Thread.currentThread().interrupt();
                return null;
            }
        }
        LOG.error("Exceeded max retry attempts. Exiting...");
        return null;
    }

    @VisibleForTesting
    boolean isNotValidImportRequest(AtlasAsyncImportRequest importRequest) {
        return importRequest == null || !AtlasAsyncImportRequest.ImportStatus.WAITING.equals((Object)importRequest.getStatus()) && !AtlasAsyncImportRequest.ImportStatus.PROCESSING.equals((Object)importRequest.getStatus());
    }

    private void releaseAsyncImportSemaphore() {
        LOG.info("==> releaseAsyncImportSemaphore()");
        if (this.asyncImportSemaphore.availablePermits() == 0) {
            this.asyncImportSemaphore.release();
            LOG.info("<== releaseAsyncImportSemaphore()");
        } else {
            LOG.info("<== releaseAsyncImportSemaphore(); no lock held");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void populateRequestQueue() {
        LOG.info("==> populateRequestQueue()");
        List importRequests = this.asyncImportService.fetchQueuedImportRequests();
        try {
            if (!importRequests.isEmpty()) {
                for (String request : importRequests) {
                    try {
                        if (this.requestQueue.offer(request, 5L, TimeUnit.SECONDS)) continue;
                        LOG.warn("populateRequestQueue(): Request {} could not be added to the queue", (Object)request);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOG.error("populateRequestQueue(): Failed to add requests to queue");
                        break;
                    }
                }
                LOG.info("populateRequestQueue(): Added {} requests to queue", (Object)importRequests.size());
            } else {
                LOG.warn("populateRequestQueue(): No queued requests found.");
            }
        }
        finally {
            LOG.info("<== populateRequestQueue()");
        }
    }

    private void resumeInProgressImports() {
        LOG.info("==> resumeInProgressImports()");
        try {
            String importId = this.asyncImportService.fetchInProgressImportIds().stream().findFirst().orElse(null);
            if (importId == null) {
                LOG.warn("No imports found to resume");
                return;
            }
            LOG.info("Resuming import id={}", (Object)importId);
            this.startAsyncImportIfAvailable(importId);
        }
        finally {
            LOG.info("<== resumeInProgressImports()");
        }
    }

    @PreDestroy
    public void stopImport() {
        LOG.info("Shutting down import processor...");
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.warn("Executor service did not terminate gracefully within the timeout. Waiting longer...");
                if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    LOG.warn("Forcing shutdown...");
                    this.executorService.shutdownNow();
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Shutdown interrupted. Forcing shutdown...");
            this.executorService.shutdownNow();
        }
        LOG.info("Import processor stopped.");
    }

    public void stop() throws AtlasException {
        try {
            this.stopImport();
        }
        finally {
            this.releaseAsyncImportSemaphore();
        }
    }

    public void instanceIsActive() {
        LOG.info("Reacting to active state: initializing Kafka consumers");
        this.startInternal();
    }

    public void instanceIsPassive() {
        try {
            this.stopImport();
        }
        finally {
            this.releaseAsyncImportSemaphore();
        }
    }

    public int getHandlerOrder() {
        return ActiveStateChangeHandler.HandlerOrder.IMPORT_TASK_LISTENER.getOrder();
    }
}

