/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.repository.audit;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.inject.Singleton;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Singleton
@Component
@ConditionalOnAtlasProperty(property="atlas.EntityAuditRepository.impl")
public class CassandraBasedAuditRepository
extends AbstractStorageBasedAuditRepository {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraBasedAuditRepository.class);
    private static final String DEFAULT_KEYSPACE = "atlas_audit";
    private static final String DEFAULT_CLUSTER_NAME = "JanusGraph";
    private static final int DEFAULT_PORT = 9042;
    private static final int DEFAULT_REPLICATION_FACTOR = 3;
    public static final String MANAGE_EMBEDDED_CASSANDRA = "MANAGE_EMBEDDED_CASSANDRA";
    public static final String CASSANDRA_HOSTNAME_PROPERTY = "atlas.graph.storage.hostname";
    public static final String CASSANDRA_CLUSTERNAME_PROPERTY = "atlas.graph.storage.clustername";
    public static final String CASSANDRA_PORT_PROPERTY = "atlas.graph.storage.port";
    public static final String CASSANDRA_REPLICATION_FACTOR_PROPERTY = "atlas.EntityAuditRepository.replicationFactor";
    public static final String CASSANDRA_AUDIT_KEYSPACE_PROPERTY = "atlas.EntityAuditRepository.keyspace";
    public static final String CASSANDRA_USERNAME_PROPERTY = "atlas.graph.storage.username";
    public static final String CASSANDRA_PASSWORD_PROPERTY = "atlas.graph.storage.password";
    private static final String AUDIT_TABLE_SCHEMA = "CREATE TABLE audit(entityid text, created bigint, action text, user text, detail text, entity text, PRIMARY KEY (entityid, created)) WITH CLUSTERING ORDER BY (created DESC);";
    private static final String ENTITYID = "entityid";
    private static final String CREATED = "created";
    private static final String ACTION = "action";
    private static final String USER = "user";
    private static final String DETAIL = "detail";
    private static final String ENTITY = "entity";
    private static final String INSERT_STATEMENT_TEMPLATE = "INSERT INTO audit (entityid,created,action,user,detail,entity) VALUES (?,?,?,?,?,?)";
    private static final String SELECT_STATEMENT_TEMPLATE = "select * from audit where entityid=? order by created desc limit 10;";
    private static final String SELECT_DATE_STATEMENT_TEMPLATE = "select * from audit where entityid=? and created<=? order by created desc limit 10;";
    private String keyspace;
    private int replicationFactor;
    private Session cassSession;
    private String clusterName;
    private int port;
    private String username;
    private String password;
    private Map<String, List<String>> auditExcludedAttributesCache = new HashMap<String, List<String>>();
    private PreparedStatement insertStatement;
    private PreparedStatement selectStatement;
    private PreparedStatement selectDateStatement;

    @Override
    public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
        BoundStatement stmt = new BoundStatement(this.insertStatement);
        BatchStatement batch = new BatchStatement();
        events.forEach(event -> batch.add((Statement)stmt.bind(new Object[]{event.getEntityId(), event.getTimestamp(), event.getAction().toString(), event.getUser(), event.getDetails(), persistEntityDefinition ? event.getEntityDefinitionString() : null})));
        this.cassSession.execute((Statement)batch);
    }

    @Override
    public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException {
        BoundStatement stmt = new BoundStatement(this.insertStatement);
        BatchStatement batch = new BatchStatement();
        events.forEach(event -> batch.add((Statement)stmt.bind(new Object[]{event.getEntityId(), event.getTimestamp(), event.getAction().toString(), event.getUser(), event.getDetails(), persistEntityDefinition ? event.getEntityDefinitionString() : null})));
        this.cassSession.execute((Statement)batch);
    }

    private BoundStatement getSelectStatement(String entityId, String startKey) {
        BoundStatement stmt = StringUtils.isEmpty((String)startKey) ? new BoundStatement(this.selectStatement).bind(new Object[]{entityId}) : new BoundStatement(this.selectDateStatement).bind(new Object[]{entityId, Long.valueOf(startKey.split(":")[1])});
        return stmt;
    }

    @Override
    public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) throws AtlasException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", new Object[]{entityId, startKey, maxResults});
        }
        ResultSet rs = this.cassSession.execute((Statement)this.getSelectStatement(entityId, startKey));
        ArrayList<EntityAuditEvent> entityResults = new ArrayList<EntityAuditEvent>();
        for (Row row : rs) {
            String rowEntityId = row.getString(ENTITYID);
            if (!entityId.equals(rowEntityId)) continue;
            EntityAuditEvent event = new EntityAuditEvent();
            event.setEntityId(rowEntityId);
            event.setAction(EntityAuditEvent.EntityAuditAction.fromString((String)row.getString(ACTION)));
            event.setDetails(row.getString(DETAIL));
            event.setUser(row.getString(USER));
            event.setTimestamp(row.getLong(CREATED));
            event.setEventKey(rowEntityId + ":" + event.getTimestamp());
            if (persistEntityDefinition) {
                event.setEntityDefinition(row.getString(ENTITY));
            }
            entityResults.add(event);
        }
        return entityResults;
    }

    @Override
    public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResults) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", new Object[]{entityId, startKey, maxResults});
        }
        ResultSet rs = this.cassSession.execute((Statement)this.getSelectStatement(entityId, startKey));
        ArrayList<EntityAuditEventV2> entityResults = new ArrayList<EntityAuditEventV2>();
        for (Row row : rs) {
            String rowEntityId = row.getString(ENTITYID);
            if (!entityId.equals(rowEntityId)) continue;
            EntityAuditEventV2 event = new EntityAuditEventV2();
            event.setEntityId(rowEntityId);
            event.setAction(EntityAuditEventV2.EntityAuditActionV2.fromString((String)row.getString(ACTION)));
            event.setDetails(row.getString(DETAIL));
            event.setUser(row.getString(USER));
            event.setTimestamp(row.getLong(CREATED));
            event.setEventKey(rowEntityId + ":" + event.getTimestamp());
            if (persistEntityDefinition) {
                event.setEntityDefinition(row.getString(ENTITY));
            }
            entityResults.add(event);
        }
        return entityResults;
    }

    @Override
    public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) {
        return this.listEventsV2(entityId, auditAction, null, limit);
    }

    @Override
    public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
        throw new NotImplementedException();
    }

    public void start() throws AtlasException {
        this.initApplicationProperties();
        this.initializeSettings();
        this.startInternal();
    }

    void initializeSettings() {
        this.keyspace = APPLICATION_PROPERTIES.getString(CASSANDRA_AUDIT_KEYSPACE_PROPERTY, DEFAULT_KEYSPACE);
        this.replicationFactor = APPLICATION_PROPERTIES.getInt(CASSANDRA_REPLICATION_FACTOR_PROPERTY, 3);
        this.clusterName = APPLICATION_PROPERTIES.getString(CASSANDRA_CLUSTERNAME_PROPERTY, DEFAULT_CLUSTER_NAME);
        this.port = APPLICATION_PROPERTIES.getInt(CASSANDRA_PORT_PROPERTY, 9042);
        this.username = APPLICATION_PROPERTIES.getString(CASSANDRA_USERNAME_PROPERTY, "");
        this.password = APPLICATION_PROPERTIES.getString(CASSANDRA_PASSWORD_PROPERTY, "");
    }

    @VisibleForTesting
    void startInternal() throws AtlasException {
        this.createSession();
    }

    void createSession() throws AtlasException {
        Cluster.Builder cassandraClusterBuilder = Cluster.builder();
        String hostname = APPLICATION_PROPERTIES.getString(CASSANDRA_HOSTNAME_PROPERTY, "localhost");
        Cluster cluster = cassandraClusterBuilder.addContactPoint(hostname).withClusterName(this.clusterName).withPort(this.port).withCredentials(this.username.trim(), this.password.trim()).build();
        try {
            this.cassSession = cluster.connect();
            if (cluster.getMetadata().getKeyspace(this.keyspace) == null) {
                String query = "CREATE KEYSPACE " + this.keyspace + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':" + this.replicationFactor + "}; ";
                this.cassSession.execute(query);
                this.cassSession.close();
                this.cassSession = cluster.connect(this.keyspace);
                this.cassSession.execute(AUDIT_TABLE_SCHEMA);
            } else {
                this.cassSession.close();
                this.cassSession = cluster.connect(this.keyspace);
            }
            this.insertStatement = this.cassSession.prepare(INSERT_STATEMENT_TEMPLATE.replace("KEYSPACE", this.keyspace));
            this.selectStatement = this.cassSession.prepare(SELECT_STATEMENT_TEMPLATE.replace("KEYSPACE", this.keyspace));
            this.selectDateStatement = this.cassSession.prepare(SELECT_DATE_STATEMENT_TEMPLATE.replace("KEYSPACE", this.keyspace));
        }
        catch (Exception e) {
            throw new AtlasException((Throwable)e);
        }
    }

    public void stop() throws AtlasException {
        this.cassSession.close();
    }
}

