package org.apache.atlas.repository.store.graph.v2;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.ImportNotification;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.repository.impexp.AsyncImportService;
import org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener;
import org.apache.commons.lang.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.class */
public class AsyncImportTaskExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncImportTaskExecutor.class);
    private static final String MESSAGE_SOURCE = AsyncImportTaskExecutor.class.getSimpleName();
    private final AsyncImportService importService;
    private final ImportTaskListener importTaskListener;
    private final NotificationInterface notificationInterface = NotificationProvider.get();
    private final MessageSource messageSource = new MessageSource(MESSAGE_SOURCE);

    @Inject
    public AsyncImportTaskExecutor(AsyncImportService asyncImportService, ImportTaskListener importTaskListener) {
        this.importService = asyncImportService;
        this.importTaskListener = importTaskListener;
    }

    public AtlasAsyncImportRequest run(AtlasImportResult atlasImportResult, EntityImportStream entityImportStream) throws AtlasBaseException {
        try {
            try {
                String md5Hash = entityImportStream.getMd5Hash();
                AtlasAsyncImportRequest registerRequest = registerRequest(atlasImportResult, md5Hash, entityImportStream.size(), entityImportStream.getCreationOrder());
                if (ObjectUtils.equals(registerRequest.getStatus(), AtlasAsyncImportRequest.ImportStatus.WAITING) || ObjectUtils.equals(registerRequest.getStatus(), AtlasAsyncImportRequest.ImportStatus.PROCESSING)) {
                    LOG.warn("AsyncImportTaskExecutor.run(): Import request with id={} is already in state={}", md5Hash, registerRequest.getStatus());
                } else {
                    if (ObjectUtils.equals(registerRequest.getStatus(), AtlasAsyncImportRequest.ImportStatus.STAGING)) {
                        skipToStartEntityPosition(registerRequest, entityImportStream);
                    }
                    publishImportRequest(registerRequest, entityImportStream);
                }
                return registerRequest;
            } catch (AtlasBaseException e) {
                throw new AtlasBaseException(AtlasErrorCode.IMPORT_FAILED, e, new String[0]);
            }
        } finally {
            entityImportStream.close();
        }
    }

    public void publishTypeDefNotification(AtlasAsyncImportRequest atlasAsyncImportRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException {
        LOG.info("==> publishTypeDefNotification(importRequest={}, atlasTypesDef={})", atlasAsyncImportRequest, atlasTypesDef);
        try {
            sendToTopic(atlasAsyncImportRequest.getTopicName(), new ImportNotification.AtlasTypesDefImportNotification(atlasAsyncImportRequest.getImportId(), atlasAsyncImportRequest.getImportResult().getUserName(), atlasTypesDef));
            LOG.info("<== publishTypeDefNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
        } catch (Throwable th) {
            LOG.info("<== publishTypeDefNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
            throw th;
        }
    }

    public void onCompleteImportRequest(String str) {
        this.importTaskListener.onCompleteImportRequest(str);
    }

    public void abortAsyncImportRequest(String str) throws AtlasBaseException {
        LOG.info("==> abortAsyncImportRequest(importId={})", str);
        try {
            try {
                this.notificationInterface.deleteTopic(NotificationInterface.NotificationType.ASYNC_IMPORT, this.importService.abortImport(str).getTopicName());
                LOG.info("<== abortAsyncImportRequest(importId={})", str);
            } catch (AtlasBaseException e) {
                throw new AtlasBaseException(AtlasErrorCode.ABORT_IMPORT_FAILED, e, new String[]{str});
            }
        } catch (Throwable th) {
            LOG.info("<== abortAsyncImportRequest(importId={})", str);
            throw th;
        }
    }

    public void delete() {
        LOG.info("==> delete()");
        this.importService.deleteRequests();
        LOG.info("<== delete()");
    }

    @VisibleForTesting
    void publishImportRequest(AtlasAsyncImportRequest atlasAsyncImportRequest, EntityImportStream entityImportStream) throws AtlasBaseException {
        try {
            LOG.info("==> publishImportRequest(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
            publishTypeDefNotification(atlasAsyncImportRequest, entityImportStream.getTypesDef());
            publishEntityNotification(atlasAsyncImportRequest, entityImportStream);
            atlasAsyncImportRequest.setStagedTime(System.currentTimeMillis());
            this.importService.updateImportRequest(atlasAsyncImportRequest);
            this.importTaskListener.onReceiveImportRequest(atlasAsyncImportRequest);
        } finally {
            this.notificationInterface.closeProducer(NotificationInterface.NotificationType.ASYNC_IMPORT, atlasAsyncImportRequest.getTopicName());
            LOG.info("<== publishImportRequest(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
        }
    }

    @VisibleForTesting
    void publishEntityNotification(AtlasAsyncImportRequest atlasAsyncImportRequest, EntityImportStream entityImportStream) {
        LOG.info("==> publishEntityNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
        int publishedEntityCount = atlasAsyncImportRequest.getImportDetails().getPublishedEntityCount();
        int failedEntitiesCount = atlasAsyncImportRequest.getImportDetails().getFailedEntitiesCount();
        while (entityImportStream.hasNext()) {
            AtlasEntity.AtlasEntityWithExtInfo nextEntityWithExtInfo = entityImportStream.getNextEntityWithExtInfo();
            AtlasEntity entity = nextEntityWithExtInfo != null ? nextEntityWithExtInfo.getEntity() : null;
            int position = entityImportStream.getPosition();
            if (entity == null) {
                atlasAsyncImportRequest.getImportTrackingInfo().setStartEntityPosition(position);
                atlasAsyncImportRequest.getImportDetails().setPublishedEntityCount(publishedEntityCount);
                this.importService.updateImportRequest(atlasAsyncImportRequest);
                LOG.info("<== publishEntityNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
            } else {
                try {
                    try {
                        sendToTopic(atlasAsyncImportRequest.getTopicName(), new ImportNotification.AtlasEntityImportNotification(atlasAsyncImportRequest.getImportId(), atlasAsyncImportRequest.getImportResult().getUserName(), nextEntityWithExtInfo, entityImportStream.getPosition()));
                        entityImportStream.onImportComplete(entity.getGuid());
                        publishedEntityCount++;
                        atlasAsyncImportRequest.getImportTrackingInfo().setStartEntityPosition(position);
                        atlasAsyncImportRequest.getImportDetails().setPublishedEntityCount(publishedEntityCount);
                        this.importService.updateImportRequest(atlasAsyncImportRequest);
                        LOG.info("<== publishEntityNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
                    } catch (AtlasBaseException e) {
                        failedEntitiesCount++;
                        LOG.warn("AsyncImport(id={}): failed to publish entity guid={}", new Object[]{atlasAsyncImportRequest.getImportId(), entity.getGuid(), e});
                        atlasAsyncImportRequest.getImportDetails().getFailedEntities().add(entity.getGuid());
                        atlasAsyncImportRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCount);
                        atlasAsyncImportRequest.getImportDetails().getFailures().put(entity.getGuid(), e.getMessage());
                        atlasAsyncImportRequest.getImportTrackingInfo().setStartEntityPosition(position);
                        atlasAsyncImportRequest.getImportDetails().setPublishedEntityCount(publishedEntityCount);
                        this.importService.updateImportRequest(atlasAsyncImportRequest);
                        LOG.info("<== publishEntityNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
                    }
                } catch (Throwable th) {
                    atlasAsyncImportRequest.getImportTrackingInfo().setStartEntityPosition(position);
                    atlasAsyncImportRequest.getImportDetails().setPublishedEntityCount(publishedEntityCount);
                    this.importService.updateImportRequest(atlasAsyncImportRequest);
                    LOG.info("<== publishEntityNotification(atlasAsyncImportRequest={})", atlasAsyncImportRequest);
                    throw th;
                }
            }
        }
    }

    @VisibleForTesting
    void skipToStartEntityPosition(AtlasAsyncImportRequest atlasAsyncImportRequest, EntityImportStream entityImportStream) {
        int startEntityPosition = atlasAsyncImportRequest.getImportTrackingInfo().getStartEntityPosition();
        LOG.info("==> skipToStartEntityPosition(atlasAsyncImportRequest={}): position={}", atlasAsyncImportRequest, Integer.valueOf(startEntityPosition));
        while (entityImportStream.hasNext() && startEntityPosition > entityImportStream.getPosition()) {
            entityImportStream.next();
        }
        LOG.info("<== skipToStartEntityPosition(atlasAsyncImportRequest={}): position={}", atlasAsyncImportRequest, Integer.valueOf(startEntityPosition));
    }

    @VisibleForTesting
    AtlasAsyncImportRequest registerRequest(AtlasImportResult atlasImportResult, String str, int i, List<String> list) throws AtlasBaseException {
        LOG.info("==> registerRequest(importId={})", str);
        try {
            try {
                AtlasAsyncImportRequest fetchImportRequestByImportId = this.importService.fetchImportRequestByImportId(str);
                if (fetchImportRequestByImportId != null && !ObjectUtils.equals(fetchImportRequestByImportId.getStatus(), AtlasAsyncImportRequest.ImportStatus.SUCCESSFUL) && !ObjectUtils.equals(fetchImportRequestByImportId.getStatus(), AtlasAsyncImportRequest.ImportStatus.PARTIAL_SUCCESS) && !ObjectUtils.equals(fetchImportRequestByImportId.getStatus(), AtlasAsyncImportRequest.ImportStatus.FAILED) && !ObjectUtils.equals(fetchImportRequestByImportId.getStatus(), AtlasAsyncImportRequest.ImportStatus.ABORTED)) {
                    if (ObjectUtils.equals(fetchImportRequestByImportId.getStatus(), AtlasAsyncImportRequest.ImportStatus.STAGING)) {
                        fetchImportRequestByImportId.setReceivedTime(System.currentTimeMillis());
                        this.importService.updateImportRequest(fetchImportRequestByImportId);
                    }
                    LOG.info("registerRequest(importId={}): not a new request, resuming {}", str, fetchImportRequestByImportId);
                    LOG.info("<== registerRequest(importId={})", str);
                    return fetchImportRequestByImportId;
                }
                AtlasAsyncImportRequest atlasAsyncImportRequest = new AtlasAsyncImportRequest(atlasImportResult);
                atlasAsyncImportRequest.setImportId(str);
                atlasAsyncImportRequest.setReceivedTime(System.currentTimeMillis());
                atlasAsyncImportRequest.getImportDetails().setTotalEntitiesCount(i);
                atlasAsyncImportRequest.getImportDetails().setCreationOrder(list);
                this.importService.saveImportRequest(atlasAsyncImportRequest);
                LOG.info("registerRequest(importId={}): registered new request {}", str, atlasAsyncImportRequest);
                LOG.info("<== registerRequest(importId={})", str);
                return atlasAsyncImportRequest;
            } catch (AtlasBaseException e) {
                LOG.error("Failed to register import request id={}", str, e);
                throw new AtlasBaseException(AtlasErrorCode.IMPORT_REGISTRATION_FAILED, e, new String[0]);
            }
        } catch (Throwable th) {
            LOG.info("<== registerRequest(importId={})", str);
            throw th;
        }
    }

    private void sendToTopic(String str, HookNotification hookNotification) throws AtlasBaseException {
        try {
            this.notificationInterface.send(str, Collections.singletonList(hookNotification), this.messageSource);
        } catch (NotificationException e) {
            throw new AtlasBaseException(e);
        }
    }
}
