/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkQueue;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.ZkWorker;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.joda.time.ReadablePeriod;

public class RemoteTaskRunner
implements WorkerTaskRunner,
TaskLogStreamer {
    private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
    // Joins ZooKeeper path segments with "/".
    private static final Joiner JOINER = Joiner.on((String)"/");
    private final ObjectMapper jsonMapper;
    private final RemoteTaskRunnerConfig config;
    // Timeout for the HTTP shutdown request sent to a worker; derived from config.getTaskShutdownLinkTimeout().
    private final Duration shutdownTimeout;
    private final IndexerZkConfig indexerZkConfig;
    private final CuratorFramework cf;
    // Built with workerStatusPathChildrenCacheExecutor and shutdown-executor-on-close disabled, which is why
    // stop() shuts that executor down explicitly.
    private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory;
    private final ExecutorService workerStatusPathChildrenCacheExecutor;
    // Cache over indexerZkConfig.getAnnouncementsPath(); its events drive worker add/update/remove in start().
    private final PathChildrenCache workerPathCache;
    private final HttpClient httpClient;
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    // Known workers, keyed by worker host.
    private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
    // Task payloads for tasks that are still pending, keyed by task id; removed once the task is assigned.
    private final ConcurrentMap<String, Task> pendingTaskPayloads = new ConcurrentHashMap<String, Task>();
    // Work items keyed by task id, one queue per lifecycle stage (pending -> running -> complete).
    private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
    private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
    private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
    // Executes the assignment passes submitted by runPendingTasks().
    private final ExecutorService runPendingTasksExec;
    // Workers in this map are skipped when selecting a worker for a task and when announcing a task.
    // NOTE(review): presumably workers marked for termination by autoscaling -- confirm against the code that populates it.
    private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<String, ZkWorker>();
    // Workers in this set are skipped when selecting a worker for a task.
    private final Set<ZkWorker> blackListedWorkers = Collections.synchronizedSet(new HashSet());
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList();
    // Worker host -> id of the task being announced to it. Also used as the monitor around worker selection
    // in tryAssignTask().
    private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<String, String>();
    // Ids of tasks that an assignment pass is currently working on (key and value are both the task id);
    // stops two passes from assigning the same task at once.
    private final ConcurrentMap<String, String> tryAssignTasks = new ConcurrentHashMap<String, String>();
    // Monitor used by registerListener() and announceTask(); announceTask() waits on it for the worker to pick up a task.
    private final Object statusLock = new Object();
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    // Single-threaded scheduled executor; start() schedules checkBlackListedNodes on it at a fixed rate.
    private final ListeningScheduledExecutorService cleanupExec;
    // NOTE(review): presumably worker id -> scheduled cleanup of that worker's tasks; usage is not visible here -- confirm.
    private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<String, ScheduledFuture>();
    private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
    // Created in start() and closed in stop(); null until start() has created it.
    private ProvisioningService provisioningService;

    public RemoteTaskRunner(ObjectMapper jsonMapper, RemoteTaskRunnerConfig config, IndexerZkConfig indexerZkConfig, CuratorFramework cf, PathChildrenCacheFactory.Builder pathChildrenCacheFactory, HttpClient httpClient, Supplier<WorkerBehaviorConfig> workerConfigRef, ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy) {
        this.jsonMapper = jsonMapper;
        this.config = config;
        this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration();
        this.indexerZkConfig = indexerZkConfig;
        this.cf = cf;
        this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZkConfig.getAnnouncementsPath());
        this.workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor();
        this.workerStatusPathChildrenCacheFactory = pathChildrenCacheFactory.withExecutorService(this.workerStatusPathChildrenCacheExecutor).withShutdownExecutorOnClose(false).build();
        this.httpClient = httpClient;
        this.workerConfigRef = workerConfigRef;
        this.cleanupExec = MoreExecutors.listeningDecorator((ScheduledExecutorService)ScheduledExecutors.fixed((int)1, (String)"RemoteTaskRunner-Scheduled-Cleanup--%d"));
        this.provisioningStrategy = provisioningStrategy;
        this.runPendingTasksExec = Execs.multiThreaded((int)config.getPendingTasksRunnerNumThreads(), (String)"rtr-pending-tasks-runner-%d");
    }

    /**
     * Starts the runner: begins watching worker announcements, waits for the initial set of workers to be
     * added, schedules periodic blacklist checks and creates the provisioning service.
     *
     * Returns immediately if {@code lifecycleLock.canStart()} is false. Blocks until the announcement cache
     * has delivered INITIALIZED and every {@code addWorker} future started by a CHILD_ADDED event has
     * completed (successfully or not).
     *
     * Any exception is rethrown wrapped in a {@link RuntimeException}; {@code lifecycleLock.exitStart()} is
     * always called.
     *
     * NOTE(review): an InterruptedException from {@code wait()} is wrapped like any other exception and the
     * thread's interrupt flag is not restored.
     */
    @Override
    @LifecycleStart
    public void start() {
        if (!this.lifecycleLock.canStart()) {
            return;
        }
        try {
            // Counts outstanding work start() must wait for. Starts at 1 for the INITIALIZED event and gains
            // one per CHILD_ADDED until that worker's addWorker future completes.
            final MutableInt waitingFor = new MutableInt(1);
            final Object waitingForMonitor = new Object();
            this.workerPathCache.getListenable().addListener((client, event) -> {
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        Worker worker = (Worker)this.jsonMapper.readValue(event.getData().getData(), Worker.class);
                        Object object = waitingForMonitor;
                        synchronized (object) {
                            waitingFor.increment();
                        }
                        Futures.addCallback(this.addWorker(worker), (FutureCallback)new FutureCallback<ZkWorker>(){

                            // Success and failure both release the waiter; start() only waits for completion.
                            public void onSuccess(ZkWorker zkWorker) {
                                Object object = waitingForMonitor;
                                synchronized (object) {
                                    waitingFor.decrement();
                                    waitingForMonitor.notifyAll();
                                }
                            }

                            // Same bookkeeping as onSuccess; the throwable itself is not inspected here.
                            public void onFailure(Throwable throwable) {
                                Object object = waitingForMonitor;
                                synchronized (object) {
                                    waitingFor.decrement();
                                    waitingForMonitor.notifyAll();
                                }
                            }
                        });
                        break;
                    }
                    case CHILD_UPDATED: {
                        Worker worker = (Worker)this.jsonMapper.readValue(event.getData().getData(), Worker.class);
                        this.updateWorker(worker);
                        break;
                    }
                    case CHILD_REMOVED: {
                        Worker worker = (Worker)this.jsonMapper.readValue(event.getData().getData(), Worker.class);
                        this.removeWorker(worker);
                        break;
                    }
                    case INITIALIZED: {
                        // Find worker ids that still have a status znode but are neither known to this runner
                        // nor announced, and schedule cleanup of whatever tasks are listed under them.
                        Object workers;
                        try {
                            workers = (List)this.cf.getChildren().forPath(this.indexerZkConfig.getStatusPath());
                        }
                        catch (KeeperException.NoNodeException e) {
                            // No status path at all: nothing to clean up.
                            workers = ImmutableList.of();
                        }
                        for (String workerId : workers) {
                            String workerAnnouncePath = JOINER.join((Object)this.indexerZkConfig.getAnnouncementsPath(), (Object)workerId, new Object[0]);
                            String workerStatusPath = JOINER.join((Object)this.indexerZkConfig.getStatusPath(), (Object)workerId, new Object[0]);
                            if (this.zkWorkers.containsKey(workerId) || this.cf.checkExists().forPath(workerAnnouncePath) != null) continue;
                            try {
                                this.scheduleTasksCleanupForWorker(workerId, (List)this.cf.getChildren().forPath(workerStatusPath));
                            }
                            catch (Exception e) {
                                log.warn((Throwable)e, "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.", new Object[]{workerId, workerStatusPath});
                            }
                        }
                        // Releases the initial count of 1 taken when waitingFor was created.
                        Object object = waitingForMonitor;
                        synchronized (object) {
                            waitingFor.decrement();
                            waitingForMonitor.notifyAll();
                            break;
                        }
                    }
                }
            });
            this.workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            // Block until INITIALIZED has been handled and all pending addWorker futures have completed.
            Object object = waitingForMonitor;
            synchronized (object) {
                while (waitingFor.intValue() > 0) {
                    waitingForMonitor.wait();
                }
            }
            // First run is scheduled with zero initial delay, then repeats every getWorkerBlackListCleanupPeriod().
            ScheduledExecutors.scheduleAtFixedRate((ScheduledExecutorService)this.cleanupExec, (Duration)Period.ZERO.toStandardDuration(), (Duration)this.config.getWorkerBlackListCleanupPeriod().toStandardDuration(), this::checkBlackListedNodes);
            this.provisioningService = this.provisioningStrategy.makeProvisioningService(this);
            this.lifecycleLock.started();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            this.lifecycleLock.exitStart();
        }
    }

    /**
     * Stops the runner and releases the resources it owns.
     *
     * Returns immediately if {@code lifecycleLock.canStop()} is false. Otherwise closes the provisioning
     * service (if {@link #start()} got far enough to create one), every known {@link ZkWorker} and the worker
     * announcement cache, then shuts down the executors owned by this runner. The executors are shut down
     * even when closing one of the other resources throws, so a failed close does not leak threads.
     *
     * @throws RuntimeException wrapping any exception raised while closing resources
     */
    @Override
    @LifecycleStop
    public void stop() {
        if (!this.lifecycleLock.canStop()) {
            return;
        }
        try {
            try {
                // provisioningService is only assigned at the end of start(); it is still null if start()
                // failed early or was never called.
                ProvisioningService service = this.provisioningService;
                if (service != null) {
                    service.close();
                }
                Closer closer = Closer.create();
                for (ZkWorker zkWorker : this.zkWorkers.values()) {
                    closer.register((Closeable)zkWorker);
                }
                closer.register((Closeable)this.workerPathCache);
                closer.close();
            }
            finally {
                // The status-cache factory was built with shutdown-on-close disabled, so this executor is ours
                // to stop; the other two are created in the constructor and are never null.
                this.workerStatusPathChildrenCacheExecutor.shutdown();
                this.runPendingTasksExec.shutdown();
                this.cleanupExec.shutdown();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            this.lifecycleLock.exitStop();
        }
    }

    /**
     * This runner restores nothing: always returns an empty list.
     */
    @Override
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        return ImmutableList.of();
    }

    /**
     * Registers a listener together with the executor passed to the notification utilities for it.
     *
     * Before the listener is added it is notified of the current location of every running task. The
     * duplicate-id check, the initial notifications and the registration all happen under {@code statusLock},
     * so two concurrent registrations with the same id cannot both succeed.
     *
     * @param listener listener to register; its id must not already be registered
     * @param executor executor associated with the listener
     * @throws ISE if a listener with the same id is already registered
     */
    @Override
    public void registerListener(TaskRunnerListener listener, Executor executor) {
        Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
        synchronized (this.statusLock) {
            for (Pair<TaskRunnerListener, Executor> registered : this.listeners) {
                if (registered.lhs.getListenerId().equals(listener.getListenerId())) {
                    throw new ISE("Listener [%s] already registered", listener.getListenerId());
                }
            }
            // Notify only the new listener of where the running tasks currently are.
            for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : this.runningTasks.entrySet()) {
                TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), entry.getKey(), entry.getValue().getLocation());
            }
            log.info("Registered listener [%s]", listener.getListenerId());
            this.listeners.add(listenerPair);
        }
    }

    /**
     * Removes the first registered listener whose id equals {@code listenerId}; does nothing if none matches.
     */
    @Override
    public void unregisterListener(String listenerId) {
        Pair<TaskRunnerListener, Executor> match = null;
        for (Pair<TaskRunnerListener, Executor> candidate : this.listeners) {
            if (candidate.lhs.getListenerId().equals(listenerId)) {
                match = candidate;
                break;
            }
        }
        if (match != null) {
            this.listeners.remove(match);
            log.info("Unregistered listener [%s]", listenerId);
        }
    }

    /**
     * Returns the result of {@code getImmutableWorkerFromZK} applied to the currently known workers.
     */
    @Override
    public Collection<ImmutableWorkerInfo> getWorkers() {
        return RemoteTaskRunner.getImmutableWorkerFromZK(this.zkWorkers.values());
    }

    /**
     * Returns an immutable copy of the work items currently in the running queue.
     */
    public Collection<RemoteTaskRunnerWorkItem> getRunningTasks() {
        return ImmutableList.copyOf(this.runningTasks.values());
    }

    /**
     * Returns an immutable copy of the work items currently in the pending queue.
     */
    public Collection<RemoteTaskRunnerWorkItem> getPendingTasks() {
        return ImmutableList.copyOf(this.pendingTasks.values());
    }

    /**
     * Returns an immutable copy of the payloads of tasks that are still waiting to be assigned.
     */
    @Override
    public Collection<Task> getPendingTaskPayloads() {
        return ImmutableList.copyOf(this.pendingTaskPayloads.values());
    }

    /**
     * Returns the configuration this runner was constructed with.
     */
    @Override
    public RemoteTaskRunnerConfig getConfig() {
        return this.config;
    }

    /**
     * Returns an immutable copy of every work item this runner knows about: pending first, then running,
     * then complete.
     */
    public Collection<RemoteTaskRunnerWorkItem> getKnownTasks() {
        Iterable<RemoteTaskRunnerWorkItem> everyStage = Iterables.concat(
            this.pendingTasks.values(),
            this.runningTasks.values(),
            this.completeTasks.values()
        );
        return ImmutableList.copyOf(everyStage);
    }

    /**
     * Maps a task id to its runner-side state.
     *
     * @return PENDING if the task is in the pending queue, RUNNING if in the running queue, NONE if in the
     *         complete queue (checked in that order), or {@code null} if the task is unknown to this runner
     */
    @Override
    @Nullable
    public RunnerTaskState getRunnerTaskState(String taskId) {
        RunnerTaskState state = null;
        if (this.pendingTasks.containsKey(taskId)) {
            state = RunnerTaskState.PENDING;
        } else if (this.runningTasks.containsKey(taskId)) {
            state = RunnerTaskState.RUNNING;
        } else if (this.completeTasks.containsKey(taskId)) {
            state = RunnerTaskState.NONE;
        }
        return state;
    }

    /**
     * Returns the location recorded on the task's work item, looking in the pending, running and complete
     * queues in that order.
     *
     * Each queue is read with a single {@code get} rather than {@code containsKey} followed by {@code get},
     * so a task that leaves a queue concurrently cannot cause a NullPointerException.
     *
     * @return the work item's location, or {@link TaskLocation#unknown()} if the task is in none of the queues
     */
    @Override
    public TaskLocation getTaskLocation(String taskId) {
        RemoteTaskRunnerWorkItem workItem = (RemoteTaskRunnerWorkItem)this.pendingTasks.get(taskId);
        if (workItem == null) {
            workItem = (RemoteTaskRunnerWorkItem)this.runningTasks.get(taskId);
        }
        if (workItem == null) {
            workItem = (RemoteTaskRunnerWorkItem)this.completeTasks.get(taskId);
        }
        if (workItem == null) {
            return TaskLocation.unknown();
        }
        return workItem.getLocation();
    }

    /**
     * Returns the provisioning service's scaling stats.
     *
     * @return the stats wrapped in an Optional; absent if the provisioning service reports none or has not
     *         been created yet (it is only assigned in {@link #start()})
     */
    @Override
    public Optional<ScalingStats> getScalingStats() {
        // Read the field once: it is null until start() has created the service.
        ProvisioningService service = this.provisioningService;
        if (service == null) {
            return Optional.absent();
        }
        return Optional.fromNullable(service.getStats());
    }

    /**
     * Finds the first known worker that reports it is running {@code taskId}.
     *
     * @return the worker, or {@code null} if no known worker is running the task
     */
    public ZkWorker findWorkerRunningTask(String taskId) {
        return this.zkWorkers.values()
            .stream()
            .filter(candidate -> candidate.isRunningTask(taskId))
            .findFirst()
            .orElse(null);
    }

    /**
     * Tells whether {@code worker} reports it is running {@code taskId}.
     *
     * @throws NullPointerException (message "worker") if {@code worker} is null
     */
    public boolean isWorkerRunningTask(ZkWorker worker, String taskId) {
        ZkWorker checked = Preconditions.checkNotNull(worker, "worker");
        return checked.isRunningTask(taskId);
    }

    /**
     * Submits a task to this runner and returns the future for its final status.
     *
     * If the task is already known, the existing work item's future is returned instead of queueing it again:
     * a pending task triggers another assignment pass; a running task whose worker already announces a
     * complete status is moved to complete; a complete task just returns its result. Otherwise the task is
     * added to the pending queue.
     *
     * @param task the task to run
     * @return future resolving to the task's final status
     */
    @Override
    public ListenableFuture<TaskStatus> run(Task task) {
        RemoteTaskRunnerWorkItem pendingTask = (RemoteTaskRunnerWorkItem)this.pendingTasks.get(task.getId());
        if (pendingTask != null) {
            log.info("Assigned a task[%s] that is already pending!", task.getId());
            this.runPendingTasks();
            return pendingTask.getResult();
        }
        RemoteTaskRunnerWorkItem runningTask = (RemoteTaskRunnerWorkItem)this.runningTasks.get(task.getId());
        if (runningTask != null) {
            ZkWorker zkWorker = this.findWorkerRunningTask(task.getId());
            if (zkWorker == null) {
                log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
            } else {
                log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
                // The worker's announcements can change between findWorkerRunningTask() and this lookup,
                // so the announcement may be gone by now; treat that as "nothing to complete".
                TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
                if (announcement != null && announcement.getTaskStatus().isComplete()) {
                    this.taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
                }
            }
            return runningTask.getResult();
        }
        RemoteTaskRunnerWorkItem completeTask = (RemoteTaskRunnerWorkItem)this.completeTasks.get(task.getId());
        if (completeTask != null) {
            return completeTask.getResult();
        }
        return this.addPendingTask(task).getResult();
    }

    /**
     * Stops a task, wherever it currently is.
     *
     * Ignored (with a log line) if the runner has not reached the started state within one second. A pending
     * task is simply dropped from the pending queue; a complete task is cleaned up; otherwise a shutdown
     * request is POSTed to the worker running the task, if one can be found.
     *
     * @param taskId id of the task to shut down
     * @param reason reason for the shutdown, only logged
     * @throws RE if the request to the worker fails or the calling thread is interrupted while waiting
     */
    @Override
    public void shutdown(String taskId, String reason) {
        log.info("Shutdown [%s] because: [%s]", taskId, reason);
        if (!this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS)) {
            log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
            return;
        }
        if (this.pendingTasks.remove(taskId) != null) {
            this.pendingTaskPayloads.remove(taskId);
            log.info("Removed task from pending queue: %s", taskId);
            return;
        }
        if (this.completeTasks.containsKey(taskId)) {
            this.cleanup(taskId);
            return;
        }

        ZkWorker zkWorker = this.findWorkerRunningTask(taskId);
        if (zkWorker == null) {
            log.info("Can't shutdown! No worker running task %s", taskId);
            return;
        }

        // Declared outside the try so the interrupt handler can report it (null if URL creation failed).
        URL url = null;
        try {
            url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/shutdown", taskId);
            Request shutdownRequest = new Request(HttpMethod.POST, url);
            StatusResponseHolder response = (StatusResponseHolder)this.httpClient.go(shutdownRequest, (HttpResponseHandler)StatusResponseHandler.getInstance(), this.shutdownTimeout).get();
            log.info("Sent shutdown message to worker: %s, status %s, response: %s", zkWorker.getWorker().getHost(), response.getStatus(), response.getContent());
            boolean accepted = HttpResponseStatus.OK.equals((Object)response.getStatus());
            if (!accepted) {
                log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId);
        }
        catch (Exception e) {
            throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId);
        }
    }

    /**
     * Provides a lazily-opened stream of a running task's log, fetched over HTTP from the worker running it.
     *
     * @param taskId id of the task
     * @param offset passed to the worker as the {@code offset} query parameter
     * @return a byte source that issues the HTTP GET each time it is opened, or absent if no known worker is
     *         running the task
     */
    public Optional<ByteSource> streamTaskLog(String taskId, long offset) {
        ZkWorker zkWorker = this.findWorkerRunningTask(taskId);
        if (zkWorker == null) {
            return Optional.absent();
        }
        final URL url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/log?offset=%s", taskId, Long.toString(offset));
        return Optional.of(new ByteSource(){

            public InputStream openStream() throws IOException {
                try {
                    return (InputStream)RemoteTaskRunner.this.httpClient.go(new Request(HttpMethod.GET, url), (HttpResponseHandler)new InputStreamResponseHandler()).get();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    // Surface I/O failures (and unchecked exceptions) as themselves; wrap anything else.
                    Throwables.propagateIfPossible(e.getCause(), IOException.class);
                    throw new RuntimeException(e);
                }
            }
        });
    }

    /**
     * Provides a lazily-opened stream of a running task's live reports, fetched over HTTP from the task's
     * own location.
     *
     * @param taskId id of the task
     * @return a byte source that issues the HTTP GET each time it is opened, or absent if no known worker is
     *         running the task or the task is no longer in the running queue
     */
    public Optional<ByteSource> streamTaskReports(String taskId) {
        ZkWorker zkWorker = this.findWorkerRunningTask(taskId);
        if (zkWorker == null) {
            return Optional.absent();
        }
        // The task can leave the running queue (e.g. complete) after the worker lookup above; without this
        // check the location lookup would throw a NullPointerException.
        RemoteTaskRunnerWorkItem runningItem = (RemoteTaskRunnerWorkItem)this.runningTasks.get(taskId);
        if (runningItem == null) {
            return Optional.absent();
        }
        TaskLocation taskLocation = runningItem.getLocation();
        final URL url = TaskRunnerUtils.makeTaskLocationURL(taskLocation, "/druid/worker/v1/chat/%s/liveReports", taskId);
        return Optional.of(new ByteSource(){

            public InputStream openStream() throws IOException {
                try {
                    return (InputStream)RemoteTaskRunner.this.httpClient.go(new Request(HttpMethod.GET, url), (HttpResponseHandler)new InputStreamResponseHandler()).get();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    // Surface I/O failures (and unchecked exceptions) as themselves; wrap anything else.
                    Throwables.propagateIfPossible(e.getCause(), IOException.class);
                    throw new RuntimeException(e);
                }
            }
        });
    }

    /**
     * Queues a task as pending and kicks off an assignment pass.
     *
     * The payload is stored before the work item so that an assignment pass that sees the work item can
     * also find its payload.
     *
     * @return the newly created work item for the task
     */
    @VisibleForTesting
    RemoteTaskRunnerWorkItem addPendingTask(Task task) {
        final String taskId = task.getId();
        log.info("Added pending task %s", taskId);
        RemoteTaskRunnerWorkItem queued = new RemoteTaskRunnerWorkItem(task.getId(), task.getType(), null, null, task.getDataSource());
        this.pendingTaskPayloads.put(task.getId(), task);
        this.pendingTasks.put(task.getId(), queued);
        this.runPendingTasks();
        return queued;
    }

    /**
     * Submits one assignment pass to {@code runPendingTasksExec}.
     *
     * The pass takes a snapshot of the pending queue, orders it by queue insertion time and tries to assign
     * each task in turn. A task already being handled by another pass (present in {@code tryAssignTasks}) is
     * skipped. If assigning a task throws, an alert is emitted and the task, if still pending, is completed
     * as failed.
     */
    private void runPendingTasks() {
        this.runPendingTasksExec.submit(() -> {
            try {
                List<RemoteTaskRunnerWorkItem> snapshot = Lists.newArrayList(this.pendingTasks.values());
                RemoteTaskRunner.sortByInsertionTime(snapshot);
                for (RemoteTaskRunnerWorkItem candidate : snapshot) {
                    String taskId = candidate.getTaskId();
                    // Claim the task for this pass; a non-null result means another pass already holds it.
                    boolean claimed = this.tryAssignTasks.putIfAbsent(taskId, taskId) == null;
                    if (claimed) {
                        try {
                            Task payload = this.pendingTaskPayloads.get(taskId);
                            if (payload != null && this.tryAssignTask(payload, candidate)) {
                                this.pendingTaskPayloads.remove(taskId);
                            }
                        }
                        catch (Exception e) {
                            log.makeAlert(e, "Exception while trying to assign task").addData("taskId", candidate.getTaskId()).emit();
                            RemoteTaskRunnerWorkItem stillPending = (RemoteTaskRunnerWorkItem)this.pendingTasks.remove(taskId);
                            if (stillPending != null) {
                                this.taskComplete(stillPending, null, TaskStatus.failure(taskId));
                            }
                        }
                        finally {
                            this.tryAssignTasks.remove(taskId);
                        }
                    }
                }
            }
            catch (Exception e) {
                log.makeAlert(e, "Exception in running pending tasks").emit();
            }
            // Returning a value makes this lambda a Callable, which allows checked exceptions in its body.
            return null;
        });
    }

    /**
     * Sorts {@code tasks} in place by ascending queue insertion time.
     */
    @VisibleForTesting
    static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks) {
        tasks.sort(Comparator.comparing(TaskRunnerWorkItem::getQueueInsertionTime));
    }

    /**
     * Forgets a complete task and deletes its status znode.
     *
     * Does nothing if the runner has not reached the started state within one second. Emits an alert if the
     * task is not in the complete queue or its work item has no worker. A status znode that is already gone
     * is only logged.
     *
     * @throws RuntimeException wrapping any other failure while deleting the znode
     */
    private void cleanup(String taskId) {
        if (!this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS)) {
            return;
        }
        RemoteTaskRunnerWorkItem removed = (RemoteTaskRunnerWorkItem)this.completeTasks.remove(taskId);
        Worker worker = removed == null ? null : removed.getWorker();
        if (worker == null) {
            log.makeAlert("WTF?! Asked to cleanup nonexistent task").addData("taskId", taskId).emit();
            return;
        }

        String workerId = worker.getHost();
        log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
        String statusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), workerId, taskId);
        try {
            ((ChildrenDeletable)this.cf.delete().guaranteed()).forPath(statusPath);
        }
        catch (KeeperException.NoNodeException e) {
            log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tries to pick a worker for {@code task} and announce the task to it.
     *
     * Worker selection runs while holding the {@code workersWithUnacknowledgedTask} monitor and only
     * considers workers that are not lazy, not blacklisted and not already awaiting acknowledgement of
     * another task. The chosen worker's host is reserved in {@code workersWithUnacknowledgedTask} for the
     * duration of the announcement and released afterwards.
     *
     * The reservation is released whenever it was taken, including when the chosen worker has meanwhile
     * disappeared from {@code zkWorkers}; otherwise that host would stay reserved indefinitely and be
     * skipped by every later selection.
     *
     * @param task               task to assign
     * @param taskRunnerWorkItem pending work item for the same task id
     * @return {@code true} if the task is already running or was announced successfully; {@code false} if no
     *         worker could be assigned or the announcement did not succeed
     * @throws Exception anything thrown by the selection strategy or by announcing the task
     */
    private boolean tryAssignTask(Task task, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception {
        Preconditions.checkNotNull(task, "task");
        Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
        if (this.runningTasks.containsKey(task.getId()) || this.findWorkerRunningTask(task.getId()) != null) {
            log.info("Task[%s] already running.", task.getId());
            return true;
        }
        WorkerSelectStrategy strategy;
        WorkerBehaviorConfig workerConfig = this.workerConfigRef.get();
        if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
            strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
            log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
        } else {
            strategy = workerConfig.getSelectStrategy();
        }
        ZkWorker assignedWorker = null;
        // Host key this call put into workersWithUnacknowledgedTask, if any; released in the finally block.
        String reservedHost = null;
        try {
            synchronized (this.workersWithUnacknowledgedTask) {
                ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = ImmutableMap.copyOf(
                    Maps.transformEntries(
                        Maps.filterEntries(
                            this.zkWorkers,
                            (Map.Entry<String, ZkWorker> input) -> !this.lazyWorkers.containsKey(input.getKey())
                                && !this.workersWithUnacknowledgedTask.containsKey(input.getKey())
                                && !this.blackListedWorkers.contains(input.getValue())
                        ),
                        (String key, ZkWorker value) -> value.toImmutable()
                    )
                );
                ImmutableWorkerInfo immutableZkWorker = strategy.findWorkerForTask(this.config, eligibleWorkers, task);
                if (immutableZkWorker != null) {
                    String host = immutableZkWorker.getWorker().getHost();
                    if (this.workersWithUnacknowledgedTask.putIfAbsent(host, task.getId()) == null) {
                        reservedHost = host;
                        // May be null if the worker was removed after the eligible snapshot was taken.
                        assignedWorker = this.zkWorkers.get(host);
                    }
                }
            }
            if (assignedWorker != null) {
                return this.announceTask(task, assignedWorker, taskRunnerWorkItem);
            }
            log.debug("Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].", task.getId(), this.zkWorkers.values(), this.workersWithUnacknowledgedTask);
            return false;
        }
        finally {
            if (reservedHost != null) {
                this.workersWithUnacknowledgedTask.remove(reservedHost);
            }
            if (assignedWorker != null) {
                // The worker is free again, so another pending task may be able to use it now.
                this.runPendingTasks();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Assigns {@code task} to {@code theZkWorker} and waits for the worker to pick it up.
     *
     * <p>While holding {@code statusLock} this method creates an ephemeral znode for the task under the worker's
     * task path, moves the work item from {@code pendingTasks} to {@code runningTasks}, notifies listeners that the
     * task is running, and then waits on {@code statusLock} until {@code isWorkerRunningTask} reports the task or
     * the configured task-assignment timeout has elapsed. On timeout the task is completed with a failure status.
     *
     * @param task               the task to announce; its id must equal the work item's task id
     * @param theZkWorker        the worker chosen to run the task
     * @param taskRunnerWorkItem the pending work item tracking the task
     * @return {@code false} if the worker is no longer registered (or is marked lazy) or the task was not found in
     *         {@code pendingTasks}; {@code true} otherwise, including when the assignment timed out
     * @throws Exception if creating the znode, serializing the task, or waiting on the lock fails
     */
    private boolean announceTask(Task task, ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception {
        Preconditions.checkArgument((boolean)task.getId().equals(taskRunnerWorkItem.getTaskId()), (Object)"task id != workItem id");
        String worker = theZkWorker.getWorker().getHost();
        Object object = this.statusLock;
        synchronized (object) {
            if (!this.zkWorkers.containsKey(worker) || this.lazyWorkers.containsKey(worker)) {
                log.info("Not assigning task to already removed worker[%s]", new Object[]{worker});
                return false;
            }
            log.info("Coordinator asking Worker[%s] to add task[%s]", new Object[]{worker, task.getId()});
            CuratorUtils.createIfNotExists((CuratorFramework)this.cf, (String)JOINER.join((Object)this.indexerZkConfig.getTasksPath(), (Object)worker, new Object[]{task.getId()}), (CreateMode)CreateMode.EPHEMERAL, (byte[])this.jsonMapper.writeValueAsBytes((Object)task), (int)this.config.getMaxZnodeBytes());
            RemoteTaskRunnerWorkItem workItem = (RemoteTaskRunnerWorkItem)this.pendingTasks.remove(task.getId());
            if (workItem == null) {
                log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!", new Object[0]).addData("taskId", (Object)task.getId()).emit();
                return false;
            }
            RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
            this.runningTasks.put(task.getId(), newWorkItem);
            log.info("Task %s switched from pending to running (on [%s])", new Object[]{task.getId(), newWorkItem.getWorker().getHost()});
            TaskRunnerUtils.notifyStatusChanged(this.listeners, task.getId(), TaskStatus.running((String)task.getId()));

            // The timeout is a single deadline measured from here. Each wake-up (the worker status listener calls
            // notifyAll for every status event) re-checks the condition and then waits only for the time that is
            // still left, so unrelated notifications cannot stretch the wait beyond the configured timeout.
            final long timeoutMs = this.config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
            final Stopwatch timeoutStopwatch = Stopwatch.createStarted();
            while (!this.isWorkerRunningTask(theZkWorker, task.getId())) {
                final long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
                if (elapsed >= timeoutMs) {
                    log.makeAlert("Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!", new Object[]{worker, task.getId(), elapsed, this.config.getTaskAssignmentTimeout()}).emit();
                    this.taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure((String)task.getId()));
                    break;
                }
                // Always strictly positive here, so this never degrades into wait(0), which would block forever.
                this.statusLock.wait(timeoutMs - elapsed);
            }
            return true;
        }
    }

    /**
     * Drops and cancels the scheduled task cleanup registered for a worker host, if there is one.
     *
     * @param workerHost host of the worker whose cleanup should be cancelled
     * @return {@code true} if a cleanup was registered for the host (and has now been cancelled without
     *         interrupting it), {@code false} if none was registered
     */
    private boolean cancelWorkerCleanup(String workerHost) {
        final ScheduledFuture pendingCleanup = (ScheduledFuture) this.removedWorkerCleanups.remove(workerHost);
        if (pendingCleanup == null) {
            return false;
        }
        log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost);
        pendingCleanup.cancel(false);
        return true;
    }

    /**
     * Starts tracking a newly announced worker.
     *
     * <p>Cancels any cleanup still scheduled for the worker's host, creates a status cache on the worker's status
     * path, wraps it in a {@link ZkWorker}, and registers a listener that reacts to status znode events while
     * holding {@code statusLock}:
     * <ul>
     *   <li>{@code CHILD_ADDED} / {@code CHILD_UPDATED}: reads the task announcement, wakes threads waiting on
     *       {@code statusLock}, finds or creates the running work item, updates its location, and completes the
     *       task when the announced status is complete.</li>
     *   <li>{@code CHILD_REMOVED}: removes the task from {@code runningTasks} and, if it was known, fails it.</li>
     *   <li>{@code INITIALIZED}: registers the worker in {@code zkWorkers} and resolves the returned future.</li>
     * </ul>
     *
     * @param worker the worker that announced itself
     * @return a future set to the new {@link ZkWorker} once its status cache is initialized, or failed with an
     *         {@link IllegalStateException} if a worker with the same host is already registered
     * @throws RuntimeException wrapping any exception raised while creating or starting the worker
     */
    private ListenableFuture<ZkWorker> addWorker(Worker worker) {
        log.info("Worker[%s] reportin' for duty!", new Object[]{worker.getHost()});
        try {
            this.cancelWorkerCleanup(worker.getHost());
            String workerStatusPath = JOINER.join((Object)this.indexerZkConfig.getStatusPath(), (Object)worker.getHost(), new Object[0]);
            PathChildrenCache statusCache = this.workerStatusPathChildrenCacheFactory.make(this.cf, workerStatusPath);
            SettableFuture retVal = SettableFuture.create();
            ZkWorker zkWorker = new ZkWorker(worker, statusCache, this.jsonMapper);
            zkWorker.addListener((client, event) -> {
                Object object = this.statusLock;
                synchronized (object) {
                    try {
                        switch (event.getType()) {
                            case CHILD_ADDED: 
                            case CHILD_UPDATED: {
                                RemoteTaskRunnerWorkItem taskRunnerWorkItem;
                                String taskId = ZKPaths.getNodeFromPath((String)event.getData().getPath());
                                TaskAnnouncement announcement = (TaskAnnouncement)this.jsonMapper.readValue(event.getData().getData(), TaskAnnouncement.class);
                                log.info("Worker[%s] wrote %s status for task [%s] on [%s]", new Object[]{zkWorker.getWorker().getHost(), announcement.getTaskStatus().getStatusCode(), taskId, announcement.getTaskLocation()});
                                // Wake any thread blocked in statusLock.wait(), e.g. announceTask waiting for this worker.
                                this.statusLock.notifyAll();
                                RemoteTaskRunnerWorkItem tmp = (RemoteTaskRunnerWorkItem)this.runningTasks.get(taskId);
                                if (tmp != null) {
                                    taskRunnerWorkItem = tmp;
                                } else {
                                    // The worker reported a task that is not in runningTasks: start tracking it.
                                    RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(taskId, announcement.getTaskType(), zkWorker.getWorker(), TaskLocation.unknown(), announcement.getTaskDataSource());
                                    RemoteTaskRunnerWorkItem existingItem = this.runningTasks.putIfAbsent(taskId, newTaskRunnerWorkItem);
                                    if (existingItem == null) {
                                        log.warn("Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", new Object[]{zkWorker.getWorker().getHost(), taskId});
                                        taskRunnerWorkItem = newTaskRunnerWorkItem;
                                    } else {
                                        taskRunnerWorkItem = existingItem;
                                    }
                                }
                                if (!announcement.getTaskLocation().equals((Object)taskRunnerWorkItem.getLocation())) {
                                    taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
                                    TaskRunnerUtils.notifyLocationChanged(this.listeners, taskId, announcement.getTaskLocation());
                                }
                                if (!announcement.getTaskStatus().isComplete()) break;
                                this.taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
                                this.runPendingTasks();
                                break;
                            }
                            case CHILD_REMOVED: {
                                String taskId = ZKPaths.getNodeFromPath((String)event.getData().getPath());
                                RemoteTaskRunnerWorkItem taskRunnerWorkItem = (RemoteTaskRunnerWorkItem)this.runningTasks.remove(taskId);
                                if (taskRunnerWorkItem != null) {
                                    log.info("Task[%s] just disappeared!", new Object[]{taskId});
                                    taskRunnerWorkItem.setResult(TaskStatus.failure((String)taskId));
                                    TaskRunnerUtils.notifyStatusChanged(this.listeners, taskId, TaskStatus.failure((String)taskId));
                                    break;
                                }
                                log.info("Task[%s] went bye bye.", new Object[]{taskId});
                                break;
                            }
                            case INITIALIZED: {
                                if (this.zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
                                    retVal.set((Object)zkWorker);
                                } else {
                                    String message = StringUtils.format((String)"WTF?! Tried to add already-existing worker[%s]", (Object[])new Object[]{worker.getHost()});
                                    log.makeAlert(message, new Object[0]).addData("workerHost", (Object)worker.getHost()).addData("workerIp", (Object)worker.getIp()).emit();
                                    retVal.setException((Throwable)new IllegalStateException(message));
                                }
                                this.runPendingTasks();
                                break;
                            }
                        }
                    }
                    catch (Exception e) {
                        // Events such as INITIALIZED carry no child data; guard the lookup so that reporting the
                        // failure cannot itself throw a NullPointerException and hide the original exception.
                        String znode = event.getData() == null ? "<no znode data>" : event.getData().getPath();
                        log.makeAlert((Throwable)e, "Failed to handle new worker status", new Object[0]).addData("worker", (Object)zkWorker.getWorker().getHost()).addData("znode", (Object)znode).emit();
                    }
                }
            });
            zkWorker.start();
            return retVal;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies a changed worker announcement to the already-tracked {@link ZkWorker} for the same host.
     * Announcements for hosts that are not tracked are logged and ignored.
     *
     * @param worker the worker's new announcement
     */
    private void updateWorker(Worker worker) {
        final ZkWorker tracked = (ZkWorker) this.zkWorkers.get(worker.getHost());
        if (tracked == null) {
            log.warn("WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.", worker.getHost());
            return;
        }
        // Log the previous announcement before it is replaced.
        log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), tracked.getWorker(), worker);
        tracked.setWorker(worker);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Stops tracking a worker that has disappeared.
     *
     * <p>If the worker is registered, a delayed cleanup of its assigned tasks is scheduled. Regardless of whether
     * scheduling succeeds, the {@link ZkWorker} is closed, removed from {@code zkWorkers}, and the blacklist is
     * re-evaluated. Finally the host is removed from {@code lazyWorkers}.
     *
     * @param worker the worker that was removed
     * @throws RuntimeException wrapping any exception raised while collecting the worker's tasks or scheduling
     *                          the cleanup; the worker is still closed and deregistered in that case
     */
    private void removeWorker(Worker worker) {
        log.info("Kaboom! Worker[%s] removed!", new Object[]{worker.getHost()});
        ZkWorker zkWorker = (ZkWorker)this.zkWorkers.get(worker.getHost());
        if (zkWorker != null) {
            try {
                this.scheduleTasksCleanupForWorker(worker.getHost(), this.getAssignedTasks(worker));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            finally {
                // Runs on every exit path (including Errors) so a failed scheduling attempt can never leave a
                // dead worker open and registered.
                try {
                    zkWorker.close();
                }
                catch (Exception e) {
                    // Best effort: a failure to close must not prevent deregistration below.
                    log.error((Throwable)e, "Exception closing worker[%s]!", new Object[]{worker.getHost()});
                }
                this.zkWorkers.remove(worker.getHost());
                this.checkBlackListedNodes();
            }
        }
        this.lazyWorkers.remove(worker.getHost());
    }

    /**
     * Schedules a delayed cleanup of the tasks that were assigned to a removed worker.
     *
     * <p>Any cleanup previously scheduled for the same worker is cancelled first. After the configured task cleanup
     * timeout the scheduled job deletes each task's task and status znodes (when present), removes the task from
     * {@code runningTasks} and fails it, and finally deletes the worker's status znode. The scheduled future is
     * recorded in {@code removedWorkerCleanups} and removed from it again once the job finishes.
     *
     * @param worker      host of the removed worker
     * @param tasksToFail ids of the tasks to clean up and mark as failed
     */
    private void scheduleTasksCleanupForWorker(final String worker, List<String> tasksToFail) {
        this.cancelWorkerCleanup(worker);
        final ListenableScheduledFuture cleanupTask = this.cleanupExec.schedule(() -> {
            log.info("Running scheduled cleanup for Worker[%s]", new Object[]{worker});
            try {
                for (String assignedTask : tasksToFail) {
                    String taskPath = JOINER.join((Object)this.indexerZkConfig.getTasksPath(), (Object)worker, new Object[]{assignedTask});
                    String statusPath = JOINER.join((Object)this.indexerZkConfig.getStatusPath(), (Object)worker, new Object[]{assignedTask});
                    if (this.cf.checkExists().forPath(taskPath) != null) {
                        ((ChildrenDeletable)this.cf.delete().guaranteed()).forPath(taskPath);
                    }
                    if (this.cf.checkExists().forPath(statusPath) != null) {
                        ((ChildrenDeletable)this.cf.delete().guaranteed()).forPath(statusPath);
                    }
                    log.info("Failing task[%s]", new Object[]{assignedTask});
                    RemoteTaskRunnerWorkItem taskRunnerWorkItem = (RemoteTaskRunnerWorkItem)this.runningTasks.remove(assignedTask);
                    if (taskRunnerWorkItem != null) {
                        taskRunnerWorkItem.setResult(TaskStatus.failure((String)assignedTask));
                        TaskRunnerUtils.notifyStatusChanged(this.listeners, assignedTask, TaskStatus.failure((String)assignedTask));
                        continue;
                    }
                    log.warn("RemoteTaskRunner has no knowledge of task[%s]", new Object[]{assignedTask});
                }
                String workerStatusPath = JOINER.join((Object)this.indexerZkConfig.getStatusPath(), (Object)worker, new Object[0]);
                if (this.cf.checkExists().forPath(workerStatusPath) != null) {
                    ((ChildrenDeletable)this.cf.delete().guaranteed()).forPath(workerStatusPath);
                }
            }
            catch (Exception e) {
                // Attach the cause to the alert; without it the alert gives no hint of what went wrong.
                log.makeAlert((Throwable)e, "Exception while cleaning up worker[%s]", new Object[]{worker}).emit();
                throw new RuntimeException(e);
            }
        }, this.config.getTaskCleanupTimeout().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS);
        this.removedWorkerCleanups.put(worker, (ScheduledFuture)cleanupTask);
        // Deregister the future when the job ends; the two-argument remove leaves a newer cleanup for the same
        // worker untouched.
        Futures.addCallback((ListenableFuture)cleanupTask, (FutureCallback)new FutureCallback<Object>(){

            public void onSuccess(Object result) {
                RemoteTaskRunner.this.removedWorkerCleanups.remove(worker, cleanupTask);
            }

            public void onFailure(Throwable t) {
                RemoteTaskRunner.this.removedWorkerCleanups.remove(worker, cleanupTask);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records the final status of a task and updates the failure bookkeeping of the worker that ran it.
     *
     * <p>The work item is stored in {@code completeTasks} and removed from {@code runningTasks}. When a worker is
     * given, a success resets its consecutive-failure count and lifts any blacklisting, while a failure increments
     * the count; the worker is then blacklisted if the count exceeds the configured retries and the blacklist
     * still has room under the configured percentage. Lastly the result is set on the work item and listeners are
     * notified.
     *
     * @param taskRunnerWorkItem the work item of the finished task; must not be null
     * @param zkWorker           the worker that ran the task, or {@code null} for a task without a worker
     * @param taskStatus         the final status; must not be null
     */
    private void taskComplete(RemoteTaskRunnerWorkItem taskRunnerWorkItem, ZkWorker zkWorker, TaskStatus taskStatus) {
        Preconditions.checkNotNull((Object)taskRunnerWorkItem, (Object)"taskRunnerWorkItem");
        Preconditions.checkNotNull((Object)taskStatus, (Object)"taskStatus");

        final boolean hasWorker = zkWorker != null;
        if (hasWorker) {
            log.info("Worker[%s] completed task[%s] with status[%s]", zkWorker.getWorker().getHost(), taskStatus.getId(), taskStatus.getStatusCode());
            zkWorker.setLastCompletedTaskTime(DateTimes.nowUtc());
        } else {
            log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
        }

        // Register as complete before dropping from running.
        this.completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
        this.runningTasks.remove(taskStatus.getId());

        if (hasWorker) {
            if (taskStatus.isSuccess()) {
                zkWorker.resetContinuouslyFailedTasksCount();
                final boolean wasBlacklisted = this.blackListedWorkers.remove(zkWorker);
                if (wasBlacklisted) {
                    zkWorker.setBlacklistedUntil(null);
                    log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
                }
            } else if (taskStatus.isFailure()) {
                zkWorker.incrementContinuouslyFailedTasksCount();
            }

            synchronized (this.blackListedWorkers) {
                if (zkWorker.getContinuouslyFailedTasksCount() > this.config.getMaxRetriesBeforeBlacklist()) {
                    // Highest current blacklist size at which one more worker may still be blacklisted.
                    final double blacklistCeiling = (double)this.zkWorkers.size() * ((double)this.config.getMaxPercentageBlacklistWorkers() / 100.0) - 1.0;
                    if ((double)this.blackListedWorkers.size() <= blacklistCeiling) {
                        zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus((ReadablePeriod)this.config.getWorkerBlackListBackoffTime()));
                        if (this.blackListedWorkers.add(zkWorker)) {
                            log.info("Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.", zkWorker.getWorker(), zkWorker.getBlacklistedUntil(), zkWorker.getContinuouslyFailedTasksCount());
                        }
                    }
                }
            }
        }

        taskRunnerWorkItem.setResult(taskStatus);
        TaskRunnerUtils.notifyStatusChanged(this.listeners, taskStatus.getId(), taskStatus);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks idle workers as lazy until the lazy set holds {@code maxWorkers} entries.
     *
     * <p>While holding {@code statusLock}, each registered worker that has no assigned tasks and satisfies
     * {@code isLazyWorker} is added to {@code lazyWorkers}. Marking stops as soon as the lazy set has reached
     * {@code maxWorkers}; workers that were already lazy before the call count towards that limit.
     *
     * @param isLazyWorker predicate deciding whether an idle worker may be marked lazy
     * @param maxWorkers   upper bound on the size of the lazy set; values below 1 mark nothing
     * @return an empty collection when {@code maxWorkers < 1}, otherwise all workers currently marked lazy
     * @throws RuntimeException wrapping any exception raised while inspecting a worker's assigned tasks
     */
    @Override
    public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers) {
        if (maxWorkers < 1) {
            return Collections.emptyList();
        }
        Object object = this.statusLock;
        synchronized (object) {
            for (Map.Entry worker : this.zkWorkers.entrySet()) {
                // Check the bound with >= before considering another worker. An equality test after the insert
                // never fires when the lazy set already holds maxWorkers (or more) entries on entry, which would
                // mark every eligible worker lazy.
                if (this.lazyWorkers.size() >= maxWorkers) {
                    break;
                }
                ZkWorker zkWorker = (ZkWorker)worker.getValue();
                try {
                    if (!this.getAssignedTasks(zkWorker.getWorker()).isEmpty() || !isLazyWorker.apply(zkWorker.toImmutable())) continue;
                    log.info("Adding Worker[%s] to lazySet!", new Object[]{zkWorker.getWorker().getHost()});
                    this.lazyWorkers.put((String)worker.getKey(), zkWorker);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return RemoteTaskRunner.getWorkerFromZK(this.lazyWorkers.values());
        }
    }

    /**
     * Collects the ids of all tasks associated with a worker.
     *
     * <p>The result starts with the children of the worker's task path and is extended with every entry of
     * {@code runningTasks} whose work item names the same host (compared case-insensitively). A task present in
     * both sources appears twice.
     *
     * @param worker the worker whose tasks are requested
     * @return a new list of task ids
     * @throws Exception if listing the children of the worker's task path fails
     */
    protected List<String> getAssignedTasks(Worker worker) throws Exception {
        final String workerTasksPath = JOINER.join(this.indexerZkConfig.getTasksPath(), worker.getHost());
        final List<String> assigned = Lists.newArrayList(this.cf.getChildren().forPath(workerTasksPath));

        for (Map.Entry<String, RemoteTaskRunnerWorkItem> running : this.runningTasks.entrySet()) {
            final String taskId = running.getKey();
            final RemoteTaskRunnerWorkItem item = running.getValue();
            if (item == null) {
                log.error("Huh? null work item for [%s]", taskId);
                continue;
            }
            final Worker owner = item.getWorker();
            if (owner == null) {
                log.error("Huh? no worker for [%s]", taskId);
                continue;
            }
            if (owner.getHost().equalsIgnoreCase(worker.getHost())) {
                log.info("[%s]: Found [%s] running", worker.getHost(), taskId);
                assigned.add(taskId);
            }
        }

        log.info("[%s]: Found %d tasks assigned", worker.getHost(), assigned.size());
        return assigned;
    }

    /**
     * Returns the workers currently held in {@code lazyWorkers}.
     *
     * @return an immutable list built from the current values of the lazy-worker map
     */
    @Override
    public Collection<Worker> getLazyWorkers() {
        return RemoteTaskRunner.getWorkerFromZK(this.lazyWorkers.values());
    }

    /**
     * Converts each {@link ZkWorker} to its immutable form via {@code ZkWorker.toImmutable()} and copies the
     * results into an immutable list.
     *
     * @param workers the workers to convert
     * @return an immutable list with one {@code ImmutableWorkerInfo} per input worker
     */
    private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers) {
        return ImmutableList.copyOf((Collection)Collections2.transform(workers, ZkWorker::toImmutable));
    }

    /**
     * Extracts the {@code Worker} of each {@link ZkWorker} via {@code ZkWorker.getWorker()} and copies the
     * results into an immutable list.
     *
     * @param workers the workers to unwrap
     * @return an immutable list with one {@code Worker} per input worker
     */
    private static ImmutableList<Worker> getWorkerFromZK(Collection<ZkWorker> workers) {
        return ImmutableList.copyOf((Collection)Collections2.transform(workers, ZkWorker::getWorker));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the workers that are currently blacklisted.
     *
     * @return an immutable list of the blacklisted workers, built while holding the blacklist's monitor
     */
    public Collection<ImmutableWorkerInfo> getBlackListedWorkers() {
        synchronized (this.blackListedWorkers) {
            final ImmutableList<ImmutableWorkerInfo> snapshot = RemoteTaskRunner.getImmutableWorkerFromZK(this.blackListedWorkers);
            return snapshot;
        }
    }

    /**
     * Decides whether a blacklisted worker should be taken off the blacklist.
     *
     * <p>A worker is released when the blacklist is larger than the configured percentage of registered workers
     * allows, or when its blacklist period has ended according to {@code getCurrentTimeMillis()}.
     *
     * @param zkWorker a worker that is currently blacklisted
     * @return {@code true} if the worker should be removed from the blacklist
     */
    private boolean shouldRemoveNodeFromBlackList(ZkWorker zkWorker) {
        final double allowedBlacklisted = (double)this.zkWorkers.size() * ((double)this.config.getMaxPercentageBlacklistWorkers() / 100.0);
        if ((double)this.blackListedWorkers.size() > allowedBlacklisted) {
            log.info("Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]", zkWorker.getWorker(), this.config.getMaxPercentageBlacklistWorkers());
            return true;
        }

        final long remainingMillis = zkWorker.getBlacklistedUntil().getMillis() - this.getCurrentTimeMillis();
        final boolean backoffElapsed = remainingMillis <= 0L;
        if (backoffElapsed) {
            log.info("Removing [%s] from blacklist because backoff time elapsed", zkWorker.getWorker());
        } else {
            log.info("[%s] still blacklisted for [%,ds]", zkWorker.getWorker(), remainingMillis / 1000L);
        }
        return backoffElapsed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Releases every blacklisted worker that {@code shouldRemoveNodeFromBlackList} approves.
     *
     * <p>Released workers have their consecutive-failure count reset and their blacklist deadline cleared. If at
     * least one worker was released, pending tasks are run after the blacklist monitor has been released.
     */
    @VisibleForTesting
    void checkBlackListedNodes() {
        boolean releasedAny = false;
        synchronized (this.blackListedWorkers) {
            for (Iterator<ZkWorker> it = this.blackListedWorkers.iterator(); it.hasNext(); ) {
                final ZkWorker candidate = it.next();
                if (this.shouldRemoveNodeFromBlackList(candidate)) {
                    it.remove();
                    candidate.resetContinuouslyFailedTasksCount();
                    candidate.setBlacklistedUntil(null);
                    releasedAny = true;
                }
            }
        }
        if (releasedAny) {
            this.runPendingTasks();
        }
    }

    /**
     * Returns the current wall-clock time in milliseconds, as reported by {@link System#currentTimeMillis()}.
     * Used by {@code shouldRemoveNodeFromBlackList} to decide whether a blacklist period has ended.
     * NOTE(review): protected and test-visible, presumably so tests can override the clock; confirm.
     *
     * @return the current time in epoch milliseconds
     */
    @VisibleForTesting
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Exposes the map of scheduled cleanup futures, keyed by worker host, for tests.
     *
     * @return the backing map itself, not a copy
     */
    @VisibleForTesting
    ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups() {
        return this.removedWorkerCleanups;
    }

    /**
     * Exposes the configuration this runner was created with, for tests.
     *
     * @return the runner's {@code RemoteTaskRunnerConfig}
     */
    @VisibleForTesting
    RemoteTaskRunnerConfig getRemoteTaskRunnerConfig() {
        return this.config;
    }

    /**
     * Exposes the map of worker host to the id of the task that worker has been assigned but not yet
     * acknowledged, for tests.
     *
     * @return the backing map itself, not a copy
     */
    @VisibleForTesting
    Map<String, String> getWorkersWithUnacknowledgedTask() {
        return this.workersWithUnacknowledgedTask;
    }
}

