package org.apache.atlas.repository.patches;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/repository/patches/EdgePatchProcessor.class */
/**
 * Base class for patches that update graph edges using a pool of worker threads.
 *
 * <p>{@link #apply()} first calls {@link #prepareForExecution()}, then creates a
 * {@link WorkItemManager} whose consumers look up each submitted edge id in the graph and hand
 * the edge to {@link #processEdgesItem(String, AtlasEdge, String, AtlasRelationshipType)}.
 * Subclasses decide which edge ids are submitted and what is done with each edge.
 *
 * <p>The worker count and batch size are read once, at class-load time, from the
 * {@code atlas.patch.numWorkers} and {@code atlas.patch.batchSize} application properties.
 */
public abstract class EdgePatchProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(EdgePatchProcessor.class);

    /** Number of workers passed to the {@link WorkItemManager}; always positive. */
    public static final int NUM_WORKERS;

    /**
     * Batch size passed to the {@link WorkItemManager} and used as the commit interval by
     * {@link Consumer}; always positive.
     */
    public static final int BATCH_SIZE;

    private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
    private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
    private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
    private static final String WORKER_NAME_PREFIX = "patchWorkItem";
    private static final int MAX_COMMIT_RETRY_COUNT = 3;

    /** Fallback worker count when the configuration is unavailable or yields a non-positive value. */
    private static final int DEFAULT_NUM_WORKERS = 3;

    /** Fallback batch size when the configuration is unavailable or yields a non-positive value. */
    private static final int DEFAULT_BATCH_SIZE = 300;

    /** Base pause between commit retries; multiplied by the attempt number. */
    private static final long COMMIT_RETRY_PAUSE_MS = 300L;

    static {
        int numWorkers = DEFAULT_NUM_WORKERS;
        int batchSize = DEFAULT_BATCH_SIZE;

        try {
            Configuration config = ApplicationProperties.get();

            // When no explicit worker count is configured, use three workers per ATLAS_SOLR_SHARDS (default 1).
            numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
            batchSize = config.getInt(BATCH_SIZE_PROPERTY, DEFAULT_BATCH_SIZE);

            LOG.info("EdgePatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
        } catch (Exception e) {
            LOG.error("Error retrieving configuration.", e);
        }

        // A zero batch size would make "counter % BATCH_SIZE" throw ArithmeticException for every
        // item, and a non-positive worker count cannot do any work; fall back to the defaults.
        if (numWorkers <= 0) {
            LOG.warn("EdgePatchProcessor: invalid {}={}; using default {}", NUM_WORKERS_PROPERTY, numWorkers, DEFAULT_NUM_WORKERS);
            numWorkers = DEFAULT_NUM_WORKERS;
        }
        if (batchSize <= 0) {
            LOG.warn("EdgePatchProcessor: invalid {}={}; using default {}", BATCH_SIZE_PROPERTY, batchSize, DEFAULT_BATCH_SIZE);
            batchSize = DEFAULT_BATCH_SIZE;
        }

        NUM_WORKERS = numWorkers;
        BATCH_SIZE = batchSize;
    }

    private final AtlasGraph graph;
    private final GraphBackedSearchIndexer indexer;
    private final AtlasTypeRegistry typeRegistry;

    /**
     * Queue consumer that resolves each edge id to an {@link AtlasEdge}, delegates the actual
     * patching to the owning {@link EdgePatchProcessor}, and commits the graph periodically.
     */
    public static class Consumer extends WorkItemConsumer<String> {
        private final AtlasGraph graph;
        private final AtlasTypeRegistry typeRegistry;
        /** Number of items received by this consumer, including items that were skipped or failed. */
        private final AtomicLong counter;
        private final EdgePatchProcessor individualItemProcessor;

        /**
         * @param atlasGraph         graph used to look up edges and to commit
         * @param atlasTypeRegistry  registry used to resolve an edge's relationship type
         * @param blockingQueue      queue of edge ids, passed to the {@link WorkItemConsumer} constructor
         * @param edgePatchProcessor processor that receives each resolved edge
         */
        public Consumer(AtlasGraph atlasGraph, AtlasTypeRegistry atlasTypeRegistry, BlockingQueue<String> blockingQueue, EdgePatchProcessor edgePatchProcessor) {
            super(blockingQueue);
            this.graph = atlasGraph;
            this.typeRegistry = atlasTypeRegistry;
            this.counter = new AtomicLong(0L);
            this.individualItemProcessor = edgePatchProcessor;
        }

        /**
         * Commits the graph unconditionally (with retries), logs the running item count, and then
         * delegates to the superclass implementation.
         */
        protected void commitDirty() {
            attemptCommit();
            EdgePatchProcessor.LOG.info("Total: Commit: {}", this.counter.get());
            super.commitDirty();
        }

        /**
         * Commits the graph (with retries) only when the number of items received so far is an
         * exact multiple of {@link EdgePatchProcessor#BATCH_SIZE}.
         */
        protected void doCommit() {
            long processed = this.counter.get();

            if (processed % EdgePatchProcessor.BATCH_SIZE == 0) {
                EdgePatchProcessor.LOG.info("Processed: {}", processed);
                attemptCommit();
            }
        }

        /**
         * Processes a single edge id taken from the queue.
         *
         * <p>The item is counted, then skipped when the edge cannot be found or when the type
         * registry has no relationship type for the edge's type name. An
         * {@link AtlasBaseException} thrown by the processor is logged and does not propagate.
         *
         * @param edgeId id of the edge to process
         */
        public void processItem(String edgeId) {
            this.counter.incrementAndGet();

            AtlasEdge edge = this.graph.getEdge(edgeId);
            if (edge == null) {
                EdgePatchProcessor.LOG.warn("processItem(edgeId={}): AtlasEdge not found!", edgeId);
                return;
            }

            String typeName = edge.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
            AtlasRelationshipType relationshipType = this.typeRegistry.getRelationshipTypeByName(typeName);
            if (relationshipType == null) {
                // No relationship type is registered under this edge's type name: nothing to patch.
                return;
            }

            try {
                this.individualItemProcessor.processEdgesItem(edgeId, edge, typeName, relationshipType);
                doCommit();
            } catch (AtlasBaseException e) {
                EdgePatchProcessor.LOG.error("Error processing: edgeId={}", edgeId, e);
            }
        }

        /**
         * Commits the graph, retrying up to {@code MAX_COMMIT_RETRY_COUNT} times with a pause
         * that grows linearly with the attempt number.
         *
         * <p>Failures are logged rather than thrown. If the thread is interrupted while pausing,
         * the interrupt flag is restored and no further attempts are made.
         */
        private void attemptCommit() {
            for (int attempt = 1; attempt <= EdgePatchProcessor.MAX_COMMIT_RETRY_COUNT; attempt++) {
                try {
                    this.graph.commit();
                    return;
                } catch (Exception e) {
                    EdgePatchProcessor.LOG.error("Commit exception: attempt {} of {}", attempt, EdgePatchProcessor.MAX_COMMIT_RETRY_COUNT, e);
                }

                if (attempt == EdgePatchProcessor.MAX_COMMIT_RETRY_COUNT) {
                    // Nothing left to retry, so pausing again would only delay the worker.
                    break;
                }

                try {
                    Thread.sleep(EdgePatchProcessor.COMMIT_RETRY_PAUSE_MS * attempt);
                } catch (InterruptedException ie) {
                    EdgePatchProcessor.LOG.error("Commit exception: Pause: Interrupted!", ie);
                    // Preserve the interrupt for whoever owns this thread and stop retrying.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            EdgePatchProcessor.LOG.error("Commit failed after {} attempts; giving up.", EdgePatchProcessor.MAX_COMMIT_RETRY_COUNT);
        }
    }

    /**
     * Factory that creates a {@link Consumer} for a given work queue, sharing the same graph,
     * type registry and processor across all consumers it builds.
     */
    public static class ConsumerBuilder implements WorkItemBuilder<Consumer, String> {
        private final AtlasTypeRegistry typeRegistry;
        private final AtlasGraph graph;
        private final EdgePatchProcessor patchItemProcessor;

        /**
         * @param atlasGraph         graph handed to every consumer
         * @param atlasTypeRegistry  type registry handed to every consumer
         * @param edgePatchProcessor processor handed to every consumer
         */
        public ConsumerBuilder(AtlasGraph atlasGraph, AtlasTypeRegistry atlasTypeRegistry, EdgePatchProcessor edgePatchProcessor) {
            this.graph = atlasGraph;
            this.typeRegistry = atlasTypeRegistry;
            this.patchItemProcessor = edgePatchProcessor;
        }

        /**
         * Creates a new consumer bound to the given queue.
         *
         * @param blockingQueue queue of edge ids the consumer will read from
         * @return a new {@link Consumer}
         */
        public Consumer build(BlockingQueue<String> blockingQueue) {
            return new Consumer(this.graph, this.typeRegistry, blockingQueue, this.patchItemProcessor);
        }

        /*
         * Decompiler-emitted copy of the compiler's bridge method for build(BlockingQueue); it was
         * renamed by the decompiler to avoid a name collision. Kept so the class's visible members
         * stay unchanged; it simply forwards to the typed build method.
         */
        public /* bridge */ /* synthetic */ Runnable m127build(BlockingQueue blockingQueue) {
            return build((BlockingQueue<String>) blockingQueue);
        }
    }

    /**
     * Captures the graph, indexer and type registry from the given patch context.
     *
     * @param patchContext source of the graph, indexer and type registry
     */
    public EdgePatchProcessor(PatchContext patchContext) {
        this.graph = patchContext.getGraph();
        this.indexer = patchContext.getIndexer();
        this.typeRegistry = patchContext.getTypeRegistry();
    }

    /** @return the graph obtained from the patch context */
    public AtlasGraph getGraph() {
        return this.graph;
    }

    /** @return the search indexer obtained from the patch context */
    public GraphBackedSearchIndexer getIndexer() {
        return this.indexer;
    }

    /** @return the type registry obtained from the patch context */
    public AtlasTypeRegistry getTypeRegistry() {
        return this.typeRegistry;
    }

    /**
     * Runs the patch: calls {@link #prepareForExecution()} and then processes the submitted edges.
     *
     * @throws AtlasBaseException if {@link #prepareForExecution()} fails
     */
    public void apply() throws AtlasBaseException {
        prepareForExecution();
        execute();
    }

    /**
     * Hook invoked by {@link #apply()} before any edge is submitted.
     *
     * @throws AtlasBaseException if preparation fails; {@link #apply()} then does not process any edge
     */
    protected abstract void prepareForExecution() throws AtlasBaseException;

    /**
     * Hook in which the subclass submits work to the given manager. The manager's consumers are
     * {@link Consumer} instances, which treat each item as an edge id.
     *
     * @param workItemManager manager that distributes submitted items to the consumers
     */
    protected abstract void submitEdgesToUpdate(WorkItemManager workItemManager);

    /**
     * Applies the patch to a single edge. Called by {@link Consumer} for every edge that was
     * found in the graph and whose type name resolves to a relationship type.
     *
     * @param edgeId           id the edge was looked up with
     * @param edge             the edge to patch
     * @param typeName         value of the edge's {@code Constants.ENTITY_TYPE_PROPERTY_KEY} property
     * @param relationshipType relationship type registered under {@code typeName}
     * @throws AtlasBaseException if the edge cannot be processed; the consumer logs it and continues
     */
    protected abstract void processEdgesItem(String edgeId, AtlasEdge edge, String typeName, AtlasRelationshipType relationshipType) throws AtlasBaseException;

    /**
     * Creates the work-item manager, lets the subclass submit its edges, drains the manager and
     * always shuts it down afterwards, even if submission or draining throws.
     */
    private void execute() {
        ConsumerBuilder consumerBuilder = new ConsumerBuilder(this.graph, this.typeRegistry, this);
        WorkItemManager workItemManager = new WorkItemManager(consumerBuilder, WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);

        try {
            submitEdgesToUpdate(workItemManager);
            workItemManager.drain();
        } finally {
            try {
                workItemManager.shutdown();
            } catch (InterruptedException e) {
                LOG.error("EdgePatchProcessor.execute(): interrupted during WorkItemManager shutdown.", e);
                // Restore the interrupt flag so callers further up the stack can react to it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
