/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ranger.services.kafka.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.ranger.plugin.client.BaseClient;
import org.apache.ranger.plugin.service.ResourceLookupContext;
import org.apache.ranger.plugin.util.TimedEventUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client used to test connectivity to a Kafka cluster and to look up topic
 * names (for resource-name autocomplete) through the Kafka {@link AdminClient}.
 *
 * <p>Connection settings are read from the {@code configs} map supplied at
 * construction time. A Kerberos keytab based JAAS configuration is always
 * generated from the {@code kafka.keytab} and {@code kafka.principal} entries.
 */
public class ServiceKafkaClient {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceKafkaClient.class);
    private static final String errMessage = " You can still save the repository and start creating policies, but you would not be able to use autocomplete for resource names. Check server logs for more info.";
    private static final String TOPIC_KEY = "topic";
    private static final String KEY_BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String KEY_SECURITY_PROTOCOL = "security.protocol";
    private static final String KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
    private static final String KEY_CONNECTIONS_MAX_IDLE_MS = "connections.max.idle.ms";
    private static final String KEY_SASL_MECHANISM = "sasl.mechanism";
    private static final String KEY_SASL_JAAS_CONFIG = "sasl.jaas.config";
    private static final String KEY_KAFKA_KEYTAB = "kafka.keytab";
    private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
    private static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
    private static final String JAAS_USE_KEYTAB = "useKeyTab=true";
    private static final String JAAS_KEYTAB = "keyTab=\"";
    private static final String JAAS_STORE_KEY = "storeKey=true";
    private static final String JAAS_SERVICE_NAME = "serviceName=kafka";
    private static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false";
    private static final String JAAS_PRINCIPAL = "principal=\"";
    /** Maximum time, in seconds, a topic lookup started by {@link #getResources} may take. */
    private static final long LOOKUP_TIMEOUT_SEC = 5L;
    /** Value used for {@code request.timeout.ms} when the configs do not provide one. */
    private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
    /** Value used for {@code connections.max.idle.ms} when the configs do not provide one. */
    private static final int DEFAULT_CONNECTIONS_MAX_IDLE_MS = 10000;

    String serviceName;
    Map<String, String> configs;

    /**
     * Creates a client for the given service.
     *
     * @param serviceName name of the service this client belongs to (only used in {@link #toString()})
     * @param configs     connection settings; read lazily whenever a lookup is performed
     */
    public ServiceKafkaClient(String serviceName, Map<String, String> configs) {
        this.serviceName = serviceName;
        this.configs = configs;
    }

    /**
     * Tests connectivity by listing the topics of the configured cluster.
     *
     * @return a response map filled by {@code BaseClient.generateResponseDataMap}
     *         with either a success message or a failure message that includes
     *         the message of the exception that was raised
     */
    public Map<String, Object> connectionTest() {
        Map<String, Object> responseData = new HashMap<>();
        try {
            getTopicList(null);
            String successMsg = "ConnectionTest Successful";
            BaseClient.generateResponseDataMap(true, successMsg, successMsg, null, null, responseData);
        } catch (Exception e) {
            LOG.error("Error connecting to Kafka. kafkaClient = {}", this, e);
            String failureMsg = "Unable to connect to Kafka instance." + e.getMessage();
            BaseClient.generateResponseDataMap(false, failureMsg, failureMsg + errMessage, null, null, responseData);
        }
        return responseData;
    }

    /**
     * Looks up topic names matching the user's input.
     *
     * <p>Topics already present under the {@code "topic"} key of the context's
     * resource map are left out of the result. An empty user input matches
     * every topic; otherwise only topics starting with the input are returned.
     *
     * @param context lookup context providing the user input, the resource name
     *                and the resources that are already selected
     * @return the matching topic names; an empty list when listing the topics
     *         failed inside the lookup task; {@code null} when the user input
     *         is {@code null} or when running the timed task itself raised an
     *         exception (presumably including a timeout - confirm against
     *         {@code TimedEventUtil})
     */
    public List<String> getResources(ResourceLookupContext context) {
        final String userInput = context.getUserInput();
        String resource = context.getResourceName();
        Map<String, List<String>> resourceMap = context.getResources();

        LOG.debug("<== getResources()  UserInput: \"{}\" resource : {} resourceMap: {}", userInput, resource, resourceMap);

        if (userInput == null) {
            return null;
        }

        // Topics are the only resource type this client can look up, so no
        // dispatch on the resource name is needed. The already-selected topics
        // are only honoured when a resource name was supplied.
        List<String> selectedTopics = null;
        if (resource != null && resourceMap != null && !resourceMap.isEmpty()) {
            selectedTopics = resourceMap.get(TOPIC_KEY);
        }

        final List<String> ignoreTopics = selectedTopics;
        Callable<List<String>> lookupTask = () -> {
            List<String> matches = new ArrayList<>();
            try {
                for (String topic : getTopicList(ignoreTopics)) {
                    if (userInput.isEmpty() || topic.startsWith(userInput)) {
                        matches.add(topic);
                    }
                }
            } catch (Exception ex) {
                // Best effort: a failed lookup yields whatever was collected so far.
                LOG.error("Error getting topic.", ex);
            }
            return matches;
        };

        List<String> resultList = null;
        try {
            // Lookups issued through the same client instance run one at a time.
            synchronized (this) {
                resultList = TimedEventUtil.timedTask(lookupTask, LOOKUP_TIMEOUT_SEC, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            LOG.error("Unable to get Kafka resources.", e);
        }
        return resultList;
    }

    @Override
    public String toString() {
        return "ServiceKafkaClient [serviceName = " + this.serviceName + ", configs = " + this.configs + "]";
    }

    /**
     * Lists the topic names of the configured cluster.
     *
     * <p>A new {@link AdminClient} is created for every call and closed before
     * the method returns, whether or not listing succeeded.
     *
     * @param ignoreTopicList topic names to leave out of the result; may be {@code null}
     * @return the topic names reported by the cluster, minus the ignored ones
     * @throws Exception if the client cannot be created or the topic listing fails
     */
    private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
        List<String> ret = new ArrayList<>();

        Properties props = new Properties();
        // Properties rejects null values, so settings absent from the configs
        // are skipped and left for the Kafka client to default or report.
        putIfPresent(props, KEY_BOOTSTRAP_SERVERS, this.configs.get(KEY_BOOTSTRAP_SERVERS));
        putIfPresent(props, KEY_SECURITY_PROTOCOL, this.configs.get(KEY_SECURITY_PROTOCOL));
        putIfPresent(props, KEY_SASL_MECHANISM, this.configs.get(KEY_SASL_MECHANISM));
        props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(this.configs));
        props.put(KEY_REQUEST_TIMEOUT_MS, getIntProperty(KEY_REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT_MS));
        props.put(KEY_CONNECTIONS_MAX_IDLE_MS, getIntProperty(KEY_CONNECTIONS_MAX_IDLE_MS, DEFAULT_CONNECTIONS_MAX_IDLE_MS));

        // The resource must be initialised in the try header: try-with-resources
        // variables are implicitly final and cannot be assigned afterwards.
        try (AdminClient adminClient = KafkaAdminClient.create(props)) {
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            if (listTopicsResult != null) {
                Collection<TopicListing> topicListings = listTopicsResult.listings().get();
                for (TopicListing topicListing : topicListings) {
                    String topicName = topicListing.name();
                    if (ignoreTopicList == null || !ignoreTopicList.contains(topicName)) {
                        ret.add(topicName);
                    }
                }
            }
        }
        return ret;
    }

    /** Stores {@code value} under {@code key} unless the value is {@code null}. */
    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null) {
            props.put(key, value);
        }
    }

    /**
     * Reads an integer setting from the configs.
     *
     * @param key          name of the setting; {@code null} yields the default
     * @param defaultValue value returned when the setting is absent
     * @return the parsed setting, or {@code defaultValue} when it is not configured
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    private Integer getIntProperty(String key, int defaultValue) {
        if (key == null) {
            return defaultValue;
        }
        String returnVal = this.configs.get(key);
        if (returnVal == null) {
            return defaultValue;
        }
        // Surrounding whitespace is tolerated; anything else non-numeric still fails.
        return Integer.valueOf(returnVal.trim());
    }

    /**
     * Builds the {@code sasl.jaas.config} value for a keytab based Kerberos login.
     *
     * @param configs settings providing {@code kafka.keytab} and {@code kafka.principal}
     * @return the JAAS configuration string; absent settings appear as the text {@code null}
     */
    private String getJAASConfig(Map<String, String> configs) {
        String jaasConfig = JAAS_KRB5_MODULE + " " + JAAS_USE_KEYTAB + " " + JAAS_KEYTAB + configs.get(KEY_KAFKA_KEYTAB) + "\"" + " " + JAAS_STORE_KEY + " " + JAAS_USER_TICKET_CACHE + " " + JAAS_SERVICE_NAME + " " + JAAS_PRINCIPAL + configs.get(KEY_KAFKA_PRINCIPAL) + "\";";
        LOG.debug("KafkaClient JAAS: {}", jaasConfig);
        return jaasConfig;
    }

    /** Resource types this client can look up; topics are the only one. */
    enum ResourceType {
        TOPIC;
    }
}

