/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.spark.client;

import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.io.Resources;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hive.spark.client.BaseProtocol;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.JobHandleImpl;
import org.apache.hive.spark.client.RemoteDriver;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.SparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractSparkClient
implements SparkClient {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkClient.class);
    private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000L;
    private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
    private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions";
    private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions";
    private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
    private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
    private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
    protected final Map<String, String> conf;
    private final HiveConf hiveConf;
    private final java.util.concurrent.Future<Void> driverFuture;
    private final Map<String, JobHandleImpl<?>> jobs;
    private final Rpc driverRpc;
    private final ClientProtocol protocol;
    protected volatile boolean isAlive;
    private File sparkTmpProperties;

    protected AbstractSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, String sessionid) throws IOException {
        this.conf = conf;
        this.hiveConf = hiveConf;
        this.jobs = Maps.newConcurrentMap();
        String secret = rpcServer.createSecret();
        this.driverFuture = this.startDriver(rpcServer, sessionid, secret);
        this.protocol = new ClientProtocol();
        try {
            this.driverRpc = (Rpc)rpcServer.registerClient(sessionid, secret, this.protocol).get();
        }
        catch (Throwable e) {
            String errorMsg = e.getCause() instanceof TimeoutException ? "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons include network issues, errors in remote driver, cluster has no available resources, etc.\nPlease check YARN or Spark driver's logs for further information." : (e.getCause() instanceof InterruptedException ? "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible that the query was cancelled which would cause the Spark Session to close." : "Error while waiting for Remote Spark Driver to connect back to HiveServer2.");
            if (this.driverFuture.isDone()) {
                try {
                    this.driverFuture.get();
                }
                catch (InterruptedException ie) {
                    LOG.warn("Interrupted before driver thread was finished.", (Throwable)ie);
                }
                catch (ExecutionException ee) {
                    LOG.error("Driver thread failed", (Throwable)ee);
                }
            } else {
                this.driverFuture.cancel(true);
            }
            throw new RuntimeException(errorMsg, e);
        }
        LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
        this.driverRpc.addListener(new Rpc.Listener(){

            @Override
            public void rpcClosed(Rpc rpc) {
                if (AbstractSparkClient.this.isAlive) {
                    LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", (Object)AbstractSparkClient.this.driverRpc.getRemoteAddress());
                    AbstractSparkClient.this.isAlive = false;
                }
            }

            public String toString() {
                return "Connection to Remote Spark Driver Closed Unexpectedly";
            }
        });
        this.isAlive = true;
    }

    @Override
    public <T extends Serializable> JobHandle<T> submit(Job<T> job) {
        return this.protocol.submit(job, Collections.emptyList());
    }

    @Override
    public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
        return this.protocol.submit(job, listeners);
    }

    @Override
    public <T extends Serializable> java.util.concurrent.Future<T> run(Job<T> job) {
        return this.protocol.run(job);
    }

    @Override
    public void stop() {
        if (this.isAlive) {
            this.isAlive = false;
            try {
                this.protocol.endSession();
            }
            catch (Exception e) {
                LOG.warn("Exception while waiting for end session reply.", (Throwable)e);
            }
            finally {
                this.driverRpc.close();
            }
        }
        if (this.sparkTmpProperties != null && this.sparkTmpProperties.exists()) {
            this.sparkTmpProperties.delete();
        }
        try {
            this.driverFuture.get(10000L, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            LOG.error("Exception while waiting for driver future to complete", (Throwable)e);
        }
        catch (TimeoutException e) {
            LOG.warn("Timed out shutting down remote driver, cancelling...");
            this.driverFuture.cancel(true);
        }
        catch (InterruptedException ie) {
            LOG.debug("Interrupted before driver thread was finished.");
            this.driverFuture.cancel(true);
        }
    }

    @Override
    public java.util.concurrent.Future<?> addJar(URI uri) {
        return this.run(new AddJarJob(uri.toString()));
    }

    @Override
    public java.util.concurrent.Future<?> addFile(URI uri) {
        return this.run(new AddFileJob(uri.toString()));
    }

    @Override
    public java.util.concurrent.Future<Integer> getExecutorCount() {
        return this.run(new GetExecutorCountJob());
    }

    @Override
    public java.util.concurrent.Future<Integer> getDefaultParallelism() {
        return this.run(new GetDefaultParallelismJob());
    }

    @Override
    public boolean isActive() {
        return this.isAlive && this.driverRpc.isActive();
    }

    @Override
    public void cancel(String jobId) {
        this.protocol.cancel(jobId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private java.util.concurrent.Future<Void> startDriver(RpcServer rpcServer, String clientId, String secret) throws IOException {
        String regStr;
        String hiveHadoopTestClasspath;
        String serverAddress = rpcServer.getAddress();
        String serverPort = String.valueOf(rpcServer.getPort());
        String sparkHome = this.getSparkHome();
        String sparkLogDir = this.conf.get("hive.spark.log.dir");
        if (sparkLogDir == null) {
            sparkLogDir = sparkHome == null ? "./target/" : sparkHome + "/logs/";
        }
        String osxTestOpts = "";
        if (Strings.nullToEmpty((String)System.getProperty("os.name")).toLowerCase().contains("mac")) {
            osxTestOpts = Strings.nullToEmpty((String)System.getenv(OSX_TEST_OPTS));
        }
        String driverJavaOpts = Joiner.on((String)" ").skipNulls().join((Object)("-Dhive.spark.log.dir=" + sparkLogDir), (Object)osxTestOpts, new Object[]{this.conf.get(DRIVER_OPTS_KEY)});
        String executorJavaOpts = Joiner.on((String)" ").skipNulls().join((Object)("-Dhive.spark.log.dir=" + sparkLogDir), (Object)osxTestOpts, new Object[]{this.conf.get(EXECUTOR_OPTS_KEY)});
        File properties = File.createTempFile("spark-submit.", ".properties");
        if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
            throw new IOException("Cannot change permissions of job properties file.");
        }
        this.sparkTmpProperties = properties;
        Properties allProps = new Properties();
        try {
            URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
            if (sparkDefaultsUrl != null) {
                LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
                allProps.load(new ByteArrayInputStream(Resources.toByteArray((URL)sparkDefaultsUrl)));
            }
        }
        catch (Exception e) {
            String msg = "Exception trying to load spark-defaults.conf: " + e;
            throw new IOException(msg, e);
        }
        for (Map.Entry<String, String> e : this.conf.entrySet()) {
            allProps.put(e.getKey(), this.conf.get(e.getKey()));
        }
        allProps.put("spark.client.authentication.client_id", clientId);
        allProps.put("spark.client.authentication.secret", secret);
        allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
        allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
        String isTesting = this.conf.get("spark.testing");
        if (isTesting != null && isTesting.equalsIgnoreCase("true") && !(hiveHadoopTestClasspath = Strings.nullToEmpty((String)System.getenv("HIVE_HADOOP_TEST_CLASSPATH"))).isEmpty()) {
            String extraDriverClasspath = Strings.nullToEmpty((String)((String)allProps.get(DRIVER_EXTRA_CLASSPATH)));
            if (extraDriverClasspath.isEmpty()) {
                allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
            } else {
                extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
                allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
            }
            String extraExecutorClasspath = Strings.nullToEmpty((String)((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)));
            if (extraExecutorClasspath.isEmpty()) {
                allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
            } else {
                extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
                allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
            }
        }
        try (OutputStreamWriter writer = new OutputStreamWriter((OutputStream)new FileOutputStream(properties), Charsets.UTF_8);){
            allProps.store(writer, "Spark Context configuration");
        }
        String master = this.conf.get("spark.master");
        Preconditions.checkArgument((master != null ? 1 : 0) != 0, (Object)"spark.master is not defined.");
        String deployMode = this.conf.get(SPARK_DEPLOY_MODE);
        if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
            String numOfExecutors;
            String executorMemory;
            String executorCores = this.conf.get("spark.executor.cores");
            if (executorCores != null) {
                this.addExecutorCores(executorCores);
            }
            if ((executorMemory = this.conf.get("spark.executor.memory")) != null) {
                this.addExecutorMemory(executorMemory);
            }
            if ((numOfExecutors = this.conf.get("spark.executor.instances")) != null) {
                this.addNumExecutors(numOfExecutors);
            }
        }
        if ("kerberos".equals(this.hiveConf.get("hadoop.security.authentication"))) {
            String principal = SecurityUtil.getServerPrincipal((String)this.hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), (String)"0.0.0.0");
            String keyTabFile = this.hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
            boolean isDoAsEnabled = this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
            if (StringUtils.isNotBlank((CharSequence)principal) && StringUtils.isNotBlank((CharSequence)keyTabFile)) {
                this.addKeytabAndPrincipal(isDoAsEnabled, keyTabFile, principal);
            }
        }
        if (this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
            try {
                String currentUser = Utils.getUGI().getShortUserName();
                if (!currentUser.equals(System.getProperty("user.name"))) {
                    LOG.info("Attempting impersonation of " + currentUser);
                    this.addProxyUser(currentUser);
                }
            }
            catch (Exception e) {
                String msg = "Cannot obtain username: " + e;
                throw new IllegalStateException(msg, e);
            }
        }
        if ("org.apache.hive.spark.HiveKryoRegistrator".equals(regStr = this.conf.get("spark.kryo.registrator"))) {
            this.addJars(SparkClientUtilities.findKryoRegistratorJar(this.hiveConf));
        }
        this.addPropertiesFile(properties.getAbsolutePath());
        this.addClass(RemoteDriver.class.getName());
        String jar = "spark-internal";
        if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
            jar = (String)SparkContext.jarOfClass(this.getClass()).get();
        }
        this.addExecutableJar(jar);
        this.addAppArg("--remote-host");
        this.addAppArg(serverAddress);
        this.addAppArg("--remote-port");
        this.addAppArg(serverPort);
        for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
            String value = RpcConfiguration.getValue(this.hiveConf, hiveSparkConfKey);
            this.addAppArg("--remote-driver-conf");
            this.addAppArg(String.format("%s=%s", hiveSparkConfKey, value));
        }
        return this.launchDriver(isTesting, rpcServer, clientId);
    }

    protected abstract java.util.concurrent.Future<Void> launchDriver(String var1, RpcServer var2, String var3) throws IOException;

    protected abstract String getSparkHome();

    protected abstract void addAppArg(String var1);

    protected abstract void addExecutableJar(String var1);

    protected abstract void addPropertiesFile(String var1);

    protected abstract void addClass(String var1);

    protected abstract void addJars(String var1);

    protected abstract void addProxyUser(String var1);

    protected abstract void addKeytabAndPrincipal(boolean var1, String var2, String var3);

    protected abstract void addNumExecutors(String var1);

    protected abstract void addExecutorMemory(String var1);

    protected abstract void addExecutorCores(String var1);

    private static class GetDefaultParallelismJob
    implements Job<Integer> {
        private static final long serialVersionUID = 1L;

        private GetDefaultParallelismJob() {
        }

        @Override
        public Integer call(JobContext jc) throws Exception {
            return jc.sc().sc().defaultParallelism();
        }
    }

    private static class GetExecutorCountJob
    implements Job<Integer> {
        private static final long serialVersionUID = 1L;

        private GetExecutorCountJob() {
        }

        @Override
        public Integer call(JobContext jc) throws Exception {
            int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
            return count;
        }
    }

    private static class AddFileJob
    implements Job<Serializable> {
        private static final long serialVersionUID = 1L;
        private final String path;

        AddFileJob() {
            this(null);
        }

        AddFileJob(String path) {
            this.path = path;
        }

        @Override
        public Serializable call(JobContext jc) throws Exception {
            jc.sc().addFile(this.path);
            return null;
        }
    }

    private static class AddJarJob
    implements Job<Serializable> {
        private static final long serialVersionUID = 1L;
        private final String path;

        AddJarJob() {
            this(null);
        }

        AddJarJob(String path) {
            this.path = path;
        }

        @Override
        public Serializable call(JobContext jc) throws Exception {
            jc.sc().addJar(this.path);
            jc.getAddedJars().put(this.path, System.currentTimeMillis());
            return null;
        }
    }

    private class ClientProtocol
    extends BaseProtocol {
        private ClientProtocol() {
        }

        <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
            final String jobId = UUID.randomUUID().toString();
            final Promise promise = AbstractSparkClient.this.driverRpc.createPromise();
            final JobHandleImpl handle = new JobHandleImpl(AbstractSparkClient.this, promise, jobId, listeners);
            AbstractSparkClient.this.jobs.put(jobId, handle);
            final Future<Void> rpc = AbstractSparkClient.this.driverRpc.call(new BaseProtocol.JobRequest<T>(jobId, job));
            LOG.debug("Send JobRequest[{}].", (Object)jobId);
            rpc.addListener((GenericFutureListener)new GenericFutureListener<Future<Void>>(){

                public void operationComplete(Future<Void> f) {
                    if (f.isSuccess()) {
                        handle.changeState(JobHandle.State.QUEUED);
                    } else if (!promise.isDone()) {
                        promise.setFailure(f.cause());
                    }
                }
            });
            promise.addListener(new GenericFutureListener<Promise<T>>(){

                public void operationComplete(Promise<T> p) {
                    if (jobId != null) {
                        AbstractSparkClient.this.jobs.remove(jobId);
                    }
                    if (p.isCancelled() && !rpc.isDone()) {
                        rpc.cancel(true);
                    }
                }
            });
            return handle;
        }

        <T extends Serializable> java.util.concurrent.Future<T> run(Job<T> job) {
            Future<Serializable> rpc = AbstractSparkClient.this.driverRpc.call(new BaseProtocol.SyncJobRequest<T>(job), Serializable.class);
            return rpc;
        }

        void cancel(String jobId) {
            AbstractSparkClient.this.driverRpc.call(new BaseProtocol.CancelJob(jobId));
        }

        java.util.concurrent.Future<?> endSession() {
            return AbstractSparkClient.this.driverRpc.call(new BaseProtocol.EndSession());
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.Error msg) {
            LOG.warn("Error reported from Remote Spark Driver: {}", (Object)msg.cause);
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobMetrics msg) {
            JobHandleImpl handle = (JobHandleImpl)AbstractSparkClient.this.jobs.get(msg.jobId);
            if (handle != null) {
                handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
            } else {
                LOG.warn("Received metrics for unknown Spark job {}", (Object)msg.sparkJobId);
            }
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobResult msg) {
            JobHandleImpl handle = (JobHandleImpl)AbstractSparkClient.this.jobs.remove(msg.id);
            if (handle != null) {
                LOG.debug("Received result for client job {}", (Object)msg.id);
                handle.setSparkCounters(msg.sparkCounters);
                Throwable error = msg.error;
                if (error == null) {
                    handle.setSuccess(msg.result);
                } else {
                    handle.setFailure(error);
                }
            } else {
                LOG.warn("Received result for unknown client job {}", (Object)msg.id);
            }
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobStarted msg) {
            JobHandleImpl handle = (JobHandleImpl)AbstractSparkClient.this.jobs.get(msg.id);
            if (handle != null) {
                handle.changeState(JobHandle.State.STARTED);
            } else {
                LOG.warn("Received event for unknown client job {}", (Object)msg.id);
            }
        }

        private void handle(ChannelHandlerContext ctx, BaseProtocol.JobSubmitted msg) {
            JobHandleImpl handle = (JobHandleImpl)AbstractSparkClient.this.jobs.get(msg.clientJobId);
            if (handle != null) {
                LOG.info("Received Spark job ID: {} for client job {}", (Object)msg.sparkJobId, (Object)msg.clientJobId);
                handle.addSparkJobId(msg.sparkJobId);
            } else {
                LOG.warn("Received Spark job ID: {} for unknown client job {}", (Object)msg.sparkJobId, (Object)msg.clientJobId);
            }
        }

        @Override
        protected String name() {
            return "HiveServer2 to Remote Spark Driver Connection";
        }
    }
}

