/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.repository.patches;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.patches.AtlasPatchHandler;
import org.apache.atlas.repository.patches.ConcurrentPatchProcessor;
import org.apache.atlas.repository.patches.PatchContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReIndexPatch
extends AtlasPatchHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReIndexPatch.class);
    private static final String PATCH_ID = "JAVA_PATCH_0000_006";
    private static final String PATCH_DESCRIPTION = "Performs reindex on all the indexes.";
    private final PatchContext context;

    public ReIndexPatch(PatchContext context) {
        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
        this.context = context;
    }

    @Override
    public void apply() throws AtlasBaseException {
        if (!AtlasConfiguration.REBUILD_INDEX.getBoolean()) {
            LOG.info("ReIndexPatch: Skipped, since not enabled!");
            return;
        }
        try {
            LOG.info("ReIndexPatch: Starting...");
            ReindexPatchProcessor reindexPatchProcessor = new ReindexPatchProcessor(this.context);
            reindexPatchProcessor.repairVertices();
            reindexPatchProcessor.repairEdges();
        }
        catch (Exception exception) {
            LOG.error("Error while reindexing.", (Throwable)exception);
        }
        finally {
            LOG.info("ReIndexPatch: Done!");
        }
        this.setStatus(AtlasPatch.PatchStatus.UNKNOWN);
        LOG.info("ReIndexPatch.apply(): patchId={}, status={}", (Object)this.getPatchId(), (Object)this.getStatus());
    }

    public static class ReindexPatchProcessor {
        private static String[] vertexIndexNames = new String[]{"vertex_index", "fulltext_index"};
        private static String[] edgeIndexNames = new String[]{"edge_index"};
        private static String WORKER_PREFIX = "reindex";
        private PatchContext context;

        public ReindexPatchProcessor(PatchContext context) {
            this.context = context;
        }

        public void repairVertices() {
            this.repairElements(ReindexPatchProcessor::vertices, vertexIndexNames);
        }

        public void repairEdges() {
            this.repairElements(ReindexPatchProcessor::edges, edgeIndexNames);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> action, String[] indexNames) {
            WorkItemManager manager = new WorkItemManager((WorkItemBuilder)new ReindexConsumerBuilder(this.context.getGraph(), indexNames), WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, ConcurrentPatchProcessor.NUM_WORKERS, false);
            try {
                LOG.info("repairElements.execute(): {}: Starting...", (Object[])indexNames);
                action.accept(manager, this.context.getGraph());
                manager.drain();
            }
            finally {
                try {
                    manager.shutdown();
                }
                catch (InterruptedException e) {
                    LOG.error("repairEdges.execute(): interrupted during WorkItemManager shutdown.", (Throwable)e);
                }
                LOG.info("repairElements.execute(): {}: Done!", (Object[])indexNames);
            }
        }

        private static void edges(WorkItemManager manager, AtlasGraph graph) {
            Iterable iterable = graph.getEdges();
            Iterator iter = iterable.iterator();
            while (iter.hasNext()) {
                manager.checkProduce(iter.next());
            }
        }

        private static void vertices(WorkItemManager manager, AtlasGraph graph) {
            Iterable iterable = graph.getVertices();
            for (AtlasVertex vertex : iterable) {
                manager.checkProduce((Object)vertex);
            }
        }
    }

    private static class ReindexConsumer
    extends WorkItemConsumer<AtlasElement> {
        private final List<AtlasElement> list = new ArrayList<AtlasElement>();
        private final String[] indexNames;
        private final AtlasGraph graph;
        private final AtomicLong counter;

        public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] indexNames) {
            super(queue);
            this.graph = graph;
            this.indexNames = indexNames;
            this.counter = new AtomicLong(0L);
        }

        protected void doCommit() {
            if (this.list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) {
                this.attemptCommit();
            }
        }

        protected void commitDirty() {
            this.attemptCommit();
            LOG.info("Total: Commit: {}", (Object)this.counter.get());
            super.commitDirty();
        }

        private void attemptCommit() {
            for (String indexName : this.indexNames) {
                try {
                    this.graph.getManagementSystem().reindex(indexName, this.list);
                }
                catch (IllegalStateException e) {
                    LOG.error("IllegalStateException: Exception", (Throwable)e);
                    return;
                }
                catch (Exception exception) {
                    LOG.error("Exception: {}", (Object)indexName, (Object)exception);
                }
            }
            this.list.clear();
            LOG.info("Processed: {}", (Object)this.counter.get());
        }

        protected void processItem(AtlasElement item) {
            this.counter.incrementAndGet();
            this.list.add(item);
            this.commit();
        }
    }

    private static class ReindexConsumerBuilder
    implements WorkItemBuilder<ReindexConsumer, AtlasElement> {
        private AtlasGraph graph;
        private String[] indexNames;

        public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) {
            this.graph = graph;
            this.indexNames = indexNames;
        }

        public ReindexConsumer build(BlockingQueue queue) {
            return new ReindexConsumer(queue, this.graph, this.indexNames);
        }
    }
}

