/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.spark.client;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hive.common.log.LogRedirector;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.AbstractSparkClient;
import org.apache.hive.spark.client.RemoteDriver;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkSubmitSparkClient
extends AbstractSparkClient {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
    private static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
    private static final String SPARK_HOME_ENV = "SPARK_HOME";
    private static final String SPARK_HOME_KEY = "spark.home";
    private static final long serialVersionUID = -4272763023516238171L;
    private List<String> argv;

    SparkSubmitSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, String sessionid) throws IOException {
        super(rpcServer, conf, hiveConf, sessionid);
    }

    @Override
    protected String getSparkHome() {
        String sparkHome = Strings.emptyToNull((String)((String)this.conf.get(SPARK_HOME_KEY)));
        if (sparkHome == null) {
            sparkHome = Strings.emptyToNull((String)System.getenv(SPARK_HOME_ENV));
        }
        if (sparkHome == null) {
            sparkHome = Strings.emptyToNull((String)System.getProperty(SPARK_HOME_KEY));
        }
        Preconditions.checkNotNull((Object)sparkHome, (Object)"Cannot use spark-submit without setting Spark Home");
        String master = (String)this.conf.get("spark.master");
        Preconditions.checkArgument((master != null ? 1 : 0) != 0, (Object)"spark.master is not defined.");
        this.argv = Lists.newLinkedList();
        this.argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
        return sparkHome;
    }

    @Override
    protected void addAppArg(String arg) {
        this.argv.add(arg);
    }

    @Override
    protected void addExecutableJar(String jar) {
        this.argv.add(jar);
    }

    @Override
    protected void addPropertiesFile(String absolutePath) {
        this.argv.add("--properties-file");
        this.argv.add(absolutePath);
    }

    @Override
    protected void addClass(String name) {
        this.argv.add("--class");
        this.argv.add(RemoteDriver.class.getName());
    }

    @Override
    protected void addJars(String jars) {
        this.argv.add("--jars");
        this.argv.add(jars);
    }

    @Override
    protected void addProxyUser(String proxyUser) {
        this.argv.add("--proxy-user");
        this.argv.add(proxyUser);
    }

    @Override
    protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
        if (isDoAsEnabled) {
            LinkedList kinitArgv = Lists.newLinkedList();
            kinitArgv.add("kinit");
            kinitArgv.add(principal);
            kinitArgv.add("-k");
            kinitArgv.add("-t");
            kinitArgv.add(keyTabFile + ";");
            kinitArgv.addAll(this.argv);
            this.argv = kinitArgv;
        } else {
            this.argv.add("--principal");
            this.argv.add(principal);
            this.argv.add("--keytab");
            this.argv.add(keyTabFile);
        }
    }

    @Override
    protected void addNumExecutors(String numOfExecutors) {
        this.argv.add("--num-executors");
        this.argv.add(numOfExecutors);
    }

    @Override
    protected void addExecutorMemory(String executorMemory) {
        this.argv.add("--executor-memory");
        this.argv.add(executorMemory);
    }

    @Override
    protected void addExecutorCores(String executorCores) {
        this.argv.add("--executor-cores");
        this.argv.add(executorCores);
    }

    private String getSparkJobCredentialProviderPassword() {
        if (this.conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
            return (String)this.conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
        }
        if (this.conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
            return (String)this.conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
        }
        return null;
    }

    @Override
    protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws IOException {
        String cmd = Joiner.on((String)" ").join(this.argv);
        LOG.info("Running client driver with argv: {}", (Object)cmd);
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
        pb.environment().remove("HIVE_HOME");
        pb.environment().remove("HIVE_CONF_DIR");
        String password = this.getSparkJobCredentialProviderPassword();
        if (password != null) {
            pb.environment().put("HADOOP_CREDSTORE_PASSWORD", password);
        }
        if (isTesting != null) {
            pb.environment().put("SPARK_TESTING", isTesting);
        }
        Process child = pb.start();
        String threadName = Thread.currentThread().getName();
        List childErrorLog = Collections.synchronizedList(new ArrayList());
        List childOutLog = Collections.synchronizedList(new ArrayList());
        LogRedirector.LogSourceCallback callback = () -> this.isAlive;
        LogRedirector.redirect((String)("spark-submit-stdout-redir-" + threadName), (LogRedirector)new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
        LogRedirector.redirect((String)("spark-submit-stderr-redir-" + threadName), (LogRedirector)new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
        Callable<Void> runnable = () -> {
            block16: {
                try {
                    int exitCode = child.waitFor();
                    if (exitCode == 0) {
                        Matcher m;
                        List list = childOutLog;
                        synchronized (list) {
                            for (String line : childOutLog) {
                                m = YARN_APPLICATION_ID_REGEX.matcher(line);
                                if (!m.find()) continue;
                                LOG.info("Found application id " + m.group(1));
                                rpcServer.setApplicationId(m.group(1));
                            }
                        }
                        list = childErrorLog;
                        synchronized (list) {
                            for (String line : childErrorLog) {
                                m = YARN_APPLICATION_ID_REGEX.matcher(line);
                                if (!m.find()) continue;
                                LOG.info("Found application id " + m.group(1));
                                rpcServer.setApplicationId(m.group(1));
                            }
                            break block16;
                        }
                    }
                    ArrayList<String> errorMessages = new ArrayList<String>();
                    List list = childErrorLog;
                    synchronized (list) {
                        for (String line : childErrorLog) {
                            if (!SparkClientUtilities.containsErrorKeyword(line)) continue;
                            errorMessages.add("\"" + line + "\"");
                        }
                    }
                    String errStr = errorMessages.isEmpty() ? "?" : Joiner.on((char)',').join(errorMessages);
                    rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed with exit code " + exitCode + " and error " + errStr));
                }
                catch (InterruptedException ie) {
                    LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
                    rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
                    Thread.interrupted();
                    child.destroy();
                }
                catch (Exception e) {
                    String errMsg = "Exception while waiting for child process (spark-submit)";
                    LOG.warn(errMsg, (Throwable)e);
                    rpcServer.cancelClient(clientId, errMsg);
                }
            }
            return null;
        };
        FutureTask<Void> futureTask = new FutureTask<Void>(runnable);
        Thread driverThread = new Thread(futureTask);
        driverThread.setDaemon(true);
        driverThread.setName("SparkSubmitMonitor");
        driverThread.start();
        return futureTask;
    }
}

