/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import java.io.File;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Assert;
import org.junit.Test;

public class TestLocalHiveSparkClient {
    private final CyclicBarrier barrier = new CyclicBarrier(2);

    @Test
    public void testMultiSessionSparkContextReUse() throws MalformedURLException {
        String confDir = "../data/conf/spark/local/hive-site.xml";
        HiveConf.setHiveSiteLocation((URL)new File(confDir).toURI().toURL());
        ExecutorService executor = Executors.newFixedThreadPool(this.barrier.getParties());
        List<CompletableFuture> futures = IntStream.range(0, this.barrier.getParties()).boxed().map(i -> CompletableFuture.supplyAsync(() -> this.execute((Integer)i), executor)).collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
    }

    private Void execute(Integer threadId) {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), "TestLocalHiveSparkClient-testMultiSessionSparkContextReuse-local-dir").toString());
        SessionState.start((HiveConf)conf);
        try {
            this.runSparkTestSession(conf, threadId);
        }
        catch (Exception ex) {
            Assert.fail((String)ex.getMessage());
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runSparkTestSession(HiveConf conf, int threadId) throws Exception {
        conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10s");
        conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
        Driver driver = null;
        try {
            driver = new Driver(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build(), null);
            SparkSession sparkSession = SparkUtilities.getSparkSession((HiveConf)conf, (SparkSessionManager)SparkSessionManagerImpl.getInstance());
            driver.run("show tables");
            this.barrier.await();
            SparkContext sparkContext = this.getSparkContext(sparkSession);
            Assert.assertFalse((boolean)sparkContext.isStopped());
            if (threadId == 1) {
                this.barrier.await();
                this.closeSparkSession(sparkSession);
                Assert.assertTrue((boolean)sparkContext.isStopped());
            } else {
                this.closeSparkSession(sparkSession);
                Assert.assertFalse((boolean)sparkContext.isStopped());
                this.barrier.await();
            }
        }
        finally {
            if (driver != null) {
                driver.destroy();
            }
        }
    }

    private void closeSparkSession(SparkSession session) throws ReflectiveOperationException {
        Assert.assertTrue((boolean)session.isOpen());
        session.close();
        Assert.assertFalse((boolean)session.isOpen());
    }

    private SparkContext getSparkContext(SparkSession sparkSession) throws ReflectiveOperationException {
        HiveSparkClient sparkClient = this.getSparkClient(sparkSession);
        Assert.assertNotNull((Object)sparkClient);
        return this.getSparkContext(sparkClient).sc();
    }

    private JavaSparkContext getSparkContext(HiveSparkClient sparkClient) throws ReflectiveOperationException {
        Field sparkContextField = LocalHiveSparkClient.class.getDeclaredField("sc");
        sparkContextField.setAccessible(true);
        return (JavaSparkContext)sparkContextField.get(sparkClient);
    }

    private HiveSparkClient getSparkClient(SparkSession sparkSession) throws ReflectiveOperationException {
        Field clientField = SparkSessionImpl.class.getDeclaredField("hiveSparkClient");
        clientField.setAccessible(true);
        return (HiveSparkClient)clientField.get(sparkSession);
    }
}

