/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord.autoscaling;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.AbstractWorkerProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.AutoScalingData;
import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig;
import org.apache.druid.indexing.overlord.autoscaling.Provisioner;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningUtil;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * Worker provisioning strategy driven by the tasks that are currently pending on a
 * {@link WorkerTaskRunner}.
 *
 * <p>Scale-up simulates assigning every pending task to the known valid workers using the
 * configured {@link WorkerSelectStrategy}; each task that cannot be placed adds a placeholder
 * worker to the simulation and counts as one additional worker needed. Scale-down asks the
 * runner to mark lazy workers and hands their IPs to the autoscaler for termination.
 *
 * <p>Only {@link DefaultWorkerBehaviorConfig} with a non-null autoscaler is supported; with any
 * other configuration both operations log an error and do nothing.
 */
public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerProvisioningStrategy {
    private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class);
    /** Scheme given to the placeholder workers created during the scale-up simulation. */
    private static final String SCHEME = "http";
    private final PendingTaskBasedWorkerProvisioningConfig config;
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;

    /**
     * Resolves the current worker behavior config and checks that it can be used for autoscaling.
     *
     * @param workerConfigRef supplier of the current worker behavior config
     * @param action verb used in the error messages (for example "provision" or "terminate")
     * @param log logger that receives the error describing why the config is unusable
     * @return the config as a {@link DefaultWorkerBehaviorConfig}, or {@code null} when the
     *         supplier yields {@code null}, yields a different implementation, or the config has
     *         no autoscaler
     */
    @Nullable
    static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(Supplier<WorkerBehaviorConfig> workerConfigRef, String action, EmittingLogger log) {
        WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
        if (workerBehaviorConfig == null) {
            log.error("No workerConfig available, cannot %s workers.", action);
            return null;
        }
        if (!(workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig)) {
            log.error("Only DefaultWorkerBehaviorConfig is supported as WorkerBehaviorConfig, [%s] given, cannot %s workers", workerBehaviorConfig, action);
            return null;
        }
        DefaultWorkerBehaviorConfig workerConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
        if (workerConfig.getAutoScaler() == null) {
            log.error("No autoScaler available, cannot %s workers", action);
            return null;
        }
        return workerConfig;
    }

    /**
     * Injection constructor; uses a single-threaded scheduled executor for the provisioning
     * schedule.
     *
     * @param config provisioning settings of this strategy
     * @param workerConfigRef supplier of the current worker behavior config
     * @param provisioningSchedulerConfig scheduler settings passed to the superclass
     */
    @Inject
    public PendingTaskBasedWorkerProvisioningStrategy(PendingTaskBasedWorkerProvisioningConfig config, Supplier<WorkerBehaviorConfig> workerConfigRef, ProvisioningSchedulerConfig provisioningSchedulerConfig) {
        this(config, workerConfigRef, provisioningSchedulerConfig, new Supplier<ScheduledExecutorService>() {
            @Override
            public ScheduledExecutorService get() {
                return ScheduledExecutors.fixed(1, "PendingTaskBasedWorkerProvisioning-manager--%d");
            }
        });
    }

    /**
     * Creates the strategy with an explicit executor factory.
     *
     * @param config provisioning settings of this strategy
     * @param workerConfigRef supplier of the current worker behavior config
     * @param provisioningSchedulerConfig scheduler settings passed to the superclass
     * @param execFactory factory of the executor passed to the superclass
     */
    public PendingTaskBasedWorkerProvisioningStrategy(PendingTaskBasedWorkerProvisioningConfig config, Supplier<WorkerBehaviorConfig> workerConfigRef, ProvisioningSchedulerConfig provisioningSchedulerConfig, Supplier<ScheduledExecutorService> execFactory) {
        super(provisioningSchedulerConfig, execFactory);
        this.config = config;
        this.workerConfigRef = workerConfigRef;
    }

    /**
     * Creates a new {@link Provisioner} bound to the given task runner.
     */
    @Override
    public Provisioner makeProvisioner(WorkerTaskRunner runner) {
        return new PendingProvisioner(runner);
    }

    /**
     * Upper bound on the number of workers that may be terminated in one pass: every invalid
     * worker, plus as many valid workers as exceed the autoscaler minimum, capped by the
     * configured maximum scaling step and never negative.
     */
    private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, DefaultWorkerBehaviorConfig workerConfig) {
        int currValidWorkers = getCurrValidWorkers(zkWorkers);
        int invalidWorkers = zkWorkers.size() - currValidWorkers;
        int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers();
        log.info("Min workers: %d", minWorkers);
        int removableValidWorkers = Math.min(config.getMaxScalingStep(), currValidWorkers - minWorkers);
        return invalidWorkers + Math.max(0, removableValidWorkers);
    }

    /**
     * Counts the workers accepted by the valid-worker predicate built from this strategy's config.
     */
    private int getCurrValidWorkers(Collection<ImmutableWorkerInfo> workers) {
        Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
        int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
        log.debug("Current valid workers: %d", currValidWorkers);
        return currValidWorkers;
    }

    /**
     * Capacity to assume for a worker that does not exist yet: 1 when there are no workers at
     * all, otherwise the capacity of the first worker returned by the collection's iterator.
     */
    private static int getExpectedWorkerCapacity(Collection<ImmutableWorkerInfo> workers) {
        if (workers.isEmpty()) {
            return 1;
        }
        return workers.iterator().next().getWorker().getCapacity();
    }

    /**
     * Returns a copy of {@code immutableWorker} as it would look with {@code task} assigned to
     * it: one more unit of used capacity, the task's availability group and id added, and the
     * current UTC time as the last constructor argument.
     */
    private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task) {
        Set<String> availabilityGroups = Sets.union(
            immutableWorker.getAvailabilityGroups(),
            Sets.newHashSet(task.getTaskResource().getAvailabilityGroup())
        );
        Set<String> runningTasks = Sets.union(
            immutableWorker.getRunningTasks(),
            Sets.newHashSet(task.getId())
        );
        return new ImmutableWorkerInfo(
            immutableWorker.getWorker(),
            immutableWorker.getCurrCapacityUsed() + 1,
            availabilityGroups,
            runningTasks,
            DateTimes.nowUtc()
        );
    }

    /**
     * Builds a placeholder worker with no used capacity, no availability groups and no running
     * tasks. It stands in for a worker that would have to be provisioned.
     */
    private static ImmutableWorkerInfo createDummyWorker(String scheme, String host, int capacity, String version) {
        return new ImmutableWorkerInfo(
            new Worker(scheme, host, "-2", capacity, version, "_default_worker_category"),
            0,
            new HashSet<String>(),
            new HashSet<String>(),
            DateTimes.nowUtc()
        );
    }

    /**
     * {@link Provisioner} that tracks the node ids it is currently provisioning and terminating.
     * {@link #doProvision()} and {@link #doTerminate()} are synchronized on this instance.
     */
    private class PendingProvisioner
    implements Provisioner {
        private final WorkerTaskRunner runner;
        private final ScalingStats scalingStats;
        /** Node ids requested from the autoscaler that are not yet reported among the runner's workers. */
        private final Set<String> currentlyProvisioning;
        /** Node ids handed to the autoscaler for termination that are still among the lazy workers. */
        private final Set<String> currentlyTerminating;
        private DateTime lastProvisionTime;
        private DateTime lastTerminateTime;

        private PendingProvisioner(WorkerTaskRunner runner) {
            this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
            this.currentlyProvisioning = new HashSet<String>();
            this.currentlyTerminating = new HashSet<String>();
            DateTime now = DateTimes.nowUtc();
            this.lastProvisionTime = now;
            this.lastTerminateTime = now;
            this.runner = runner;
        }

        /**
         * Provisions new workers when none are still being provisioned; otherwise checks whether
         * the outstanding provisioning has exceeded the maximum scaling duration, in which case
         * an alert is emitted and the outstanding nodes are terminated by id.
         *
         * @return {@code true} if at least one provision request returned new node ids
         */
        @Override
        public synchronized boolean doProvision() {
            Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
            log.debug("Pending tasks: %d %s", pendingTasks.size(), pendingTasks);
            Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
            log.debug("Workers: %d %s", workers.size(), workers);

            DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
            if (workerConfig == null) {
                return false;
            }

            Collection<Worker> knownWorkers = Collections2.transform(workers, new Function<ImmutableWorkerInfo, Worker>() {
                @Override
                public Worker apply(ImmutableWorkerInfo input) {
                    return input.getWorker();
                }
            });
            Collection<String> workerNodeIds = getWorkerNodeIDs(knownWorkers, workerConfig);

            log.info("Currently provisioning: %d %s", currentlyProvisioning.size(), currentlyProvisioning);
            // Nodes that now show up as workers are done provisioning.
            currentlyProvisioning.removeAll(workerNodeIds);
            log.debug("Currently provisioning without WorkerNodeIds: %d %s", currentlyProvisioning.size(), currentlyProvisioning);

            boolean didProvision = false;
            if (currentlyProvisioning.isEmpty()) {
                // Compute the count before logging it; the variable must be assigned first.
                int workersToProvision = getScaleUpNodeCount(runner.getConfig(), workerConfig, pendingTasks, workers);
                log.info("Workers to provision: %d", workersToProvision);
                while (workersToProvision > 0) {
                    AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
                    List<String> newNodes = provisioned == null ? null : provisioned.getNodeIds();
                    // Stop on a null result or an absent/empty node list so the loop cannot spin forever.
                    if (newNodes == null || newNodes.isEmpty()) {
                        log.warn("NewNodes is empty, returning from provision loop");
                        break;
                    }
                    log.info("Provisioned: %d [%s]", newNodes.size(), newNodes);
                    currentlyProvisioning.addAll(newNodes);
                    lastProvisionTime = DateTimes.nowUtc();
                    scalingStats.addProvisionEvent(provisioned);
                    didProvision = true;
                    workersToProvision -= newNodes.size();
                }
            } else {
                Duration durSinceLastProvision = new Duration(lastProvisionTime, DateTimes.nowUtc());
                log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
                if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node provisioning taking too long!")
                        .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
                        .addData("provisioningCount", currentlyProvisioning.size())
                        .emit();
                    // Give up on the stuck nodes so the next pass can provision again.
                    workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
                    currentlyProvisioning.clear();
                }
            }
            return didProvision;
        }

        /**
         * Maps the IPs of the given workers to node ids through the autoscaler's
         * {@code ipToIdLookup}.
         */
        private Collection<String> getWorkerNodeIDs(Collection<Worker> workers, DefaultWorkerBehaviorConfig workerConfig) {
            List<String> ips = new ArrayList<String>(workers.size());
            for (Worker worker : workers) {
                ips.add(worker.getIp());
            }
            List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(ips);
            log.info("WorkerNodeIds: %d %s", workerNodeIds.size(), workerNodeIds);
            return workerNodeIds;
        }

        /**
         * Number of workers to request from the autoscaler. The wanted count is the larger of
         * "workers missing to reach the minimum" and "workers needed for pending tasks capped by
         * the maximum scaling step"; it is then limited so that the valid worker count does not
         * exceed the autoscaler maximum.
         *
         * @return the number of workers to provision; 0 when more are wanted but the maximum has
         *         already been reached
         */
        private int getScaleUpNodeCount(WorkerTaskRunnerConfig remoteTaskRunnerConfig, DefaultWorkerBehaviorConfig workerConfig, Collection<Task> pendingTasks, Collection<ImmutableWorkerInfo> workers) {
            int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
            int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
            log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
            int currValidWorkers = getCurrValidWorkers(workers);

            // With no valid workers there is nothing to simulate against; start from the minimum.
            int moreWorkersNeeded;
            if (currValidWorkers == 0) {
                moreWorkersNeeded = minWorkerCount;
            } else {
                moreWorkersNeeded = getWorkersNeededToAssignTasks(remoteTaskRunnerConfig, workerConfig, pendingTasks, workers);
            }
            log.debug("More workers needed: %d", moreWorkersNeeded);

            int want = Math.max(minWorkerCount - currValidWorkers, Math.min(config.getMaxScalingStep(), moreWorkersNeeded));
            log.info("Want workers: %d", want);
            if (want > 0 && currValidWorkers >= maxWorkerCount) {
                log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount);
                return 0;
            }
            return Math.min(want, maxWorkerCount - currValidWorkers);
        }

        /**
         * Simulates assigning every pending task with the configured select strategy and counts
         * how many placeholder workers had to be added because no existing (or previously added
         * placeholder) worker was selected for a task.
         */
        private int getWorkersNeededToAssignTasks(WorkerTaskRunnerConfig workerTaskRunnerConfig, DefaultWorkerBehaviorConfig workerConfig, Collection<Task> pendingTasks, Collection<ImmutableWorkerInfo> workers) {
            Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(workers, ProvisioningUtil.createValidWorkerPredicate(config));
            log.debug("Valid workers: %d %s", validWorkers.size(), validWorkers);

            // Simulated cluster state keyed by worker host; updated as tasks are "assigned".
            HashMap<String, ImmutableWorkerInfo> workersMap = new HashMap<String, ImmutableWorkerInfo>();
            for (ImmutableWorkerInfo worker : validWorkers) {
                workersMap.put(worker.getWorker().getHost(), worker);
            }

            WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
            int need = 0;
            int capacity = getExpectedWorkerCapacity(workers);
            log.info("Expected worker capacity: %d", capacity);

            for (Task task : pendingTasks) {
                ImmutableWorkerInfo selectedWorker = workerSelectStrategy.findWorkerForTask(
                    workerTaskRunnerConfig,
                    ImmutableMap.copyOf(workersMap),
                    task
                );
                ImmutableWorkerInfo workerRunningTask;
                if (selectedWorker != null) {
                    workerRunningTask = selectedWorker;
                    // Argument order matches the placeholders: worker first, then task.
                    log.debug("Worker[%s] able to take the task[%s]", workerRunningTask, task);
                } else {
                    workerRunningTask = createDummyWorker(SCHEME, "dummy" + need, capacity, workerTaskRunnerConfig.getMinWorkerVersion());
                    log.debug("Need more workers, creating a dummy worker[%s]", workerRunningTask);
                    ++need;
                }
                // Record the assignment so later tasks see the reduced free capacity.
                workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task));
            }
            return need;
        }

        /**
         * Terminates lazy workers when nothing is being provisioned and no earlier termination is
         * still outstanding; otherwise checks whether the outstanding termination has exceeded
         * the maximum scaling duration, in which case an alert is emitted and the tracking set is
         * cleared.
         *
         * @return {@code true} if the autoscaler returned a non-null termination result
         */
        @Override
        public synchronized boolean doTerminate() {
            Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
            log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers);

            DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
            if (workerConfig == null) {
                return false;
            }

            log.info("Currently provisioning: %d %s", currentlyProvisioning.size(), currentlyProvisioning);
            if (!currentlyProvisioning.isEmpty()) {
                log.debug("Already provisioning nodes, Not Terminating any nodes.");
                return false;
            }

            boolean didTerminate = false;
            Collection<String> workerNodeIds = getWorkerNodeIDs(runner.getLazyWorkers(), workerConfig);
            log.debug("Currently terminating: %d %s", currentlyTerminating.size(), currentlyTerminating);
            // Keep only the nodes that are still reported as lazy workers.
            currentlyTerminating.retainAll(workerNodeIds);
            log.debug("Currently terminating among WorkerNodeIds: %d %s", currentlyTerminating.size(), currentlyTerminating);

            if (currentlyTerminating.isEmpty()) {
                int terminationLimit = maxWorkersToTerminate(zkWorkers, workerConfig);
                log.info("Max workers to terminate: %d", terminationLimit);
                Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
                Collection<String> laziestWorkerIps = Collections2.transform(
                    runner.markWorkersLazy(isLazyWorker, terminationLimit),
                    new Function<Worker, String>() {
                        @Override
                        public String apply(Worker zkWorker) {
                            return zkWorker.getIp();
                        }
                    }
                );
                log.info("Laziest worker ips: %d %s", laziestWorkerIps.size(), laziestWorkerIps);
                if (laziestWorkerIps.isEmpty()) {
                    log.debug("Found no lazy workers");
                } else {
                    log.info("Terminating %,d lazy workers: %s", laziestWorkerIps.size(), Joiner.on(", ").join(laziestWorkerIps));
                    AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps));
                    if (terminated != null) {
                        log.info("Terminated: %d %s", terminated.getNodeIds().size(), terminated.getNodeIds());
                        currentlyTerminating.addAll(terminated.getNodeIds());
                        lastTerminateTime = DateTimes.nowUtc();
                        scalingStats.addTerminateEvent(terminated);
                        didTerminate = true;
                    }
                }
            } else {
                Duration durSinceLastTerminate = new Duration(lastTerminateTime, DateTimes.nowUtc());
                log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);
                if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
                    log.makeAlert("Worker node termination taking too long!")
                        .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
                        .addData("terminatingCount", currentlyTerminating.size())
                        .emit();
                    currentlyTerminating.clear();
                }
            }
            return didTerminate;
        }

        /**
         * Returns the scaling statistics recorded by this provisioner.
         */
        @Override
        public ScalingStats getStats() {
            return scalingStats;
        }
    }
}

