/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.spark.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.AbstractSparkClient;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.launcher.AbstractLauncher;
import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkLauncherSparkClient
extends AbstractSparkClient {
    private static final Logger LOG = LoggerFactory.getLogger((String)SparkLauncherSparkClient.class.getName());
    private static final long serialVersionUID = 2153000661341457380L;
    private static final Set<SparkAppHandle.State> FAILED_SPARK_STATES = Sets.newHashSet((Object[])new SparkAppHandle.State[]{SparkAppHandle.State.FAILED, SparkAppHandle.State.KILLED, SparkAppHandle.State.LOST});
    private transient AbstractLauncher<InProcessLauncher> sparkLauncher;

    SparkLauncherSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, String sessionid) throws IOException {
        super(rpcServer, conf, hiveConf, sessionid);
    }

    @Override
    protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws IOException {
        if (isTesting != null) {
            System.setProperty("spark.testing", "true");
        }
        if (isTesting == null) {
            Preconditions.checkArgument((boolean)SparkClientUtilities.isYarnClusterMode((String)this.conf.get("spark.master"), (String)this.conf.get("spark.submit.deployMode")), (Object)(this.getClass().getName() + " is only supported in yarn-cluster mode"));
        }
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        SparkAppHandle sparkAppHandle = this.getSparkLauncher().startApplication(new SparkAppHandle.Listener[]{new SparkAppListener(shutdownLatch, rpcServer, clientId)});
        return SparkLauncherSparkClient.createSparkLauncherFuture(shutdownLatch, sparkAppHandle, rpcServer, clientId);
    }

    @VisibleForTesting
    static Future<Void> createSparkLauncherFuture(CountDownLatch shutdownLatch, SparkAppHandle sparkAppHandle, RpcServer rpcServer, String clientId) {
        Callable<Void> runnable = () -> {
            try {
                shutdownLatch.await();
            }
            catch (InterruptedException e) {
                rpcServer.cancelClient(clientId, "Spark app launcher interrupted");
                sparkAppHandle.stop();
            }
            return null;
        };
        FutureTask<Void> futureTask = new FutureTask<Void>(runnable);
        Thread driverThread = new Thread(futureTask);
        driverThread.setDaemon(true);
        driverThread.setName("SparkLauncherMonitor");
        driverThread.start();
        return futureTask;
    }

    @Override
    protected String getSparkHome() {
        return null;
    }

    @Override
    protected void addAppArg(String arg) {
        this.getSparkLauncher().addAppArgs(new String[]{arg});
    }

    @Override
    protected void addExecutableJar(String jar) {
        this.getSparkLauncher().setAppResource(jar);
    }

    @Override
    protected void addPropertiesFile(String absolutePath) {
        this.getSparkLauncher().setPropertiesFile(absolutePath);
    }

    @Override
    protected void addClass(String name) {
        this.getSparkLauncher().setMainClass(name);
    }

    @Override
    protected void addJars(String jars) {
        this.getSparkLauncher().addJar(jars);
    }

    @Override
    protected void addProxyUser(String proxyUser) {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void addNumExecutors(String numOfExecutors) {
        this.getSparkLauncher().addSparkArg("--num-executors", numOfExecutors);
    }

    @Override
    protected void addExecutorMemory(String executorMemory) {
        this.getSparkLauncher().addSparkArg("--executor-memory", executorMemory);
    }

    @Override
    protected void addExecutorCores(String executorCores) {
        this.getSparkLauncher().addSparkArg("--executor-cores", executorCores);
    }

    private AbstractLauncher<InProcessLauncher> getSparkLauncher() {
        if (this.sparkLauncher == null) {
            this.sparkLauncher = new InProcessLauncher();
        }
        return this.sparkLauncher;
    }

    @VisibleForTesting
    static final class SparkAppListener
    implements SparkAppHandle.Listener {
        private final CountDownLatch shutdownLatch;
        private final RpcServer rpcServer;
        private final String clientId;

        SparkAppListener(CountDownLatch shutdownLatch, RpcServer rpcServer, String clientId) {
            this.shutdownLatch = shutdownLatch;
            this.rpcServer = rpcServer;
            this.clientId = clientId;
        }

        public void stateChanged(SparkAppHandle sparkAppHandle) {
            LOG.info("Spark app transitioned to state = " + sparkAppHandle.getState());
            if (sparkAppHandle.getState().isFinal() || sparkAppHandle.getState().equals((Object)SparkAppHandle.State.RUNNING)) {
                this.shutdownLatch.countDown();
                sparkAppHandle.disconnect();
                LOG.info("Successfully disconnected from Spark app handle");
            }
            if (FAILED_SPARK_STATES.contains(sparkAppHandle.getState())) {
                this.rpcServer.cancelClient(this.clientId, "Spark app launcher failed, transitioned to state " + sparkAppHandle.getState());
            }
        }

        public void infoChanged(SparkAppHandle sparkAppHandle) {
        }
    }
}

