/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.atlas.connector;

import com.couchbase.atlas.connector.AtlasConfig;
import com.couchbase.atlas.connector.CBConfig;
import com.couchbase.atlas.connector.entities.CouchbaseBucket;
import com.couchbase.atlas.connector.entities.CouchbaseCluster;
import com.couchbase.atlas.connector.entities.CouchbaseCollection;
import com.couchbase.atlas.connector.entities.CouchbaseField;
import com.couchbase.atlas.connector.entities.CouchbaseFieldType;
import com.couchbase.atlas.connector.entities.CouchbaseScope;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.highlevel.internal.CollectionIdAndKey;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.java.json.JsonObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CouchbaseHook
extends AtlasHook
implements ControlEventHandler,
DataEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseHook.class);
    protected static CouchbaseHook INSTANCE;
    protected static Client DCP;
    protected static AtlasClientV2 ATLAS;
    private static Consumer<List<AtlasEntity>> createInterceptor;
    private static Consumer<List<AtlasEntity>> updateInterceptor;
    private static boolean loop;
    private CouchbaseCluster clusterEntity;
    private CouchbaseBucket bucketEntity;

    public static void main(String[] args) {
        DCP = CBConfig.dcpClient();
        ATLAS = AtlasConfig.client();
        INSTANCE = new CouchbaseHook();
        DCP.controlEventHandler((ControlEventHandler)INSTANCE);
        DCP.dataEventHandler((DataEventHandler)INSTANCE);
        DCP.connect().block();
        LOG.info("DCP client connected.");
        INSTANCE.initializeAtlasContext();
        DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
        System.out.println("Starting the stream...");
        DCP.startStreaming().block();
        System.out.println("Started the stream.");
        try {
            while (loop) {
                Thread.sleep(1000L);
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            DCP.disconnect().block();
        }
    }

    private void initializeAtlasContext() {
        LOG.debug("Creating cluster/bucket/scope entities");
        this.clusterEntity = (CouchbaseCluster)((CouchbaseCluster)new CouchbaseCluster().name(CBConfig.address())).url(CBConfig.address()).get();
        this.bucketEntity = (CouchbaseBucket)((CouchbaseBucket)new CouchbaseBucket().name(CBConfig.bucket())).cluster(this.clusterEntity).get();
        ArrayList<AtlasEntity> entitiesToCreate = new ArrayList<AtlasEntity>();
        if (!this.clusterEntity.exists(ATLAS)) {
            entitiesToCreate.add(this.clusterEntity.atlasEntity(ATLAS));
        }
        if (!this.bucketEntity.exists(ATLAS)) {
            entitiesToCreate.add(this.bucketEntity.atlasEntity(ATLAS));
        }
        if (!entitiesToCreate.isEmpty()) {
            this.createEntities(entitiesToCreate);
        }
    }

    private void createEntities(List<AtlasEntity> entities) {
        if (createInterceptor != null) {
            createInterceptor.accept(entities);
            return;
        }
        AtlasEntity.AtlasEntitiesWithExtInfo entity = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
        HookNotification.EntityCreateRequestV2 request = new HookNotification.EntityCreateRequestV2("couchbase", entity);
        this.notifyEntities(Arrays.asList(request), null);
    }

    private void updateEntities(List<AtlasEntity> entities) {
        if (updateInterceptor != null) {
            updateInterceptor.accept(entities);
            return;
        }
        AtlasEntity.AtlasEntitiesWithExtInfo entity = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
        HookNotification.EntityUpdateRequestV2 request = new HookNotification.EntityUpdateRequestV2("couchbase", entity);
        this.notifyEntities(Arrays.asList(request), null);
    }

    public void onEvent(ChannelFlowController flowController, ByteBuf event) {
        if (Math.random() > (double)CBConfig.dcpSampleRatio().floatValue()) {
            LOG.debug("Skipping DCP message.");
            return;
        }
        if (DcpMutationMessage.is((ByteBuf)event)) {
            try {
                CollectionIdAndKey ckey = MessageUtil.getCollectionIdAndKey((ByteBuf)event, (boolean)true);
                CollectionsManifest.CollectionInfo collectionInfo = CouchbaseHook.collectionInfo(MessageUtil.getVbucket((ByteBuf)event), ckey.collectionId());
                String collectionName = collectionInfo.name();
                String scopeName = collectionInfo.scope().name();
                LOG.debug("Received DCP mutation message for scope '{}' and collection '{}'", (Object)scopeName, (Object)collectionName);
                CouchbaseScope scopeEntity = this.bucketEntity.scope(scopeName);
                ArrayList<AtlasEntity> toCreate = new ArrayList<AtlasEntity>();
                ArrayList<AtlasEntity> toUpdate = new ArrayList<AtlasEntity>();
                if (!scopeEntity.exists(ATLAS)) {
                    toCreate.add(scopeEntity.atlasEntity(ATLAS));
                    LOG.debug("Creating scope: {}", (Object)scopeEntity.qualifiedName());
                } else {
                    toUpdate.add(scopeEntity.atlasEntity(ATLAS));
                    LOG.debug("Updating scope: {}", (Object)scopeEntity.qualifiedName());
                }
                CouchbaseCollection collectionEntity = scopeEntity.collection(collectionName);
                collectionEntity.incrementAnalyzedDocuments();
                if (!collectionEntity.exists(ATLAS)) {
                    toCreate.add(collectionEntity.atlasEntity(ATLAS));
                } else {
                    toUpdate.add(collectionEntity.atlasEntity(ATLAS));
                }
                Map document = JsonObject.fromJson((byte[])DcpMutationMessage.contentBytes((ByteBuf)event)).toMap();
                System.out.println(String.format("Document keys: %s", document.keySet()));
                document.entrySet().stream().filter(e -> e.getValue() != null).flatMap(entry -> CouchbaseHook.processField(collectionEntity, Collections.EMPTY_LIST, null, (String)entry.getKey(), entry.getValue())).peek(CouchbaseField::incrementDocumentCount).filter(field -> field.exists(ATLAS) || (float)field.documentCount() / (float)collectionEntity.documentsAnalyzed() > (float)CBConfig.dcpFieldThreshold().shortValue()).forEach(field -> {
                    if (field.exists(ATLAS)) {
                        toUpdate.add(field.atlasEntity(ATLAS));
                    } else {
                        toCreate.add(field.atlasEntity(ATLAS));
                    }
                });
                this.createEntities(toCreate);
                this.updateEntities(toUpdate);
                System.out.println("Notified Atlas");
            }
            catch (Exception e2) {
                LOG.error("Failed to process DCP message", (Throwable)e2);
            }
        }
    }

    private static Stream<CouchbaseField> processField(CouchbaseCollection collectionEntity, Collection<String> path, @Nullable CouchbaseField parent, String name, Object value) {
        CouchbaseFieldType fieldType = CouchbaseFieldType.infer(value);
        ArrayList<String> fieldPath = new ArrayList<String>(path);
        fieldPath.add(name);
        CouchbaseField rootField = (CouchbaseField)((CouchbaseField)new CouchbaseField().name(name)).fieldPath(fieldPath.stream().collect(Collectors.joining("."))).fieldType(fieldType).collection(collectionEntity).parentField(parent).get();
        Stream<CouchbaseField> result = Stream.of(rootField);
        if (fieldType == CouchbaseFieldType.OBJECT) {
            if (value instanceof JsonObject) {
                value = ((JsonObject)value).toMap();
            }
            if (value instanceof Map) {
                result = Stream.concat(result, ((Map)value).entrySet().stream().flatMap(entity -> CouchbaseHook.processField(collectionEntity, fieldPath, rootField, (String)entity.getKey(), entity.getValue())));
            } else {
                throw new IllegalArgumentException(String.format("Incorrect value type '%s' for field type 'object': a Map was expected instead.", value.getClass()));
            }
        }
        return result;
    }

    public String getMessageSource() {
        return "couchbase";
    }

    private static CollectionsManifest.CollectionInfo collectionInfo(int vbucket, long collid) {
        return DCP.sessionState().get(vbucket).getCollectionsManifest().getCollection(collid);
    }

    protected static void setEntityInterceptors(Consumer<List<AtlasEntity>> createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
        CouchbaseHook.createInterceptor = createInterceptor;
        CouchbaseHook.updateInterceptor = updateInterceptor;
    }

    static void loop(boolean loop) {
        CouchbaseHook.loop = loop;
    }

    static {
        loop = true;
    }
}

