/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.repository.graphdb.janus.migration;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeParsers;
import org.apache.atlas.repository.graphdb.janus.migration.MappedElementCache;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonNodeProcessManager {
    public static WorkItemManager create(Graph rGraph, Graph bGraph, JsonNodeParsers.ParseElement parseElement, int numWorkers, int batchSize, boolean isResuming) {
        ConsumerBuilder cb = new ConsumerBuilder(rGraph, bGraph, parseElement, batchSize, isResuming);
        return new WorkItemManager(cb, batchSize, numWorkers);
    }

    private static class ConsumerBuilder
    implements WorkItemBuilder<Consumer, JsonNode> {
        private final Graph graph;
        private final Graph bulkLoadGraph;
        private final JsonNodeParsers.ParseElement parseElement;
        private final int batchSize;
        private final boolean isResuming;

        public ConsumerBuilder(Graph graph, Graph bulkLoadGraph, JsonNodeParsers.ParseElement parseElement, int batchSize, boolean isResuming) {
            this.graph = graph;
            this.bulkLoadGraph = bulkLoadGraph;
            this.batchSize = batchSize;
            this.parseElement = parseElement;
            this.isResuming = isResuming;
        }

        public Consumer build(BlockingQueue<JsonNode> queue) {
            return this.isResuming ? new ResumingConsumer(queue, this.graph, this.bulkLoadGraph, this.parseElement, this.batchSize) : new Consumer(queue, this.graph, this.bulkLoadGraph, this.parseElement, this.batchSize);
        }
    }

    static class WorkItemManager
    extends org.apache.atlas.pc.WorkItemManager {
        public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
            super(builder, batchSize, numWorkers);
        }
    }

    private static class ResumingConsumer
    extends Consumer {
        public ResumingConsumer(BlockingQueue<JsonNode> workQueue, Graph graph, Graph bulkLoadGraph, JsonNodeParsers.ParseElement parseElement, long batchSize) {
            super(workQueue, graph, bulkLoadGraph, parseElement, batchSize);
        }

        @Override
        public void processItem(JsonNode node) {
            if (!this.contains(node)) {
                super.processItem(node);
            }
        }

        private boolean contains(JsonNode node) {
            return this.parseElement.getByOriginalId(this.bulkLoadGraph, node) != null;
        }
    }

    private static class Consumer
    extends WorkItemConsumer<JsonNode> {
        private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
        private static final int WAIT_DURATION_AFTER_COMMIT_EXCEPTION = 1000;
        private final Graph graph;
        protected final Graph bulkLoadGraph;
        protected final JsonNodeParsers.ParseElement parseElement;
        private final long batchSize;
        private AtomicLong counter;
        private final MappedElementCache cache;
        private static ThreadLocal<List<JsonNode>> nodes = ThreadLocal.withInitial(() -> new ArrayList());

        public Consumer(BlockingQueue<JsonNode> workQueue, Graph graph, Graph bulkLoadGraph, JsonNodeParsers.ParseElement parseElement, long batchSize) {
            super(workQueue);
            this.graph = graph;
            this.bulkLoadGraph = bulkLoadGraph;
            this.parseElement = parseElement;
            this.batchSize = batchSize;
            this.counter = new AtomicLong(0L);
            this.cache = new MappedElementCache();
        }

        public void processItem(JsonNode node) {
            try {
                Map<String, Object> result = this.parseElement.parse(this.bulkLoadGraph, this.cache, node);
                if (result == null) {
                    this.addNode(node);
                    this.commitConditionally(this.counter.getAndIncrement());
                } else {
                    this.commitBulk();
                    this.cache.clearAll();
                    this.updateSchema(result, node);
                }
            }
            catch (Exception ex) {
                this.bulkLoadGraph.tx().rollback();
                this.error("Failed! Retrying...", ex);
                this.retryBatchCommit();
            }
        }

        private void addNode(JsonNode node) {
            nodes.get().add(node);
        }

        protected void commitDirty() {
            super.commitDirty();
            this.cache.clearAll();
        }

        protected void doCommit() {
            this.commitBulk();
        }

        private void commitConditionally(long index) {
            if (index % this.batchSize == 0L && nodes.get().size() > 0) {
                this.commitBulk();
            }
        }

        private void commitBulk() {
            this.commit(this.bulkLoadGraph, nodes.get().size());
            nodes.get().clear();
        }

        private void commitRegular() {
            this.commit(this.graph, nodes.get().size());
            this.cache.clearAll();
        }

        private void commit(Graph g, int size) {
            this.parseElement.commit(g);
            this.display("commit-size: {}: Done!", size);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void updateSchema(Map<String, Object> schema, JsonNode node) {
            Graph graph = this.graph;
            synchronized (graph) {
                String typeName = this.parseElement.getType(node);
                try {
                    this.display("updateSchema: type: {}: ...", typeName);
                    if (schema.containsKey("oid")) {
                        this.parseElement.parse(this.graph, this.cache, node);
                    } else {
                        Object id = schema.get("id");
                        schema.remove("id");
                        this.parseElement.update(this.graph, id, schema);
                    }
                    this.commitRegular();
                    this.display("updateSchema: type: {}: Done!", typeName);
                }
                catch (NoSuchElementException ex) {
                    this.parseElement.parse(this.graph, this.cache, node);
                    this.commitRegular();
                    this.display("updateSchema: NoSuchElementException processed!: type: {}: Done!", typeName);
                }
                catch (Exception ex) {
                    this.graph.tx().rollback();
                    this.error("updateSchema: failed!: type: " + typeName, ex);
                }
            }
        }

        private void retryBatchCommit() {
            this.display("Waiting with [{} nodes] for 1 secs.", nodes.get().size());
            try {
                Thread.sleep(1000L);
                for (JsonNode n : nodes.get()) {
                    this.parseElement.parse(this.bulkLoadGraph, this.cache, n);
                }
                this.commitBulk();
                this.display("Done!: After re-adding {}.", nodes.get().size());
            }
            catch (Exception ex) {
                this.error("retryBatchCommit: Failed! Potential data loss.", ex);
            }
        }

        private void display(String message, Object s1, Object s2) {
            LOG.info("{}: [{}]: " + message, new Object[]{this.parseElement.getMessage(), this.counter, s1, s2});
        }

        private void display(String message, Object s1) {
            this.display(message, s1, "");
        }

        private void error(String message, Exception ex) {
            LOG.error("{}: [{}]: " + message, new Object[]{this.parseElement.getMessage(), this.counter, ex});
        }
    }
}

