package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.class */
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
    private static final String WORKER_PREFIX = "migration-import";
    private static final long STATUS_REPORT_TIMEOUT_DURATION = 60000;
    private final StatusReporter<String, Long> statusReporter;
    private final AtlasImportResult importResult;
    private final DataMigrationStatusService dataMigrationStatusService;
    private String currentTypeName;
    private float currentPercent;
    private EntityImportStream entityImportStream;

    public EntityCreationManager(WorkItemBuilder workItemBuilder, int i, int i2, AtlasImportResult atlasImportResult, DataMigrationStatusService dataMigrationStatusService) {
        super(workItemBuilder, WORKER_PREFIX, i, i2, true);
        this.importResult = atlasImportResult;
        this.dataMigrationStatusService = dataMigrationStatusService;
        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
    }

    public long read(EntityImportStream entityImportStream) {
        long position = entityImportStream.getPosition();
        this.entityImportStream = entityImportStream;
        this.dataMigrationStatusService.setStatus("IN_PROGRESS");
        while (true) {
            AtlasEntity.AtlasEntityWithExtInfo nextEntityWithExtInfo = entityImportStream.getNextEntityWithExtInfo();
            if (nextEntityWithExtInfo == null) {
                break;
            }
            AtlasEntity entity = nextEntityWithExtInfo != null ? nextEntityWithExtInfo.getEntity() : null;
            if (entity != null) {
                try {
                    long j = position;
                    position = j + 1;
                    produce(j, entity.getTypeName(), nextEntityWithExtInfo);
                } catch (Throwable th) {
                    LOG.warn("Exception: {}", entity.getGuid(), th);
                }
            }
        }
        this.dataMigrationStatusService.setStatus("DONE");
        return position;
    }

    public void extractResults() {
        while (true) {
            Object poll = getResults().poll();
            if (poll == null) {
                logStatus();
                return;
            }
            this.statusReporter.processed((String) poll);
        }
    }

    private void produce(long j, String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        String currentTypeName = getCurrentTypeName();
        if (StringUtils.isNotEmpty(str) && StringUtils.isNotEmpty(currentTypeName) && !StringUtils.equals(currentTypeName, str)) {
            LOG.info("Waiting: '{}' to complete...", currentTypeName);
            super.drain();
            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", currentTypeName, str);
        }
        setCurrentTypeName(str);
        this.statusReporter.produced(atlasEntityWithExtInfo.getEntity().getGuid(), Long.valueOf(j));
        super.checkProduce(atlasEntityWithExtInfo);
        extractResults();
    }

    private void logStatus() {
        Long l = (Long) this.statusReporter.ack();
        if (l == null) {
            return;
        }
        this.importResult.incrementMeticsCounter(getCurrentTypeName());
        this.dataMigrationStatusService.savePosition(l);
        this.currentPercent = updateImportMetrics(getCurrentTypeName(), l.longValue(), this.entityImportStream.size(), getCurrentPercent());
    }

    private static float updateImportMetrics(String str, long j, int i, float f) {
        return BulkImporterImpl.updateImportProgress(LOG, (int) j, i, f, String.format("entity:last-imported:%s:(%s)", str, Long.valueOf(j)));
    }

    private String getCurrentTypeName() {
        return this.currentTypeName;
    }

    private void setCurrentTypeName(String str) {
        this.currentTypeName = str;
    }

    private float getCurrentPercent() {
        return this.currentPercent;
    }
}
