package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.class */
public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
    private static final int MAX_COMMIT_RETRY_COUNT = 3;
    private final int batchSize;
    private final AtlasEntityStore entityStore;
    private final AtlasGraph atlasGraphBulk;
    private final AtlasEntityStore entityStoreBulk;
    private final AtlasTypeRegistry typeRegistry;
    private final EntityGraphRetriever entityRetrieverBulk;
    private final boolean isMigrationImport;
    private final AtomicLong counter;
    private final AtomicLong currentBatch;
    private final AtlasGraph atlasGraph;
    private final List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer;
    private final List<String> localResults;

    public EntityConsumer(AtlasTypeRegistry atlasTypeRegistry, AtlasGraph atlasGraph, AtlasEntityStore atlasEntityStore, AtlasGraph atlasGraph2, AtlasEntityStore atlasEntityStore2, EntityGraphRetriever entityGraphRetriever, BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> blockingQueue, int i, boolean z) {
        super(blockingQueue);
        this.counter = new AtomicLong(1L);
        this.currentBatch = new AtomicLong(1L);
        this.entityBuffer = new ArrayList();
        this.localResults = new ArrayList();
        this.typeRegistry = atlasTypeRegistry;
        this.atlasGraph = atlasGraph;
        this.entityStore = atlasEntityStore;
        this.atlasGraphBulk = atlasGraph2;
        this.entityStoreBulk = atlasEntityStore2;
        this.entityRetrieverBulk = entityGraphRetriever;
        this.batchSize = i;
        this.isMigrationImport = z;
    }

    protected void commitDirty() {
        super.commitDirty();
        LOG.info("Total: Commit: {}", Long.valueOf(this.counter.get()));
        this.counter.set(0L);
    }

    protected void doCommit() {
        for (int i = 1; i <= 3; i++) {
            if (commitWithRetry(i)) {
                return;
            }
        }
        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", Integer.valueOf(this.entityBuffer.size()), Long.valueOf(this.counter.get()));
        clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processItem(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        int size = MapUtils.isEmpty(atlasEntityWithExtInfo.getReferredEntities()) ? 1 : atlasEntityWithExtInfo.getReferredEntities().size() + 1;
        long addAndGet = this.counter.addAndGet(size);
        this.currentBatch.addAndGet(size);
        try {
            processEntity(atlasEntityWithExtInfo, addAndGet);
            attemptCommit();
        } catch (Exception e) {
            LOG.info("Invalid entities. Possible data loss: Please correct and re-submit!", e);
        }
    }

    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, long j) {
        RequestContext.get().setImportInProgress(true);
        RequestContext.get().setCreateShellEntityForNonExistingReference(true);
        RequestContext.get().setMigrationInProgress(this.isMigrationImport);
        try {
            LOG.debug("Processing: {}", Long.valueOf(j));
            importUsingBulkEntityStore(atlasEntityWithExtInfo);
        } catch (AtlasSchemaViolationException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Entity: {}", atlasEntityWithExtInfo.getEntity().getGuid(), e);
            }
            BulkImporterImpl.updateVertexGuid(this.atlasGraphBulk, this.typeRegistry, this.entityRetrieverBulk, atlasEntityWithExtInfo.getEntity());
        } catch (IllegalArgumentException | IllegalStateException e2) {
            LOG.warn("{}: {} - {}", new Object[]{e2.getClass().getSimpleName(), atlasEntityWithExtInfo.getEntity().getTypeName(), atlasEntityWithExtInfo.getEntity().getGuid(), e2});
            importUsingRegularEntityStore(atlasEntityWithExtInfo, e2);
        } catch (AtlasBaseException e3) {
            LOG.warn("AtlasBaseException: {} - {}", new Object[]{atlasEntityWithExtInfo.getEntity().getTypeName(), atlasEntityWithExtInfo.getEntity().getGuid(), e3});
        }
    }

    private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws AtlasBaseException {
        this.entityStoreBulk.createOrUpdateForImportNoCommit(new AtlasEntityStreamForImport(atlasEntityWithExtInfo, null));
        this.localResults.add(atlasEntityWithExtInfo.getEntity().getGuid());
        this.entityBuffer.add(atlasEntityWithExtInfo);
    }

    private void importUsingRegularEntityStore(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Exception exc) {
        commitValidatedEntities(exc);
        performRegularImport(atlasEntityWithExtInfo);
    }

    /* JADX WARN: Finally extract failed */
    private void performRegularImport(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        synchronized (this.atlasGraph) {
            try {
                try {
                    LOG.info("Regular: EntityStore: {}: Starting...", Long.valueOf(this.counter.get()));
                    this.entityStore.createOrUpdateForImportNoCommit(new AtlasEntityStreamForImport(atlasEntityWithExtInfo, null));
                    this.atlasGraph.commit();
                    this.localResults.add(atlasEntityWithExtInfo.getEntity().getGuid());
                    dispatchResults();
                    LOG.info("Regular: EntityStore: {}: Commit: Done!", Long.valueOf(this.counter.get()));
                    this.atlasGraph.commit();
                    addResult(atlasEntityWithExtInfo.getEntity().getGuid());
                    clear();
                    LOG.info("Regular: EntityStore: {}: Done!", Long.valueOf(this.counter.get()));
                } catch (Exception e) {
                    this.atlasGraph.rollback();
                    LOG.error("Regular: EntityStore: Rollback!: Entity creation using regular (non-bulk) failed! Please correct entity and re-submit!", e);
                    LOG.info("Regular: EntityStore: {}: Commit: Done!", Long.valueOf(this.counter.get()));
                    this.atlasGraph.commit();
                    addResult(atlasEntityWithExtInfo.getEntity().getGuid());
                    clear();
                    LOG.info("Regular: EntityStore: {}: Done!", Long.valueOf(this.counter.get()));
                }
            } catch (Throwable th) {
                LOG.info("Regular: EntityStore: {}: Commit: Done!", Long.valueOf(this.counter.get()));
                this.atlasGraph.commit();
                addResult(atlasEntityWithExtInfo.getEntity().getGuid());
                clear();
                LOG.info("Regular: EntityStore: {}: Done!", Long.valueOf(this.counter.get()));
                throw th;
            }
        }
    }

    private void commitValidatedEntities(Exception exc) {
        try {
            LOG.info("Validated Entities: Commit: Starting...");
            rollbackPauseRetry(1, exc);
            doCommit();
            LOG.info("Validated Entities: Commit: Done!");
        } catch (Throwable th) {
            LOG.info("Validated Entities: Commit: Done!");
            throw th;
        }
    }

    private void attemptCommit() {
        if (this.currentBatch.get() < this.batchSize) {
            return;
        }
        doCommit();
    }

    private boolean commitWithRetry(int i) {
        AtlasPerfMetrics.MetricRecorder startMetricRecord = RequestContext.get().startMetricRecord("commitWithRetry");
        try {
            try {
                this.atlasGraphBulk.commit();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", new Object[]{Integer.valueOf(this.entityBuffer.size()), Long.valueOf(this.currentBatch.get()), Long.valueOf(this.counter.get())});
                }
                dispatchResults();
                RequestContext.get().endMetricRecord(startMetricRecord);
                return true;
            } catch (Exception e) {
                rollbackPauseRetry(i, e);
                RequestContext.get().endMetricRecord(startMetricRecord);
                return false;
            }
        } catch (Throwable th) {
            RequestContext.get().endMetricRecord(startMetricRecord);
            throw th;
        }
    }

    private void rollbackPauseRetry(int i, Exception exc) {
        bulkGraphRollback(i);
        LOG.warn("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", new Object[]{Integer.valueOf(this.entityBuffer.size()), Long.valueOf(this.counter.get()), Integer.valueOf(i)});
        pause(i);
        String simpleName = exc.getClass().getSimpleName();
        if (!simpleName.equals("JanusGraphException") && !simpleName.equals("PermanentLockingException")) {
            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", new Object[]{Integer.valueOf(this.entityBuffer.size()), Long.valueOf(this.counter.get()), Integer.valueOf(i), exc});
        }
        retryProcessEntity(i);
    }

    private void bulkGraphRollback(int i) {
        try {
            this.atlasGraphBulk.rollback();
            clearCache();
        } catch (Exception e) {
            LOG.error("Rollback: Exception! Buffer: {}: Counter: {}: Retry count: {}", new Object[]{Integer.valueOf(this.entityBuffer.size()), Long.valueOf(this.counter.get()), Integer.valueOf(i)});
        }
    }

    private void retryProcessEntity(int i) {
        if (LOG.isDebugEnabled() || i > 1) {
            LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", Integer.valueOf(this.entityBuffer.size()), Integer.valueOf(i));
        }
        ArrayList arrayList = new ArrayList(this.entityBuffer);
        this.entityBuffer.clear();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            processEntity((AtlasEntity.AtlasEntityWithExtInfo) it.next(), this.counter.get());
        }
        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", Integer.valueOf(this.entityBuffer.size()), Integer.valueOf(i));
    }

    private void dispatchResults() {
        this.localResults.forEach((v1) -> {
            addResult(v1);
        });
        clear();
    }

    private void pause(int i) {
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException e) {
            LOG.error("pause: Interrupted!", e);
        }
    }

    private void clear() {
        this.localResults.clear();
        this.entityBuffer.clear();
        clearCache();
        this.currentBatch.set(0L);
    }

    private void clearCache() {
        GraphTransactionInterceptor.clearCache();
        RequestContext.get().clearCache();
    }
}
