package org.apache.zeppelin.flink.internal;

import java.io.BufferedReader;
import java.net.URI;
import org.apache.flink.annotation.Internal;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.zeppelin.flink.FlinkShims;
import org.apache.zeppelin.flink.internal.FlinkShell;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;

/* compiled from: FlinkShell.scala */
/* loaded from: input_file:org/apache/zeppelin/flink/internal/FlinkShell$.class */
public final class FlinkShell$ {
    public static final FlinkShell$ MODULE$ = null;
    private Option<BufferedReader> bufferedReader;

    static {
        new FlinkShell$();
    }

    public Option<BufferedReader> bufferedReader() {
        return this.bufferedReader;
    }

    public void bufferedReader_$eq(Option<BufferedReader> option) {
        this.bufferedReader = option;
    }

    @Internal
    public FlinkShell.YarnConfig ensureYarnConfig(FlinkShell.Config config) {
        FlinkShell.YarnConfig yarnConfig;
        Some yarnConfig2 = config.yarnConfig();
        if (yarnConfig2 instanceof Some) {
            yarnConfig = (FlinkShell.YarnConfig) yarnConfig2.x();
        } else {
            if (!None$.MODULE$.equals(yarnConfig2)) {
                throw new MatchError(yarnConfig2);
            }
            yarnConfig = new FlinkShell.YarnConfig(FlinkShell$YarnConfig$.MODULE$.apply$default$1(), FlinkShell$YarnConfig$.MODULE$.apply$default$2(), FlinkShell$YarnConfig$.MODULE$.apply$default$3(), FlinkShell$YarnConfig$.MODULE$.apply$default$4(), FlinkShell$YarnConfig$.MODULE$.apply$default$5());
        }
        return yarnConfig;
    }

    private String getConfigDir(FlinkShell.Config config) {
        return (String) config.configDir().getOrElse(new FlinkShell$$anonfun$getConfigDir$1());
    }

    @Internal
    public Tuple2<Configuration, Option<ClusterClient<?>>> fetchConnectionInfo(FlinkShell.Config config, Configuration configuration, FlinkShims flinkShims) {
        Tuple2<Configuration, Some<MiniClusterClient>> tuple2;
        Enumeration.Value executionMode = config.executionMode();
        Enumeration.Value LOCAL = FlinkShell$ExecutionMode$.MODULE$.LOCAL();
        if (LOCAL != null ? !LOCAL.equals(executionMode) : executionMode != null) {
            Enumeration.Value REMOTE = FlinkShell$ExecutionMode$.MODULE$.REMOTE();
            if (REMOTE != null ? !REMOTE.equals(executionMode) : executionMode != null) {
                Enumeration.Value YARN = FlinkShell$ExecutionMode$.MODULE$.YARN();
                if (YARN != null ? !YARN.equals(executionMode) : executionMode != null) {
                    Enumeration.Value YARN_APPLICATION = FlinkShell$ExecutionMode$.MODULE$.YARN_APPLICATION();
                    if (YARN_APPLICATION != null ? !YARN_APPLICATION.equals(executionMode) : executionMode != null) {
                        Enumeration.Value KUBERNETES_APPLICATION = FlinkShell$ExecutionMode$.MODULE$.KUBERNETES_APPLICATION();
                        if (KUBERNETES_APPLICATION != null ? !KUBERNETES_APPLICATION.equals(executionMode) : executionMode != null) {
                            Enumeration.Value UNDEFINED = FlinkShell$ExecutionMode$.MODULE$.UNDEFINED();
                            if (UNDEFINED != null ? !UNDEFINED.equals(executionMode) : executionMode != null) {
                                throw new MatchError(executionMode);
                            }
                            throw new IllegalArgumentException("please specify execution mode:\n[local | remote <host> <port> | yarn | yarn-application | kubernetes-application]");
                        }
                        tuple2 = new Tuple2<>(configuration, None$.MODULE$);
                    } else {
                        tuple2 = new Tuple2<>(configuration, None$.MODULE$);
                    }
                } else {
                    tuple2 = createYarnClusterIfNeededAndGetConfig(config, configuration, flinkShims);
                }
            } else {
                tuple2 = createRemoteConfig(config, configuration);
            }
        } else {
            tuple2 = createLocalClusterAndConfig(configuration);
        }
        return tuple2;
    }

    private Tuple2<Configuration, Option<ClusterClient<Nothing$>>> createYarnClusterIfNeededAndGetConfig(FlinkShell.Config config, Configuration configuration, FlinkShims flinkShims) {
        Tuple2<Configuration, Some<ClusterClient<Nothing$>>> tuple2;
        Tuple2<Configuration, None$> fetchDeployedYarnClusterInfo;
        configuration.setBoolean(DeploymentOptions.ATTACHED, true);
        Option<FlinkShell.YarnConfig> yarnConfig = config.yarnConfig();
        if (yarnConfig instanceof Some) {
            tuple2 = deployNewYarnCluster(config, configuration, flinkShims);
        } else {
            if (!None$.MODULE$.equals(yarnConfig)) {
                throw new MatchError(yarnConfig);
            }
            tuple2 = new Tuple2<>(configuration, None$.MODULE$);
        }
        Tuple2<Configuration, Some<ClusterClient<Nothing$>>> tuple22 = tuple2;
        if (tuple22 == null) {
            throw new MatchError(tuple22);
        }
        Tuple2 tuple23 = new Tuple2((Configuration) tuple22._1(), (Option) tuple22._2());
        Configuration configuration2 = (Configuration) tuple23._1();
        Option option = (Option) tuple23._2();
        if (option instanceof Some) {
            fetchDeployedYarnClusterInfo = fetchDeployedYarnClusterInfo(config, configuration2, "yarn-cluster", flinkShims);
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            fetchDeployedYarnClusterInfo = fetchDeployedYarnClusterInfo(config, configuration2, "default", flinkShims);
        }
        Tuple2<Configuration, None$> tuple24 = fetchDeployedYarnClusterInfo;
        if (tuple24 == null) {
            throw new MatchError(tuple24);
        }
        Configuration configuration3 = (Configuration) tuple24._1();
        Predef$.MODULE$.println(new StringBuilder().append("Configuration: ").append(configuration3).toString());
        return new Tuple2<>(configuration3, option);
    }

    private Tuple2<Configuration, Some<ClusterClient<Nothing$>>> deployNewYarnCluster(FlinkShell.Config config, Configuration configuration, FlinkShims flinkShims) {
        Configuration configuration2 = new Configuration(configuration);
        String[] parseArgList = parseArgList(config, "yarn-cluster");
        CliFrontend cliFrontend = new CliFrontend(configuration2, CliFrontend.loadCustomCommandLines(configuration2, getConfigDir(config)));
        Configuration configuration3 = (Configuration) flinkShims.updateEffectiveConfig(cliFrontend, CliFrontendParser.parse(CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), cliFrontend.getCustomCommandLineOptions()), parseArgList, true), configuration2);
        ClusterClientFactory clusterClientFactory = new DefaultClusterClientServiceLoader().getClusterClientFactory(configuration3);
        ClusterDescriptor createClusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration3);
        try {
            ClusterClient clusterClient = createClusterDescriptor.deploySessionCluster(clusterClientFactory.getClusterSpecification(configuration3)).getClusterClient();
            configuration3.set(DeploymentOptions.TARGET, "yarn-session");
            createClusterDescriptor.close();
            return new Tuple2<>(configuration3, new Some(clusterClient));
        } catch (Throwable th) {
            configuration3.set(DeploymentOptions.TARGET, "yarn-session");
            createClusterDescriptor.close();
            throw th;
        }
    }

    private Tuple2<Configuration, None$> fetchDeployedYarnClusterInfo(FlinkShell.Config config, Configuration configuration, String str, FlinkShims flinkShims) {
        Configuration configuration2 = new Configuration(configuration);
        String[] parseArgList = parseArgList(config, str);
        CliFrontend cliFrontend = new CliFrontend(configuration2, CliFrontend.loadCustomCommandLines(configuration2, getConfigDir(config)));
        return new Tuple2<>((Configuration) flinkShims.updateEffectiveConfig(cliFrontend, CliFrontendParser.parse(CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), cliFrontend.getCustomCommandLineOptions()), parseArgList, true), configuration2), None$.MODULE$);
    }

    public String[] parseArgList(FlinkShell.Config config, String str) {
        String[] strArr;
        ArrayBuffer apply = (str != null ? !str.equals("default") : "default" != 0) ? ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"-m", str})) : (ArrayBuffer) ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        Some yarnConfig = config.yarnConfig();
        if (yarnConfig instanceof Some) {
            FlinkShell.YarnConfig yarnConfig2 = (FlinkShell.YarnConfig) yarnConfig.x();
            yarnConfig2.jobManagerMemory().foreach(new FlinkShell$$anonfun$parseArgList$1(apply));
            yarnConfig2.taskManagerMemory().foreach(new FlinkShell$$anonfun$parseArgList$2(apply));
            yarnConfig2.name().foreach(new FlinkShell$$anonfun$parseArgList$3(apply));
            yarnConfig2.queue().foreach(new FlinkShell$$anonfun$parseArgList$4(apply));
            yarnConfig2.slots().foreach(new FlinkShell$$anonfun$parseArgList$5(apply));
            strArr = (String[]) apply.toArray(ClassTag$.MODULE$.apply(String.class));
        } else {
            if (!None$.MODULE$.equals(yarnConfig)) {
                throw new MatchError(yarnConfig);
            }
            strArr = (String[]) apply.toArray(ClassTag$.MODULE$.apply(String.class));
        }
        return strArr;
    }

    private Tuple2<Configuration, None$> createRemoteConfig(FlinkShell.Config config, Configuration configuration) {
        if (config.host().isEmpty() || config.port().isEmpty()) {
            throw new IllegalArgumentException("<host> or <port> is not specified!");
        }
        Configuration configuration2 = new Configuration(configuration);
        setJobManagerInfoToConfig(configuration2, (String) config.host().get(), Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt(config.port().get())));
        configuration2.set(DeploymentOptions.TARGET, "remote");
        configuration2.setBoolean(DeploymentOptions.ATTACHED, true);
        return new Tuple2<>(configuration2, None$.MODULE$);
    }

    private Tuple2<Configuration, Some<MiniClusterClient>> createLocalClusterAndConfig(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        configuration2.setInteger(JobManagerOptions.PORT, 0);
        MiniCluster createLocalCluster = createLocalCluster(configuration2);
        int port = ((URI) createLocalCluster.getRestAddress().get()).getPort();
        setJobManagerInfoToConfig(configuration2, "localhost", Predef$.MODULE$.int2Integer(port));
        configuration2.set(DeploymentOptions.TARGET, "remote");
        configuration2.setBoolean(DeploymentOptions.ATTACHED, true);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\\nStarting local Flink cluster (host: localhost, port: ", ").\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(port)})));
        return new Tuple2<>(configuration2, new Some(new MiniClusterClient(configuration2, createLocalCluster)));
    }

    private MiniCluster createLocalCluster(Configuration configuration) {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS)).setNumTaskManagers(configuration.getInteger("local.number-taskmanager", 1)).build());
        miniCluster.start();
        return miniCluster;
    }

    private void setJobManagerInfoToConfig(Configuration configuration, String str, Integer num) {
        configuration.setString(JobManagerOptions.ADDRESS, str);
        configuration.setInteger(JobManagerOptions.PORT, Predef$.MODULE$.Integer2int(num));
        configuration.setString(RestOptions.ADDRESS, str);
        configuration.setInteger(RestOptions.PORT, Predef$.MODULE$.Integer2int(num));
    }

    private FlinkShell$() {
        MODULE$ = this;
        this.bufferedReader = None$.MODULE$;
    }
}
