/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.impala.authorization.User;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
import org.apache.impala.thrift.TResolveRequestPoolResult;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.util.FileWatchService;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves requests to admission-control pools and exposes per-pool settings.
 *
 * <p>Two inputs are used:
 * <ul>
 *   <li>a fair-scheduler allocation file, loaded and reloaded through
 *       {@link AllocationFileLoaderService}; each reload publishes a new
 *       {@link AllocationConfiguration} into {@code allocationConf_};</li>
 *   <li>an optional site configuration file, loaded into a Hadoop
 *       {@link Configuration} and re-read by {@link ConfWatcher} whenever the
 *       {@link FileWatchService} reports a change.</li>
 * </ul>
 *
 * <p>The service must be {@link #start() started} before any of the query methods
 * are used; they all assert {@code running_}.
 */
public class RequestPoolService {
    static final Logger LOG = LoggerFactory.getLogger(RequestPoolService.class);

    // True between a successful start() and the next stop()/failed start.
    private final AtomicBoolean running_;

    // Keys looked up in the site configuration, with their fallback values.
    private static final String MAX_PLACED_RESERVATIONS_KEY = "llama.am.throttling.maximum.placed.reservations";
    private static final int MAX_PLACED_RESERVATIONS_DEFAULT = -1;
    private static final String MAX_QUEUED_RESERVATIONS_KEY = "llama.am.throttling.maximum.queued.reservations";
    private static final int MAX_QUEUED_RESERVATIONS_DEFAULT = 200;
    private static final String QUEUE_TIMEOUT_KEY = "impala.admission-control.pool-queue-timeout-ms";
    private static final String QUERY_OPTIONS_KEY = "impala.admission-control.pool-default-query-options";
    private static final String MAX_QUERY_MEM_LIMIT_BYTES = "impala.admission-control.max-query-mem-limit";
    private static final String MIN_QUERY_MEM_LIMIT_BYTES = "impala.admission-control.min-query-mem-limit";
    private static final String CLAMP_MEM_LIMIT_QUERY_OPTION = "impala.admission-control.clamp-mem-limit-query-option";
    private static final String MAX_MT_DOP = "impala.admission-control.max-mt-dop";
    private static final String MAX_QUERY_CPU_CORE_PER_NODE_LIMIT = "impala.admission-control.max-query-cpu-core-per-node-limit";
    private static final String MAX_QUERY_CPU_CORE_COORDINATOR_LIMIT = "impala.admission-control.max-query-cpu-core-coordinator-limit";

    // A per-pool override is stored under "<key>.<pool>".
    private static final String PER_POOL_CONFIG_KEY_FORMAT = "%s.%s";

    // Factor used to convert the pool's max memory (treated as megabytes) to bytes.
    private static final long BYTES_PER_MEGABYTE = 1024L * 1024L;

    @VisibleForTesting
    final AllocationFileLoaderService allocLoader_;

    // Latest allocation configuration published by allocLoader_'s reload listener.
    private final AtomicReference<AllocationConfiguration> allocationConf_;

    // Watches the site configuration file; null when no site path was given.
    @VisibleForTesting
    final FileWatchService confWatcher_;

    // Current site configuration; replaced wholesale by ConfWatcher. Null when no
    // site path was given.
    private volatile Configuration conf_;

    // Location of the site configuration file; null when no site path was given.
    private final URL confUrl_;

    private static RequestPoolService single_instance_ = null;

    /**
     * Returns the service for the given files.
     *
     * <p>With {@code isTest} set a fresh, unshared instance is created on every call.
     * Otherwise the shared instance is created on first use and returned afterwards;
     * the arguments of later calls are ignored.
     *
     * <p>NOTE(review): the lazy initialization is not synchronized - presumably this
     * is only called from a single thread during startup; confirm against callers.
     *
     * @param fsAllocationPath path of the allocation file; must not be null
     * @param sitePath path of the site configuration file; may be null or empty
     * @param isTest whether to bypass the shared instance
     * @throws IllegalArgumentException if a given file cannot be found
     */
    public static RequestPoolService getInstance(String fsAllocationPath, String sitePath, boolean isTest) {
        if (isTest) {
            return new RequestPoolService(fsAllocationPath, sitePath);
        }
        if (single_instance_ == null) {
            single_instance_ = new RequestPoolService(fsAllocationPath, sitePath);
        }
        return single_instance_;
    }

    /**
     * Returns the shared instance, or null (after logging an informational message)
     * when none has been created.
     */
    public static RequestPoolService getInstance() {
        if (single_instance_ == null) {
            LOG.info("Default pool only, scheduler allocation is not specified.");
        }
        return single_instance_;
    }

    /**
     * Prepares the allocation loader and, if a site path is given, loads the site
     * configuration and creates a watcher for it. Nothing is started here.
     *
     * @throws IllegalArgumentException if either file cannot be resolved to a URL
     */
    private RequestPoolService(String fsAllocationPath, String sitePath) {
        Preconditions.checkNotNull(fsAllocationPath);
        running_ = new AtomicBoolean(false);
        allocationConf_ = new AtomicReference<>();

        URL fsAllocationURL = getURL(fsAllocationPath);
        if (fsAllocationURL == null) {
            throw new IllegalArgumentException("Unable to find allocation configuration file: " + fsAllocationPath);
        }
        Configuration allocConf = new Configuration();
        allocConf.set("yarn.scheduler.fair.allocation.file", fsAllocationURL.getPath());
        allocLoader_ = new AllocationFileLoaderService();
        allocLoader_.init(allocConf);

        if (!Strings.isNullOrEmpty(sitePath)) {
            URL siteUrl = getURL(sitePath);
            if (siteUrl == null) {
                throw new IllegalArgumentException("Unable to find configuration file: " + sitePath);
            }
            confUrl_ = siteUrl;
            // Only the site file is loaded; Hadoop's default resources are skipped.
            Configuration siteConf = new Configuration(false);
            siteConf.addResource(siteUrl);
            conf_ = siteConf;
            confWatcher_ = new FileWatchService(new File(siteUrl.getPath()), new ConfWatcher());
        } else {
            confWatcher_ = null;
            confUrl_ = null;
        }
    }

    /**
     * Converts a file system path to a URL.
     *
     * @param path path of the file; must not be null
     * @return the URL of the absolute file, or null (after logging an error) if the
     *     file does not exist or no URL can be built for it
     */
    @VisibleForTesting
    private static URL getURL(String path) {
        Preconditions.checkNotNull(path);
        File file = new File(path).getAbsoluteFile();
        if (!file.exists()) {
            LOG.error("Unable to find specified file: " + path);
            return null;
        }
        try {
            return file.toURI().toURL();
        } catch (MalformedURLException ex) {
            LOG.error("Unable to construct URL for file: " + path, ex);
            return null;
        }
    }

    /**
     * Starts the allocation loader, performs an initial load of the allocation file,
     * starts the site configuration watcher (if any) and marks the service running.
     *
     * @throws IllegalStateException if the service is already running
     * @throws RuntimeException wrapping the failure if the initial load fails; the
     *     loader and watcher are stopped before rethrowing
     */
    public void start() {
        Preconditions.checkState(!running_.get());
        allocLoader_.setReloadListener(allocationConf_::set);
        allocLoader_.start();
        try {
            allocLoader_.reloadAllocations();
        } catch (Exception ex) {
            try {
                stopInternal();
            } catch (Exception stopEx) {
                // Keep the original failure as the one that is reported to the caller.
                LOG.error("Unable to stop AllocationFileLoaderService after failed start.", stopEx);
            }
            throw new RuntimeException(ex);
        }
        if (confWatcher_ != null) {
            confWatcher_.start();
        }
        running_.set(true);
    }

    /**
     * Stops the service and, if this is the shared instance, clears it so that a
     * later {@link #getInstance(String, String, boolean)} creates a new one.
     *
     * @throws IllegalStateException if the service is not running
     */
    public void stop() {
        Preconditions.checkState(running_.get());
        stopInternal();
        if (single_instance_ == this) {
            single_instance_ = null;
        }
    }

    /** Returns whether the service is currently marked as running. */
    public boolean isRunning() {
        return running_.get();
    }

    /** Marks the service stopped, then stops the watcher (if any) and the loader. */
    private void stopInternal() {
        running_.set(false);
        if (confWatcher_ != null) {
            confWatcher_.stop();
        }
        allocLoader_.stop();
    }

    /**
     * Resolves the requested pool and user to an actual pool and reports whether the
     * user may submit to it.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>pool resolved: status OK, {@code resolved_pool} and {@code has_access} set;</li>
     *   <li>no pool and no error: status OK, no pool set;</li>
     *   <li>{@link IOException} during placement: status INTERNAL_ERROR carrying the
     *       error message.</li>
     * </ul>
     *
     * @param resolvePoolParams the requested pool and the user
     * @return the resolution result; never null
     * @throws InternalException propagated from pool assignment or the access check
     * @throws IllegalStateException if the service is not running
     */
    public TResolveRequestPoolResult resolveRequestPool(TResolveRequestPoolParams resolvePoolParams) throws InternalException {
        Preconditions.checkState(running_.get());
        String requestedPool = resolvePoolParams.getRequested_pool();
        String user = resolvePoolParams.getUser();
        TResolveRequestPoolResult result = new TResolveRequestPoolResult();
        String errorMessage = null;
        String pool = null;
        try {
            pool = assignToPool(requestedPool, user);
        } catch (IOException ex) {
            errorMessage = ex.getMessage();
            if (errorMessage == null) {
                // An exception without a message must still be reported as an error;
                // leaving errorMessage null would yield an OK status below.
                errorMessage = ex.toString();
            } else if (errorMessage.startsWith("No groups found for user")) {
                errorMessage = String.format(
                        "Failed to resolve user '%s' to a pool while evaluating the "
                        + "'primaryGroup' or 'secondaryGroup' queue placement rules because no groups "
                        + "were found for the user. This is likely because the user does not exist on "
                        + "the local operating system.", user);
            }
            LOG.warn(String.format("Error assigning to pool. requested='%s', user='%s', msg=%s", requestedPool, user, errorMessage), ex);
        }

        if (pool != null) {
            result.setResolved_pool(pool);
            result.setHas_access(hasAccess(pool, user));
            result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
        } else if (errorMessage == null) {
            result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
        } else {
            result.setStatus(new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(errorMessage)));
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
                    requestedPool, user, result.resolved_pool, result.has_access);
        }
        return result;
    }

    /**
     * Builds the configuration of the given pool from the current allocation
     * configuration and, when present, the site configuration.
     *
     * <p>Without a site configuration only the maximum number of requests
     * ({@value #MAX_PLACED_RESERVATIONS_DEFAULT}), the maximum queue length
     * ({@value #MAX_QUEUED_RESERVATIONS_DEFAULT}), empty default query options and
     * {@code only_coordinators = false} are set in addition to the allocation values.
     *
     * <p>Both configurations are read once at the start so that a concurrent reload
     * cannot produce a result mixing two generations.
     *
     * @param pool name of the pool
     * @return the pool configuration; never null
     * @throws IllegalStateException if the service is not running
     */
    public TPoolConfig getPoolConfig(String pool) {
        Preconditions.checkState(running_.get());
        AllocationConfiguration allocConf = allocationConf_.get();
        Configuration currentConf = conf_;

        TPoolConfig result = new TPoolConfig();
        long maxMemoryMb = allocConf.getMaxResources(pool).getMemory();
        // Integer.MAX_VALUE is mapped to -1 - presumably the "no limit" sentinel of
        // the allocation file; confirm against AllocationConfiguration.
        result.setMax_mem_resources(maxMemoryMb == Integer.MAX_VALUE ? -1L : maxMemoryMb * BYTES_PER_MEGABYTE);
        result.setUser_query_limits(allocConf.getUserQueryLimits(pool));
        result.setGroup_query_limits(allocConf.getGroupQueryLimits(pool));

        if (currentConf == null) {
            result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT);
            result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT);
            result.setDefault_query_options("");
            result.setOnly_coordinators(false);
        } else {
            result.setMax_requests(getPoolConfigValue(currentConf, pool, MAX_PLACED_RESERVATIONS_KEY, MAX_PLACED_RESERVATIONS_DEFAULT));
            result.setMax_queued(getPoolConfigValue(currentConf, pool, MAX_QUEUED_RESERVATIONS_KEY, MAX_QUEUED_RESERVATIONS_DEFAULT));
            // The timeout is only set when a positive value is configured.
            long queueTimeoutMs = getPoolConfigValue(currentConf, pool, QUEUE_TIMEOUT_KEY, -1L);
            if (queueTimeoutMs > 0L) {
                result.setQueue_timeout_ms(queueTimeoutMs);
            }
            result.setDefault_query_options(getPoolConfigValue(currentConf, pool, QUERY_OPTIONS_KEY, ""));
            result.setMax_query_mem_limit(getPoolConfigValue(currentConf, pool, MAX_QUERY_MEM_LIMIT_BYTES, 0L));
            result.setMin_query_mem_limit(getPoolConfigValue(currentConf, pool, MIN_QUERY_MEM_LIMIT_BYTES, 0L));
            result.setClamp_mem_limit_query_option(getPoolConfigValue(currentConf, pool, CLAMP_MEM_LIMIT_QUERY_OPTION, true));
            result.setMax_mt_dop(getPoolConfigValue(currentConf, pool, MAX_MT_DOP, -1L));
            result.setMax_query_cpu_core_per_node_limit(getPoolConfigValue(currentConf, pool, MAX_QUERY_CPU_CORE_PER_NODE_LIMIT, 0L));
            result.setMax_query_cpu_core_coordinator_limit(getPoolConfigValue(currentConf, pool, MAX_QUERY_CPU_CORE_COORDINATOR_LIMIT, 0L));
            result.setOnly_coordinators(allocConf.isOnlyCoordinators(pool));
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}, max_queued={}, "
                    + " queue_timeout_ms={}, default_query_options={}, max_query_mem_limit={}, "
                    + "min_query_mem_limit={}, clamp_mem_limit_query_option={}, "
                    + "max_query_cpu_core_per_node_limit={}, "
                    + "max_query_cpu_core_coordinator_limit={} user_query_limits={} group_query_limits={}",
                    pool, result.max_mem_resources, result.max_requests, result.max_queued,
                    result.queue_timeout_ms, result.default_query_options, result.max_query_mem_limit,
                    result.min_query_mem_limit, result.clamp_mem_limit_query_option,
                    result.max_query_cpu_core_per_node_limit, result.max_query_cpu_core_coordinator_limit,
                    result.user_query_limits, result.group_query_limits);
        }
        return result;
    }

    /**
     * Looks up a long setting: the per-pool key "&lt;key&gt;.&lt;pool&gt;" first, then the
     * plain key, then {@code defaultValue}.
     */
    private long getPoolConfigValue(Configuration conf, String pool, String key, long defaultValue) {
        long poolIndependentValue = conf.getLong(key, defaultValue);
        return conf.getLong(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), poolIndependentValue);
    }

    /**
     * Looks up a string setting: the per-pool key "&lt;key&gt;.&lt;pool&gt;" first, then the
     * plain key, then {@code defaultValue}.
     */
    private String getPoolConfigValue(Configuration conf, String pool, String key, String defaultValue) {
        String poolIndependentValue = conf.get(key, defaultValue);
        return conf.get(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), poolIndependentValue);
    }

    /**
     * Looks up a boolean setting: the per-pool key "&lt;key&gt;.&lt;pool&gt;" first, then the
     * plain key, then {@code defaultValue}.
     */
    private boolean getPoolConfigValue(Configuration conf, String pool, String key, boolean defaultValue) {
        boolean poolIndependentValue = conf.getBoolean(key, defaultValue);
        return conf.getBoolean(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), poolIndependentValue);
    }

    /**
     * Asks the placement policy of the current allocation configuration for the pool
     * of the given request. An empty requested pool is passed on as "default", and
     * the user is passed by short name.
     *
     * @param requestedPool the requested pool; must not be null, may be empty
     * @param user the user; must not be null or empty
     * @return whatever the placement policy returns (the caller handles null)
     * @throws IOException propagated from the placement policy
     * @throws InternalException declared for the short-name lookup of the user
     */
    @VisibleForTesting
    String assignToPool(String requestedPool, String user) throws InternalException, IOException {
        Preconditions.checkState(running_.get());
        Preconditions.checkNotNull(requestedPool);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
        String shortName = new User(user).getShortName();
        String queue = requestedPool.isEmpty() ? "default" : requestedPool;
        return allocationConf_.get().getPlacementPolicy().assignAppToQueue(queue, shortName);
    }

    /**
     * Checks the {@link QueueACL#SUBMIT_APPLICATIONS} ACL of the pool for the user
     * (by short name) against the current allocation configuration.
     *
     * @param pool the pool; must not be null or empty
     * @param user the user; must not be null or empty
     * @return true if the allocation configuration grants access
     * @throws InternalException declared for the short-name lookup of the user
     */
    @VisibleForTesting
    boolean hasAccess(String pool, String user) throws InternalException {
        Preconditions.checkState(running_.get());
        Preconditions.checkArgument(!Strings.isNullOrEmpty(pool));
        Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
        String shortName = new User(user).getShortName();
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(shortName);
        return allocationConf_.get().hasAccess(pool, QueueACL.SUBMIT_APPLICATIONS, ugi);
    }

    /**
     * Returns the current allocation configuration. Only permitted in the test
     * environment.
     *
     * @throws IllegalStateException outside the test environment
     */
    @VisibleForTesting
    AllocationConfiguration getAllocationConfig() {
        Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv());
        return allocationConf_.get();
    }

    /**
     * Re-reads the site configuration file when the file watcher reports a change
     * and publishes the result through {@code conf_}.
     */
    private final class ConfWatcher
    implements FileWatchService.FileChangeListener {
        private ConfWatcher() {
        }

        @Override
        public void onFileChange() {
            // The watcher is only created when a site configuration URL exists.
            Preconditions.checkNotNull(confUrl_);
            LOG.info("Loading configuration: " + confUrl_.getFile());
            // Load the site file alone, exactly like the initial load in the
            // constructor, so a reload does not pull in Hadoop's default resources.
            Configuration conf = new Configuration(false);
            conf.addResource(confUrl_);
            conf_ = conf;
        }
    }
}

