/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.kafka.KafkaOutputFormat;
import org.apache.hadoop.hive.kafka.KafkaTableProperties;
import org.apache.hadoop.hive.kafka.KafkaWritable;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ReflectionUtil;
import org.apache.kafkaesqueesqueesque.clients.producer.ProducerRecord;
import org.apache.kafkaesqueesqueesque.common.config.ConfigDef;
import org.apache.kafkaesqueesqueesque.common.errors.AuthenticationException;
import org.apache.kafkaesqueesqueesque.common.errors.AuthorizationException;
import org.apache.kafkaesqueesqueesque.common.errors.InvalidTopicException;
import org.apache.kafkaesqueesqueesque.common.errors.OffsetMetadataTooLarge;
import org.apache.kafkaesqueesqueesque.common.errors.SecurityDisabledException;
import org.apache.kafkaesqueesqueesque.common.errors.SerializationException;
import org.apache.kafkaesqueesqueesque.common.errors.UnknownServerException;
import org.apache.kafkaesqueesqueesque.common.security.auth.SecurityProtocol;
import org.apache.kafkaesqueesqueesque.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class KafkaUtils {
    private static final Logger log = LoggerFactory.getLogger(KafkaUtils.class);
    private static final String JAAS_TEMPLATE = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";";
    private static final String JAAS_TEMPLATE_SCRAM = "org.apache.kafkaesqueesqueesque.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\" serviceName=\"%s\" tokenauth=true;";
    static final Text KAFKA_DELEGATION_TOKEN_KEY = new Text("KAFKA_DELEGATION_TOKEN");
    private static final Set<String> SSL_CONFIG_KEYS = ImmutableSet.copyOf(new ConfigDef().withClientSslSupport().configKeys().keySet());
    static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
    static final String PRODUCER_CONFIGURATION_PREFIX = "kafka.producer";
    static final Set<String> FORBIDDEN_PROPERTIES = new HashSet<String>((Collection<String>)ImmutableList.of((Object)"enable.auto.commit", (Object)"auto.offset.reset", (Object)"key.deserializer", (Object)"value.deserializer", (Object)"transactional.id", (Object)"key.serializer", (Object)"value.serializer"));

    private KafkaUtils() {
    }

    static Properties consumerProperties(Configuration configuration) {
        Properties props = new Properties();
        props.setProperty("client.id", Utilities.getTaskId((Configuration)configuration));
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "none");
        String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
            throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config " + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        }
        props.setProperty("bootstrap.servers", brokerEndPoint);
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        if (UserGroupInformation.isSecurityEnabled()) {
            KafkaUtils.addKerberosJaasConf(configuration, props);
        }
        props.putAll(KafkaUtils.extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
        KafkaUtils.setupKafkaSslProperties(configuration, props);
        return props;
    }

    static void setupKafkaSslProperties(Configuration configuration, Properties props) {
        KafkaUtils.copySSLProperties(configuration, props);
        String credKeystore = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
        if (credKeystore != null && !credKeystore.isEmpty()) {
            String truststorePasswdConfig = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName());
            String keystorePasswdConfig = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName());
            String keyPasswdConfig = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName());
            String resourcesDir = HiveConf.getVar((Configuration)configuration, (HiveConf.ConfVars)HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR);
            try {
                String truststoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName());
                Path truststorePath = new Path(truststoreLoc);
                props.setProperty("ssl.truststore.location", new File(resourcesDir + "/" + truststorePath.getName()).getAbsolutePath());
                KafkaUtils.writeStoreToLocal(configuration, truststoreLoc, new File(resourcesDir).getAbsolutePath());
                String truststorePasswd = Utilities.getPasswdFromKeystore((String)credKeystore, (String)truststorePasswdConfig);
                props.setProperty("ssl.truststore.password", truststorePasswd);
                if (!keystorePasswdConfig.isEmpty()) {
                    log.info("Kafka keystore configured, configuring local keystore");
                    String keystoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_KEYSTORE_LOCATION_CONFIG.getName());
                    Path keystorePath = new Path(keystoreLoc);
                    props.setProperty("ssl.keystore.location", new File(resourcesDir + "/" + keystorePath.getName()).getAbsolutePath());
                    KafkaUtils.writeStoreToLocal(configuration, keystoreLoc, new File(resourcesDir).getAbsolutePath());
                    String keystorePasswd = Utilities.getPasswdFromKeystore((String)credKeystore, (String)keystorePasswdConfig);
                    props.setProperty("ssl.keystore.password", keystorePasswd);
                }
                if (!keyPasswdConfig.isEmpty()) {
                    String keyPasswd = Utilities.getPasswdFromKeystore((String)credKeystore, (String)keyPasswdConfig);
                    props.setProperty("ssl.key.password", keyPasswd);
                }
            }
            catch (IOException | URISyntaxException e) {
                throw new IllegalStateException("Unable to retrieve password from the credential keystore", e);
            }
        }
    }

    private static void copySSLProperties(Configuration source, Properties target) {
        for (String p : SSL_CONFIG_KEYS) {
            String v = source.get(p);
            if (v == null || target.containsKey(p)) continue;
            target.setProperty(p, v);
        }
    }

    private static void writeStoreToLocal(Configuration configuration, String hdfsLoc, String localDest) throws IOException, URISyntaxException {
        try {
            File localDir = new File(localDest);
            if (!localDir.exists() && !localDir.mkdirs()) {
                throw new IOException("Unable to create local directory, " + localDest);
            }
            URI uri = new URI(hdfsLoc);
            FileSystem fs = FileSystem.get((URI)new URI(hdfsLoc), (Configuration)configuration);
            fs.copyToLocalFile(new Path(uri.toString()), new Path(localDest));
        }
        catch (URISyntaxException e) {
            throw new IOException("Unable to download store", e);
        }
    }

    private static Map<String, String> extractExtraProperties(Configuration configuration, String prefix) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Map kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
        for (Map.Entry entry : kafkaProperties.entrySet()) {
            String key = ((String)entry.getKey()).substring(prefix.length() + 1);
            if (FORBIDDEN_PROPERTIES.contains(key)) {
                throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
            }
            builder.put((Object)key, entry.getValue());
        }
        return builder.build();
    }

    static Properties producerProperties(Configuration configuration) {
        String writeSemanticValue = configuration.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName());
        KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.valueOf(writeSemanticValue);
        Properties properties = new Properties();
        String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
            throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config " + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        }
        properties.setProperty("bootstrap.servers", brokerEndPoint);
        if (UserGroupInformation.isSecurityEnabled()) {
            KafkaUtils.addKerberosJaasConf(configuration, properties);
        }
        properties.putAll(KafkaUtils.extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
        KafkaUtils.setupKafkaSslProperties(configuration, properties);
        String taskId = configuration.get("mapred.task.id", null);
        properties.setProperty("client.id", taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
        switch (writeSemantic) {
            case AT_LEAST_ONCE: {
                properties.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
                properties.setProperty("acks", "all");
                break;
            }
            case EXACTLY_ONCE: {
                String reducerId = KafkaUtils.getTaskId(configuration);
                properties.setProperty("acks", "all");
                properties.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
                properties.setProperty("transactional.id", reducerId);
                properties.setProperty("enable.idempotence", "true");
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown Semantic " + (Object)((Object)writeSemantic));
            }
        }
        return properties;
    }

    static void copyDependencyJars(Configuration conf, Class<?> ... classes) throws IOException {
        HashSet jars = new HashSet();
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        jars.addAll(conf.getStringCollection("tmpjars"));
        jars.addAll(Arrays.stream(classes).filter(Objects::nonNull).map(arg_0 -> KafkaUtils.lambda$copyDependencyJars$0((FileSystem)localFs, arg_0)).collect(Collectors.toList()));
        if (jars.isEmpty()) {
            return;
        }
        conf.set("tmpjars", StringUtils.arrayToString((String[])jars.toArray(new String[0])));
    }

    static AbstractSerDe createDelegate(String className) {
        Class<?> clazz;
        try {
            clazz = Class.forName(className);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        return (AbstractSerDe)ReflectionUtil.newInstance(clazz, null);
    }

    static ProducerRecord<byte[], byte[]> toProducerRecord(String topic, KafkaWritable value) {
        return new ProducerRecord<byte[], byte[]>(topic, value.getPartition() != -1 ? Integer.valueOf(value.getPartition()) : null, value.getTimestamp() != -1L ? Long.valueOf(value.getTimestamp()) : null, value.getRecordKey(), value.getValue());
    }

    static boolean exceptionIsFatal(Throwable exception) {
        boolean securityException = exception instanceof AuthenticationException || exception instanceof AuthorizationException || exception instanceof SecurityDisabledException;
        boolean communicationException = exception instanceof InvalidTopicException || exception instanceof UnknownServerException || exception instanceof SerializationException || exception instanceof OffsetMetadataTooLarge || exception instanceof IllegalStateException;
        return securityException || communicationException;
    }

    static String getTaskId(Configuration hiveConf) {
        String id = (String)Preconditions.checkNotNull((Object)hiveConf.get("mapred.task.id", null));
        int index = id.lastIndexOf("_");
        if (index != -1) {
            return id.substring(0, index);
        }
        return id;
    }

    static void addKerberosJaasConf(Configuration configuration, Properties props) {
        Credentials creds;
        Token token;
        String principal;
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        String principalHost = HiveConf.getVar((Configuration)configuration, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
        String keyTab = HiveConf.getVar((Configuration)configuration, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
        if (principalHost == null || principalHost.isEmpty() || keyTab == null || keyTab.isEmpty()) {
            keyTab = HiveConf.getVar((Configuration)configuration, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_FS_KERBEROS_KEYTAB_FILE);
            principalHost = HiveConf.getVar((Configuration)configuration, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_FS_KERBEROS_PRINCIPAL);
        }
        try {
            principal = SecurityUtil.getServerPrincipal((String)principalHost, (String)"0.0.0.0");
        }
        catch (IOException e) {
            log.error("Can not construct kerberos principal", (Throwable)e);
            throw new RuntimeException(e);
        }
        String jaasConf = String.format(JAAS_TEMPLATE, keyTab, principal);
        props.setProperty("sasl.jaas.config", jaasConf);
        if (configuration instanceof JobConf && (token = (creds = ((JobConf)configuration).getCredentials()).getToken(KAFKA_DELEGATION_TOKEN_KEY)) != null) {
            log.info("Kafka delegation token has been found: {}", (Object)token);
            props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
            jaasConf = String.format(JAAS_TEMPLATE_SCRAM, new String(token.getIdentifier()), Base64.getEncoder().encodeToString(token.getPassword()), token.getService());
            props.setProperty("sasl.jaas.config", jaasConf);
        }
        log.info("Kafka client running with following JAAS = [{}]", (Object)jaasConf);
    }

    static SecurityProtocol securityProtocol(Properties props) {
        String[] securityProtocolConfigs;
        for (String c : securityProtocolConfigs = new String[]{"security.protocol", "kafka.consumer.security.protocol", "kafka.producer.security.protocol"}) {
            String v = props.getProperty(c);
            if (v == null || v.isEmpty()) continue;
            return SecurityProtocol.forName(v);
        }
        return null;
    }

    private static /* synthetic */ String lambda$copyDependencyJars$0(FileSystem localFs, Class clazz) {
        String path = Utilities.jarFinderGetJar((Class)clazz);
        if (path == null) {
            throw new RuntimeException("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
        }
        try {
            if (!localFs.exists(new Path(path))) {
                throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return path;
    }
}

