/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper around a Kafka {@link AdminClient} that is configured from the
 * {@code atlas.kafka} subset of the Atlas configuration.
 *
 * <p>On construction the class derives a {@code sasl.jaas.config} entry from the
 * {@code atlas.jaas} configuration subset (unless one is already present) and
 * creates the admin client. Instances own that admin client; call
 * {@link #close()} (or use try-with-resources) to release it.
 */
public class KafkaUtils implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
    public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
    public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
    public static final String JAAS_PASSWORD_SUFFIX = "password";
    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
    private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
    private static final String JAAS_PRINCIPAL_PROP = "principal";
    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
    private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import";
    private static final String JAAS_MASK_PASSWORD = "********";

    /** Kafka client properties extracted from the {@code atlas.kafka} configuration subset. */
    protected final Properties kafkaConfiguration;
    /** Admin client created from {@link #kafkaConfiguration}; closed by {@link #close()}. */
    protected final AdminClient adminClient;
    /** Whether {@link #listAllTopics()} asks the broker to include internal topics. */
    protected final boolean importInternalTopics;

    /**
     * Builds the Kafka properties from the given Atlas configuration, adds the JAAS
     * configuration when applicable, and creates the admin client.
     *
     * @param atlasConfiguration Atlas configuration to read the {@code atlas.kafka},
     *                           {@code atlas.jaas} and internal-topic settings from
     */
    public KafkaUtils(Configuration atlasConfiguration) {
        LOG.debug("==> KafkaUtils() ");

        this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX);

        setKafkaJAASProperties(atlasConfiguration, this.kafkaConfiguration);

        this.adminClient = AdminClient.create(this.kafkaConfiguration);
        this.importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);

        LOG.debug("<== KafkaUtils() ");
    }

    /**
     * Populates {@code sasl.jaas.config} in {@code kafkaProperties} from the
     * {@code atlas.jaas.<client>.*} entries of {@code configuration}.
     *
     * <p>Nothing is changed when {@code kafkaProperties} already contains
     * {@code sasl.jaas.config}, when the {@code atlas.jaas} subset is empty, or when
     * the selected client has no {@code loginModuleName} entry (an error is logged
     * in the last case). The generated value has the form
     * {@code <loginModuleName> <controlFlag> <option>=<"value"> ... ;}.
     *
     * @param configuration   Atlas configuration holding the {@code atlas.jaas} entries
     * @param kafkaProperties Kafka client properties to update in place
     */
    public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
        LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");

        if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
            LOG.debug("JAAS config is already set, returning");
            return;
        }

        Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);

        if (jaasConfig != null && !jaasConfig.isEmpty()) {
            String jaasClientName = resolveJaasClientName(configuration);
            String keyPrefix = jaasClientName + ".";

            String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
            String loginModuleName = jaasConfig.getProperty(keyParam);

            if (loginModuleName == null) {
                LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
                return;
            }

            keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;

            String configuredControlFlag = jaasConfig.getProperty(keyParam);
            String controlFlag = configuredControlFlag;

            if (StringUtils.isEmpty(configuredControlFlag)) {
                controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
                // Report the value that was actually configured, not the default that replaces it.
                LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, configuredControlFlag, JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS);
            }

            String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
            String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
            String passwordOptionKey = optionPrefix + JAAS_PASSWORD_SUFFIX;
            int optionPrefixLen = optionPrefix.length();
            StringBuilder optionStringBuffer = new StringBuilder();

            for (String key : jaasConfig.stringPropertyNames()) {
                if (!key.startsWith(optionPrefix)) {
                    continue;
                }

                String optionVal = jaasConfig.getProperty(key);

                if (optionVal == null) {
                    continue;
                }

                optionVal = optionVal.trim();

                if (key.equalsIgnoreCase(principalOptionKey)) {
                    try {
                        // The (String) cast selects the hostname overload of getServerPrincipal.
                        optionVal = SecurityUtil.getServerPrincipal(optionVal, (String) null);
                    } catch (IOException e) {
                        // Best effort: keep the configured principal, but do not lose the cause.
                        LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal, e);
                    }
                }

                if (key.equalsIgnoreCase(passwordOptionKey)) {
                    // NOTE(review): this lookup key always names the default "KafkaClient" entry,
                    // even when the ticket-based client was selected above - confirm this is intended.
                    String jaasKafkaClientConfigurationProperty = "atlas.jaas.KafkaClient.option.password";

                    if (JAAS_MASK_PASSWORD.equals(configuration.getString(jaasKafkaClientConfigurationProperty))) {
                        try {
                            optionVal = org.apache.atlas.security.SecurityUtil.getPassword(configuration, jaasKafkaClientConfigurationProperty, "hadoop.security.credential.provider.path");
                        } catch (Exception e) {
                            // On failure the masked value read from the configuration is kept as-is.
                            LOG.error("Error in getting secure password ", e);
                        }
                    }
                }

                optionVal = surroundWithQuotes(optionVal);

                optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
            }

            String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer);

            kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
        }

        LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
    }

    /**
     * Chooses which {@code atlas.jaas.<client>} entry to use: the ticket-based client
     * when the login is ticket based (and not keytab based) and that client has
     * configuration, otherwise the default {@code KafkaClient}.
     */
    private static String resolveJaasClientName(Configuration configuration) {
        String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;

        if (!isLoginKeytabBased() && isLoginTicketBased()) {
            LOG.debug("Checking if ticketBased-KafkaClient is set");

            String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
            Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);

            if (ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
                LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
            } else {
                LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
            }
        }

        return jaasClientName;
    }

    /**
     * Returns the result of {@link UserGroupInformation#isLoginTicketBased()}.
     *
     * @return the value reported by {@code UserGroupInformation}, or {@code false}
     *         when that call throws (the exception is logged as a warning)
     */
    public static boolean isLoginTicketBased() {
        boolean ret = false;

        try {
            ret = UserGroupInformation.isLoginTicketBased();
        } catch (Exception excp) {
            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
        }

        return ret;
    }

    /**
     * Returns the result of {@link UserGroupInformation#isLoginKeytabBased()}.
     *
     * @return the value reported by {@code UserGroupInformation}, or {@code false}
     *         when that call throws (the exception is logged as a warning)
     */
    static boolean isLoginKeytabBased() {
        boolean ret = false;

        try {
            ret = UserGroupInformation.isLoginKeytabBased();
        } catch (Exception excp) {
            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
        }

        return ret;
    }

    /**
     * Wraps a JAAS option value in double quotes, escaping any embedded double quote
     * with a backslash.
     *
     * @param optionVal option value; may be {@code null} or empty
     * @return the quoted value, or {@code optionVal} unchanged when it is {@code null} or empty
     */
    static String surroundWithQuotes(String optionVal) {
        if (StringUtils.isEmpty(optionVal)) {
            return optionVal;
        }

        String doubleQuoteEscaped = optionVal.replace("\"", "\\\"");

        return String.format("\"%s\"", doubleQuoteEscaped);
    }

    /**
     * Creates the given topics and waits for every creation request to complete.
     *
     * @param topicNames        names of the topics to create
     * @param numPartitions     partition count for each topic
     * @param replicationFactor replication factor for each topic (narrowed to {@code short})
     * @throws ExecutionException   if waiting on any creation future fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor) throws TopicExistsException, ExecutionException, InterruptedException {
        LOG.debug("==> createTopics() ");

        List<NewTopic> newTopicList = topicNames.stream()
                .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
                .collect(Collectors.toList());

        CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopicList);

        // Block on each per-topic future so that a failure surfaces to the caller.
        for (KafkaFuture<Void> future : createTopicsResult.values().values()) {
            future.get();
        }

        LOG.debug("<== createTopics() ");
    }

    /**
     * Lists topic names via the admin client; internal topics are requested only when
     * {@link #importInternalTopics} is {@code true}.
     *
     * @return a new mutable list holding the topic names
     * @throws ExecutionException   if waiting on the listing future fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public List<String> listAllTopics() throws ExecutionException, InterruptedException {
        LOG.debug("==> KafkaUtils.listAllTopics() ");

        ListTopicsResult listTopicsResult = this.adminClient.listTopics(new ListTopicsOptions().listInternal(this.importInternalTopics));
        List<String> ret = new ArrayList<>(listTopicsResult.names().get());

        LOG.debug("<== KafkaUtils.listAllTopics() ");

        return ret;
    }

    /**
     * Returns the number of partitions described for the topic.
     *
     * @param topicName topic to describe
     * @return the partition count, or {@code null} when no partition list was obtained
     * @throws ExecutionException   if waiting on the describe future fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException {
        LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);

        Integer ret = null;
        List<TopicPartitionInfo> partitionList = getPartitionList(topicName);

        if (partitionList != null) {
            ret = partitionList.size();
        }

        LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, ret);

        return ret;
    }

    /**
     * Returns the largest replica count found across the topic's partitions.
     *
     * @param topicName topic to describe
     * @return the maximum replica count, {@code 1} when the partition list is empty,
     *         or {@code null} when no partition list was obtained
     * @throws ExecutionException   if waiting on the describe future fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Integer getReplicationFactor(String topicName) throws ExecutionException, InterruptedException {
        LOG.debug("==> KafkaUtils.getReplicationFactor({})", topicName);

        Integer ret = null;
        List<TopicPartitionInfo> partitionList = getPartitionList(topicName);

        if (partitionList != null) {
            ret = partitionList.stream().mapToInt(x -> x.replicas().size()).max().orElse(1);
        }

        LOG.debug("<== KafkaUtils.getReplicationFactor returning for topic {} with replicationFactor {}", topicName, ret);

        return ret;
    }

    /** Closes the underlying admin client. */
    @Override
    public void close() {
        LOG.debug("==> KafkaUtils.close()");

        if (this.adminClient != null) {
            this.adminClient.close();
        }

        LOG.debug("<== KafkaUtils.close()");
    }

    /**
     * Describes the single topic and returns its partition list.
     *
     * @return the partitions of the described topic, or {@code null} when the admin
     *         client returned no result object or no per-topic future
     */
    private List<TopicPartitionInfo> getPartitionList(String topicName) throws ExecutionException, InterruptedException {
        List<TopicPartitionInfo> ret = null;
        DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(Collections.singleton(topicName));

        if (describeTopicsResult != null) {
            // Only one topic was requested, so at most one future is expected here.
            for (KafkaFuture<TopicDescription> topicDescriptionFuture : describeTopicsResult.values().values()) {
                TopicDescription topicDescription = topicDescriptionFuture.get();

                ret = topicDescription.partitions();
            }
        }

        return ret;
    }
}

