/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.TaskQueue;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.HiveVoidFunction;
import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlan;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlanGenerator;
import org.apache.hadoop.hive.ql.exec.spark.SparkReporter;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.NullScanFileSystem;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link HiveSparkClient} implementation that submits Hive-on-Spark work through a
 * {@link SparkClient} obtained from {@link SparkClientFactory}.
 *
 * <p>Besides submitting jobs, the client keeps track of the jars and files it has already
 * handed to the remote client so that each resource is shipped at most once per remote
 * client instance.
 *
 * <p>NOTE(review): {@link #refreshLocalResources} is {@code synchronized} but {@link #close()}
 * mutates the same resource lists without holding the monitor; confirm callers never run the
 * two concurrently.
 */
public class RemoteHiveSparkClient implements HiveSparkClient {
    private static final long serialVersionUID = 1L;

    /** Job configuration key under which works register the extra jars they need. */
    private static final String MR_JAR_PROPERTY = "tmpjars";
    /** Job configuration key holding the location of a binary credentials file. */
    private static final String MR_CREDENTIALS_LOCATION_PROPERTY = "mapreduce.job.credentials.binary";
    /** Configuration key naming the Spark master. */
    private static final String SPARK_MASTER_PROPERTY = "spark.master";
    /** Pause between two executor-count polls while pre-warming, in milliseconds. */
    private static final long PREWARM_POLL_INTERVAL_MS = 500L;

    private static final Logger LOG = LoggerFactory.getLogger(RemoteHiveSparkClient.class);
    private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();

    private transient Map<String, String> conf;
    private transient SparkClient remoteClient;
    private transient SparkConf sparkConf;
    private transient HiveConf hiveConf;
    /** Local URIs of jars already handed to the current remote client. */
    private final transient List<URI> localJars = new ArrayList<URI>();
    /** Local URIs of files/archives already handed to the current remote client. */
    private final transient List<URI> localFiles = new ArrayList<URI>();
    /** Timeout, in seconds, applied when waiting on futures returned by the remote client. */
    private final transient long sparkClientTimeout;
    private final String sessionId;

    /**
     * Builds the Spark configuration from {@code conf} and immediately creates (and, when
     * enabled, pre-warms) the remote client.
     *
     * @param hiveConf  Hive configuration used for timeouts, pre-warm settings and uploads
     * @param conf      key/value settings handed to the client factory and used to build the
     *                  {@link SparkConf}
     * @param sessionId identifier passed to the client factory
     * @throws Exception if the remote client cannot be created or pre-warming fails
     */
    RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf, String sessionId) throws Exception {
        this.hiveConf = hiveConf;
        this.sparkClientTimeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
        this.sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
        this.conf = conf;
        this.sessionId = sessionId;
        this.createRemoteClient();
    }

    /**
     * Creates a fresh remote client and, when pre-warming applies, waits for executors.
     *
     * @throws Exception if client creation fails, the executor-count query fails for a reason
     *                   other than a timeout, or the waiting thread is interrupted
     */
    private void createRemoteClient() throws Exception {
        this.remoteClient = SparkClientFactory.createClient(this.conf, this.hiveConf, this.sessionId);
        if (this.isPrewarmApplicable()) {
            this.prewarmExecutors();
        }
    }

    /** Returns whether pre-warming is enabled and the Spark master is a YARN or local master. */
    private boolean isPrewarmApplicable() {
        if (!HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.HIVE_PREWARM_ENABLED)) {
            return false;
        }
        String master = this.hiveConf.get(SPARK_MASTER_PROPERTY);
        return SparkClientUtilities.isYarnMaster(master) || SparkClientUtilities.isLocalMaster(master);
    }

    /**
     * Polls the executor count until the pre-warm target is reached or
     * {@code HIVE_PREWARM_SPARK_TIMEOUT} has elapsed. Running out of time is logged, not thrown.
     */
    private void prewarmExecutors() throws Exception {
        int minExecutors = this.getExecutorsToWarm();
        if (minExecutors <= 0) {
            return;
        }
        LOG.info("Prewarm Spark executors. The minimum number of executors to warm is {}", minExecutors);

        long maxPrewarmTime = HiveConf.getTimeVar(this.hiveConf, HiveConf.ConfVars.HIVE_PREWARM_SPARK_TIMEOUT, TimeUnit.MILLISECONDS);
        long start = System.currentTimeMillis();
        int curExecutors = 0;
        do {
            // Only wait for what is left of the budget; handing the full timeout to every
            // query would let a late, slow query stretch the total wait to about twice the limit.
            long remaining = Math.max(0L, maxPrewarmTime - (System.currentTimeMillis() - start));
            try {
                curExecutors = this.getExecutorCount(remaining, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // A slow answer is not fatal here: the surrounding loop enforces the overall limit.
                LOG.warn("Timed out getting executor count.", e);
            }
            if (curExecutors >= minExecutors) {
                LOG.info("Finished prewarming Spark executors. The current number of executors is {}", curExecutors);
                return;
            }
            Thread.sleep(PREWARM_POLL_INTERVAL_MS);
        } while (System.currentTimeMillis() - start < maxPrewarmTime);

        LOG.info("Timeout ({}s) occurred while prewarming executors. The current number of executors is {}",
                maxPrewarmTime / 1000L, curExecutors);
    }

    /**
     * Computes how many executors pre-warming should wait for: the smaller of
     * {@code HIVE_PREWARM_NUM_CONTAINERS} and the number of executors Spark is configured to
     * start. With dynamic allocation that is {@code spark.dynamicAllocation.initialExecutors}
     * (falling back to {@code spark.dynamicAllocation.minExecutors}, then 0); otherwise it is
     * {@code spark.executor.instances} (default 2).
     *
     * <p>NOTE(review): the dynamic-allocation flag is read from the Hive configuration while the
     * executor counts come from the Spark configuration; confirm this asymmetry is intended.
     */
    private int getExecutorsToWarm() {
        int requested = HiveConf.getIntVar(this.hiveConf, HiveConf.ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
        int configured;
        if (this.hiveConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
            int min = this.sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
            configured = this.sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
        } else {
            configured = this.sparkConf.getInt("spark.executor.instances", 2);
        }
        return Math.min(requested, configured);
    }

    /**
     * Asks the remote client for its executor count and waits up to the given timeout.
     *
     * @throws TimeoutException if no answer arrives in time (other failures surface as their
     *                          own exception types)
     */
    private int getExecutorCount(long timeout, TimeUnit unit) throws Exception {
        Future<?> pending = this.remoteClient.getExecutorCount();
        return (Integer) pending.get(timeout, unit);
    }

    /** Returns the Spark configuration generated at construction time. */
    @Override
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    /** Returns the remote executor count, waiting at most the configured client future timeout. */
    @Override
    public int getExecutorCount() throws Exception {
        return this.getExecutorCount(this.sparkClientTimeout, TimeUnit.SECONDS);
    }

    /** Returns the remote default parallelism, waiting at most the configured client future timeout. */
    @Override
    public int getDefaultParallelism() throws Exception {
        Future<?> pending = this.remoteClient.getDefaultParallelism();
        return (Integer) pending.get(this.sparkClientTimeout, TimeUnit.SECONDS);
    }

    /**
     * Submits {@code sparkWork} to the remote client. On a YARN master an inactive remote client
     * is closed and recreated first.
     *
     * @return a reference to the submitted job
     * @throws Exception wrapping any failure raised while submitting; failures while recreating
     *                   the client propagate unwrapped
     */
    @Override
    public SparkJobRef execute(TaskQueue taskQueue, Context context, SparkWork sparkWork) throws Exception {
        boolean onYarn = SparkClientUtilities.isYarnMaster(this.hiveConf.get(SPARK_MASTER_PROPERTY));
        if (onYarn && !this.remoteClient.isActive()) {
            this.close();
            this.createRemoteClient();
        }
        try {
            return this.submit(taskQueue, context, sparkWork);
        } catch (Throwable cause) {
            throw new Exception("Failed to submit Spark work, please retry later", cause);
        }
    }

    /**
     * Ships pending local resources, serializes the job configuration, scratch directory and
     * work, and submits them as a {@link JobStatusJob}.
     *
     * @throws HiveException if {@code taskQueue} has been shut down before submission
     */
    private SparkJobRef submit(TaskQueue taskQueue, Context context, SparkWork sparkWork) throws Exception {
        HiveConf contextConf = (HiveConf) context.getConf();
        this.refreshLocalResources(sparkWork, contextConf);

        JobConf jobConf = new JobConf(contextConf);
        HiveConfUtil.updateJobCredentialProviders(jobConf);

        // Make sure the scratch directory exists before the job refers to it.
        Path emptyScratchDir = context.getMRTmpPath();
        FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
        fs.mkdirs(emptyScratchDir);

        // Register the null-scan file system implementation under its own scheme.
        jobConf.set("fs." + NullScanFileSystem.getBaseScheme() + ".impl", NullScanFileSystem.class.getCanonicalName());

        byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
        byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
        byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
        JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);

        if (taskQueue.isShutdown()) {
            throw new HiveException("Operation is cancelled.");
        }

        JobHandle<Serializable> jobHandle = this.remoteClient.submit(job);
        RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(this.remoteClient, jobHandle, this.sparkClientTimeout);
        return new RemoteSparkJobRef(contextConf, jobHandle, sparkJobStatus);
    }

    /**
     * Hands every jar, file and archive the upcoming job needs to the remote client, skipping
     * resources that were already shipped. Also records the added jars/files/archives in
     * {@code conf} and clears the credentials-binary location from it.
     */
    private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
        // Jar containing this class.
        this.addJars(new JobConf(this.getClass()).getJar());
        // Auxiliary jars from configuration and, when a session exists, its reloadable aux jars.
        this.addJars(conf.getAuxJars());
        SessionState session = SessionState.get();
        this.addJars(session == null ? null : session.getReloadableAuxJars());

        // Jars registered as session resources.
        String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
        this.addJars(addedJars);

        // Let each work configure a scratch JobConf and pick up whatever jars it registered.
        JobConf jobConf = new JobConf(conf);
        jobConf.set(MR_JAR_PROPERTY, "");
        for (BaseWork work : sparkWork.getAllWork()) {
            work.configureJobConf(jobConf);
        }
        this.addJars(jobConf.get(MR_JAR_PROPERTY));

        // NOTE(review): presumably prevents a local credentials-file path from being carried
        // into the job configuration; confirm.
        conf.unset(MR_CREDENTIALS_LOCATION_PROPERTY);

        String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
        this.addResources(addedFiles);

        String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
        this.addResources(addedArchives);
    }

    /**
     * Hands each not-yet-shipped file in the comma-separated list to the remote client,
     * uploading it first when {@link SparkUtilities#needUploadToHDFS} says so. Entries whose
     * URI cannot be parsed are logged and skipped.
     *
     * @param addedFiles comma-separated file locations; {@code null} is treated as empty
     * @throws IOException if an upload fails
     */
    private void addResources(String addedFiles) throws IOException {
        for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
            try {
                URI localUri = FileUtils.getURI(addedFile);
                if (localUri == null || this.localFiles.contains(localUri)) {
                    continue;
                }
                URI shippedUri = localUri;
                if (SparkUtilities.needUploadToHDFS(localUri, this.sparkConf)) {
                    shippedUri = SparkUtilities.uploadToHDFS(localUri, this.hiveConf);
                }
                this.remoteClient.addFile(shippedUri);
                // Remember the file only once it has been handed over, so that a failed upload
                // is retried on the next refresh instead of being silently skipped forever.
                this.localFiles.add(localUri);
            } catch (URISyntaxException e) {
                LOG.warn("Failed to add file:{}", addedFile, e);
            }
        }
    }

    /**
     * Hands each not-yet-shipped jar in the comma-separated list to the remote client,
     * uploading it first when {@link SparkUtilities#needUploadToHDFS} says so. Entries whose
     * URI cannot be parsed are logged and skipped.
     *
     * @param addedJars comma-separated jar locations; {@code null} is treated as empty
     * @throws IOException if an upload fails
     */
    private void addJars(String addedJars) throws IOException {
        for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
            try {
                URI localUri = FileUtils.getURI(addedJar);
                if (localUri == null || this.localJars.contains(localUri)) {
                    continue;
                }
                URI shippedUri = localUri;
                if (SparkUtilities.needUploadToHDFS(localUri, this.sparkConf)) {
                    shippedUri = SparkUtilities.uploadToHDFS(localUri, this.hiveConf);
                }
                this.remoteClient.addJar(shippedUri);
                // Remember the jar only once it has been handed over, so that a failed upload
                // is retried on the next refresh instead of being silently skipped forever.
                this.localJars.add(localUri);
            } catch (URISyntaxException e) {
                LOG.warn("Failed to add jar:{}", addedJar, e);
            }
        }
    }

    /**
     * Stops the remote client, if any, and forgets which resources were shipped to it.
     * The resource caches are cleared even when stopping fails, so that a replacement client
     * gets every resource again.
     */
    @Override
    public void close() {
        try {
            if (this.remoteClient != null) {
                this.remoteClient.stop();
            }
        } finally {
            this.localFiles.clear();
            this.localJars.clear();
        }
    }

    /**
     * Job that rebuilds the job configuration, scratch directory and {@link SparkWork} from
     * their serialized forms, generates the Spark plan and starts it asynchronously under the
     * job context's monitoring.
     */
    @VisibleForTesting
    static class JobStatusJob implements Job<Serializable> {
        private static final long serialVersionUID = 1L;
        private final byte[] jobConfBytes;
        private final byte[] scratchDirBytes;
        private final byte[] sparkWorkBytes;

        // NOTE(review): unused in this file; presumably required by the serialization
        // framework that re-creates the job on the remote side.
        private JobStatusJob() {
            this(null, null, null);
        }

        /**
         * @param jobConfBytes    job configuration serialized with {@code KryoSerializer.serializeJobConf}
         * @param scratchDirBytes scratch directory {@link Path} serialized with {@code KryoSerializer.serialize}
         * @param sparkWorkBytes  {@link SparkWork} serialized with {@code KryoSerializer.serialize}
         */
        JobStatusJob(byte[] jobConfBytes, byte[] scratchDirBytes, byte[] sparkWorkBytes) {
            this.jobConfBytes = jobConfBytes;
            this.scratchDirBytes = scratchDirBytes;
            this.sparkWorkBytes = sparkWorkBytes;
        }

        /**
         * Deserializes the inputs, registers the required counters, generates the plan and
         * kicks off its execution.
         *
         * @param jc context supplying the Spark context, added jars and local temp directory
         * @return always {@code null}; progress is reported through {@code jc.monitor}
         */
        @Override
        public Serializable call(JobContext jc) throws Exception {
            JobConf localJobConf = KryoSerializer.deserializeJobConf(this.jobConfBytes);

            // Put jars added to the job context on the class path and record their local
            // locations in the job configuration.
            Map<String, Long> addedJars = jc.getAddedJars();
            if (addedJars != null && !addedJars.isEmpty()) {
                List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
                localJobConf.set("hive.added.jars", StringUtils.join(localAddedJars, ";"));
            }

            Path localScratchDir = KryoSerializer.deserialize(this.scratchDirBytes, Path.class);
            SparkWork localSparkWork = KryoSerializer.deserialize(this.sparkWorkBytes, SparkWork.class);
            this.logConfigurations(localJobConf);

            // Create every counter the work declares, grouped by counter group.
            SparkCounters sparkCounters = new SparkCounters(jc.sc());
            Map<String, List<String>> prefixes = localSparkWork.getRequiredCounterPrefix();
            if (prefixes != null) {
                for (Map.Entry<String, List<String>> group : prefixes.entrySet()) {
                    for (String counterName : group.getValue()) {
                        sparkCounters.createCounter(group.getKey(), counterName);
                    }
                }
            }

            SparkReporter sparkReporter = new SparkReporter(sparkCounters);
            SparkPlanGenerator gen = new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
            SparkPlan plan = gen.generate(localSparkWork);

            // Tag the Spark jobs started from here with the query id and query name.
            jc.sc().setJobGroup("queryId = " + localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));

            JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
            JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
            jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
            return null;
        }

        /** Dumps the job configuration to the debug log; does nothing unless debug is enabled. */
        private void logConfigurations(JobConf localJobConf) {
            if (!LOG.isDebugEnabled()) {
                return;
            }
            LOG.debug("Logging job configuration: ");
            StringBuilder outWriter = new StringBuilder();
            HiveConfUtil.dumpConfig(localJobConf, outWriter);
            LOG.debug(outWriter.toString());
        }
    }
}

