/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.worker;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.WorkerHistoryItem;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.jboss.netty.handler.codec.http.HttpMethod;

public abstract class WorkerTaskManager {
    // Class-wide logger; also used to emit alerts on start/stop/notice failures.
    private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);
    // Serializes tasks/announcements to disk and builds/parses the overlord task-status request.
    private final ObjectMapper jsonMapper;
    // Runner that tasks are handed to; also the source of location-change callbacks.
    private final TaskRunner taskRunner;
    // Single-threaded executor ("WorkerTaskManager-NoticeHandler") on which all notices are handled.
    private final ExecutorService exec;
    // Guards the start/stop lifecycle; public operations wait on it before proceeding.
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    // Tasks that were persisted to the assigned-tasks dir and not yet handed to the task runner, keyed by task id.
    private final ConcurrentMap<String, Task> assignedTasks = new ConcurrentHashMap<String, Task>();
    // Tasks handed to the task runner that have not completed yet, keyed by task id.
    protected final ConcurrentMap<String, TaskDetails> runningTasks = new ConcurrentHashMap<String, TaskDetails>();
    // Final announcements of finished tasks, keyed by task id; pruned by the scheduled cleanup.
    protected final ConcurrentMap<String, TaskAnnouncement> completedTasks = new ConcurrentHashMap<String, TaskAnnouncement>();
    // History of worker changes served to callers of getChangesSince().
    private final ChangeRequestHistory<WorkerHistoryItem> changeHistory = new ChangeRequestHistory();
    // Monitor synchronized on when tasks move between the assigned/running/completed maps.
    protected final Object lock = new Object();
    // Supplies the base task directory under which the tmp/assigned/completed dirs live.
    private final TaskConfig taskConfig;
    // Single-threaded scheduler ("WorkerTaskManager-CompletedTasksCleaner") for the periodic completed-task cleanup.
    private final ScheduledExecutorService completedTasksCleanupExecutor;
    // Whether the worker is currently disabled; toggled by workerEnabled()/workerDisabled().
    private final AtomicBoolean disabled = new AtomicBoolean(false);
    // Client used to ask the overlord for the status of completed tasks.
    private final DruidLeaderClient overlordClient;

    /**
     * Creates the manager together with the two single-threaded executors it owns:
     * the notice handler and the completed-tasks cleaner.
     *
     * @param jsonMapper     mapper used to (de)serialize tasks and task announcements
     * @param taskRunner     runner that assigned tasks are delegated to
     * @param taskConfig     configuration supplying the base task directory
     * @param overlordClient client used to query the overlord for task statuses
     */
    @Inject
    public WorkerTaskManager(
        ObjectMapper jsonMapper,
        TaskRunner taskRunner,
        TaskConfig taskConfig,
        @IndexingService DruidLeaderClient overlordClient
    ) {
        // Collaborators handed in by the injector.
        this.jsonMapper = jsonMapper;
        this.taskRunner = taskRunner;
        this.taskConfig = taskConfig;
        this.overlordClient = overlordClient;

        // Executors owned by this manager.
        this.exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
        this.completedTasksCleanupExecutor = Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");
    }

    /**
     * Starts the manager: prepares the tmp dir, registers the location listener, restores
     * restorable tasks, reloads assigned and completed tasks from disk and schedules the
     * completed-tasks cleanup.
     *
     * @throws ISE       if the lifecycle lock does not permit starting
     * @throws Exception any failure during startup; an alert is emitted before it is rethrown
     */
    @LifecycleStart
    public void start() throws Exception {
        if (!lifecycleLock.canStart()) {
            throw new ISE("can't start.");
        }

        synchronized (lock) {
            try {
                log.debug("Starting...");

                cleanupAndMakeTmpTaskDir();
                registerLocationListener();
                restoreRestorableTasks();
                initAssignedTasks();
                initCompletedTasks();
                scheduleCompletedTasksCleanup();

                lifecycleLock.started();
                log.debug("Started.");
            }
            catch (Exception e) {
                log.makeAlert(e, "Exception starting WorkerTaskManager.").emit();
                throw e;
            }
            finally {
                // Always leave the "starting" state, whether or not startup succeeded.
                lifecycleLock.exitStart();
            }
        }
    }

    /**
     * Stops the manager: unregisters the task-runner listener, shuts down the notice executor
     * and the completed-tasks cleanup scheduler, then stops the task runner.
     *
     * <p>Failures while stopping are reported as an alert and are not rethrown.
     *
     * @throws ISE if the lifecycle lock does not permit stopping
     */
    @LifecycleStop
    public void stop() throws Exception {
        if (!lifecycleLock.canStop()) {
            throw new ISE("can't stop.");
        }

        synchronized (lock) {
            try {
                // Listener and notice executor go first so no further notices are processed
                // while the task runner is being stopped.
                taskRunner.unregisterListener("WorkerTaskManager");
                exec.shutdownNow();
                // The periodic cleanup is pointless once stopped; without this the scheduler
                // thread would keep polling the overlord forever.
                completedTasksCleanupExecutor.shutdownNow();
                taskRunner.stop();
                log.debug("Stopped WorkerTaskManager.");
            }
            catch (Exception e) {
                log.makeAlert(e, "Exception stopping WorkerTaskManager").emit();
            }
        }
    }

    /**
     * Returns the completed-task announcements keyed by task id.
     *
     * <p>This is the manager's own backing map, not a copy.
     *
     * @return map from task id to the final announcement of that task
     */
    public Map<String, TaskAnnouncement> getCompletedTasks() {
        return completedTasks;
    }

    /**
     * Queues a notice on the notice executor. Any exception thrown while handling it is
     * reported as an alert carrying the notice class and task id; it is not propagated.
     *
     * @param notice the notice to handle asynchronously
     */
    private void submitNoticeToExec(Notice notice) {
        final Runnable handler = () -> {
            try {
                notice.handle();
            }
            catch (Exception e) {
                // Preserve the interrupt flag if the handler was interrupted.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.makeAlert(e, "Failed to handle notice")
                   .addData("noticeClass", notice.getClass().getSimpleName())
                   .addData("noticeTaskId", notice.getTaskId())
                   .emit();
            }
        };
        exec.execute(handler);
    }

    /**
     * Asks the task runner to restore its tasks and tracks each restored task as running.
     */
    private void restoreRestorableTasks() {
        taskRunner.restore().forEach(restoredTask -> addRunningTask(restoredTask.lhs, restoredTask.rhs));
    }

    /**
     * Registers a listener with the task runner that turns every location change into a
     * {@link LocationNotice}. Status changes are ignored by this listener.
     */
    private void registerLocationListener() {
        final TaskRunnerListener locationListener = new TaskRunnerListener() {
            @Override
            public String getListenerId() {
                return "WorkerTaskManager";
            }

            @Override
            public void locationChanged(String taskId, TaskLocation newLocation) {
                submitNoticeToExec(new LocationNotice(taskId, newLocation));
            }

            @Override
            public void statusChanged(String taskId, TaskStatus status) {
                // Intentionally a no-op.
            }
        };
        taskRunner.registerListener(locationListener, Execs.directExecutor());
    }

    /**
     * Records the task as running and arranges for a {@link StatusNotice} to be submitted once
     * its future completes.
     *
     * @param task   the task that has been handed to the task runner
     * @param future future yielding the task's final status
     */
    private void addRunningTask(final Task task, ListenableFuture<TaskStatus> future) {
        runningTasks.put(task.getId(), new TaskDetails(task));
        Futures.addCallback(future, new FutureCallback<TaskStatus>() {
            @Override
            public void onSuccess(TaskStatus result) {
                submitNoticeToExec(new StatusNotice(task, result));
            }

            @Override
            public void onFailure(Throwable t) {
                // The failure status carries no cause, so log it here or it is lost entirely.
                log.error(t, "Task[%s] failed with an exception.", task.getId());
                submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
            }
        });
    }

    /**
     * Accepts a task: persists it to the assigned-tasks directory, records it as assigned,
     * publishes a "running / unknown location" update to the change history and queues a
     * {@link RunNotice} for it.
     *
     * <p>A task whose id is already assigned, running or completed is ignored with a warning.
     *
     * @param task the task to assign
     * @throws IllegalStateException if the manager has not started within one second
     * @throws ISE                   if the task could not be persisted; the underlying
     *                               {@link IOException} is attached as the cause
     */
    public void assignTask(Task task) {
        Preconditions.checkState(lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");

        final String taskId = task.getId();
        synchronized (lock) {
            if (assignedTasks.containsKey(taskId)
                || runningTasks.containsKey(taskId)
                || completedTasks.containsKey(taskId)) {
                log.warn("Request to assign task[%s] ignored because it exists already.", taskId);
                return;
            }

            try {
                FileUtils.writeAtomically(
                    new File(getAssignedTaskDir(), taskId),
                    getTmpTaskDir(),
                    os -> {
                        jsonMapper.writeValue(os, task);
                        return null;
                    }
                );
                // Only track the task in memory once it is safely on disk.
                assignedTasks.put(taskId, task);
            }
            catch (IOException ex) {
                log.error(ex, "Error while trying to persist assigned task[%s]", taskId);
                // Keep the original exception as the cause so callers can see the full stack trace.
                throw new ISE(ex, "Assign Task[%s] Request failed because [%s].", taskId, ex.getMessage());
            }

            changeHistory.addChangeRequest(
                new WorkerHistoryItem.TaskUpdate(
                    TaskAnnouncement.create(task, TaskStatus.running(taskId), TaskLocation.unknown())
                )
            );
        }

        submitNoticeToExec(new RunNotice(task));
    }

    /**
     * @return the {@code workerTaskManagerTmp} directory under the configured base task
     *         directory, used as the staging area for atomic writes
     */
    private File getTmpTaskDir() {
        return new File(taskConfig.getBaseTaskDir(), "workerTaskManagerTmp");
    }

    /**
     * Ensures the tmp task directory exists and empties it.
     *
     * <p>Failure to empty the directory is logged (with the cause) and otherwise ignored.
     *
     * @throws ISE if the tmp directory does not exist or is not a directory after creation
     */
    private void cleanupAndMakeTmpTaskDir() {
        final File tmpDir = getTmpTaskDir();
        tmpDir.mkdirs();
        if (!tmpDir.isDirectory()) {
            throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir);
        }

        try {
            org.apache.commons.io.FileUtils.cleanDirectory(tmpDir);
        }
        catch (IOException ex) {
            // Best effort: leftovers in the tmp dir are harmless, but record why cleanup failed.
            log.warn(ex, "Failed to cleanup tmp dir [%s].", tmpDir.getAbsolutePath());
        }
    }

    /**
     * @return the {@code assignedTasks} directory under the configured base task directory,
     *         where assigned tasks are persisted
     */
    public File getAssignedTaskDir() {
        return new File(taskConfig.getBaseTaskDir(), "assignedTasks");
    }

    /**
     * Reloads previously assigned tasks from the assigned-tasks directory and queues a
     * {@link RunNotice} for each of them.
     *
     * <p>Files that cannot be read are logged and skipped.
     *
     * @throws ISE if the directory cannot be created or listed, or if a file's name does not
     *             match the id of the task stored in it
     */
    private void initAssignedTasks() {
        final File assignedTaskDir = getAssignedTaskDir();
        log.debug("Looking for any previously assigned tasks on disk[%s].", assignedTaskDir);

        assignedTaskDir.mkdirs();
        if (!assignedTaskDir.isDirectory()) {
            throw new ISE("Assigned Tasks Dir [%s] does not exist/not-a-directory.", assignedTaskDir);
        }

        // listFiles() returns null on an I/O error even for an existing directory; fail with a
        // clear message instead of a NullPointerException.
        final File[] taskFiles = assignedTaskDir.listFiles();
        if (taskFiles == null) {
            throw new ISE("Failed to list files in Assigned Tasks Dir [%s].", assignedTaskDir);
        }

        for (File taskFile : taskFiles) {
            try {
                final String taskId = taskFile.getName();
                final Task task = jsonMapper.readValue(taskFile, Task.class);
                // Each file is named after the id of the task it contains.
                if (!taskId.equals(task.getId())) {
                    throw new ISE("WTF! Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
                }
                assignedTasks.put(taskId, task);
            }
            catch (IOException ex) {
                log.noStackTrace()
                   .error(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
            }
        }

        if (!assignedTasks.isEmpty()) {
            log.info(
                "Found %,d running tasks from previous run: %s",
                assignedTasks.size(),
                assignedTasks.values().stream().map(Task::getId).collect(Collectors.joining(", "))
            );
        }

        for (Task task : assignedTasks.values()) {
            submitNoticeToExec(new RunNotice(task));
        }
    }

    /**
     * Forgets an assigned task: drops it from the in-memory map and deletes its file from the
     * assigned-tasks directory. A failed delete is logged and otherwise ignored.
     *
     * @param task the task that no longer needs to be tracked as assigned
     */
    private void cleanupAssignedTask(Task task) {
        final String taskId = task.getId();
        assignedTasks.remove(taskId);

        final File persisted = new File(getAssignedTaskDir(), task.getId());
        try {
            Files.delete(persisted.toPath());
        }
        catch (IOException ex) {
            log.error(ex, "Failed to delete assigned task from disk at [%s].", persisted);
        }
    }

    /**
     * Returns the worker changes that happened after the given counter.
     *
     * <p>For a non-negative counter the request is delegated to the change history. For a
     * negative counter an already-completed future is returned holding a full snapshot: the
     * worker's disabled flag followed by every assigned, running and completed task.
     *
     * @param counter position after which changes are requested
     * @return future yielding the matching changes
     * @throws IllegalStateException if the manager has not started within one second
     */
    public ListenableFuture<ChangeRequestsSnapshot<WorkerHistoryItem>> getChangesSince(ChangeRequestHistory.Counter counter) {
        Preconditions.checkState(lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");

        if (counter.getCounter() >= 0L) {
            return changeHistory.getRequestsSince(counter);
        }

        synchronized (lock) {
            final List<WorkerHistoryItem> snapshot = new ArrayList<>();
            snapshot.add(new WorkerHistoryItem.Metadata(disabled.get()));

            // Assigned tasks are reported as running at an unknown location.
            for (Task assigned : assignedTasks.values()) {
                snapshot.add(
                    new WorkerHistoryItem.TaskUpdate(
                        TaskAnnouncement.create(assigned, TaskStatus.running(assigned.getId()), TaskLocation.unknown())
                    )
                );
            }
            for (TaskDetails running : runningTasks.values()) {
                snapshot.add(
                    new WorkerHistoryItem.TaskUpdate(
                        TaskAnnouncement.create(running.task, running.status, running.location)
                    )
                );
            }
            for (TaskAnnouncement completed : completedTasks.values()) {
                snapshot.add(new WorkerHistoryItem.TaskUpdate(completed));
            }

            final SettableFuture<ChangeRequestsSnapshot<WorkerHistoryItem>> result = SettableFuture.create();
            result.set(ChangeRequestsSnapshot.success(changeHistory.getLastCounter(), Lists.newArrayList(snapshot)));
            return result;
        }
    }

    /**
     * @return the {@code completedTasks} directory under the configured base task directory,
     *         where completed-task announcements are persisted
     */
    public File getCompletedTaskDir() {
        return new File(taskConfig.getBaseTaskDir(), "completedTasks");
    }

    /**
     * Moves a task from the running map to the completed map and persists its final
     * announcement to the completed-tasks directory.
     *
     * <p>The in-memory move happens before the write, so it remains in effect even if
     * persisting fails.
     *
     * @param taskId           id of the task that finished
     * @param taskAnnouncement final announcement to remember for the task
     * @throws ISE if the announcement could not be persisted; the underlying
     *             {@link IOException} is attached as the cause
     */
    private void moveFromRunningToCompleted(String taskId, TaskAnnouncement taskAnnouncement) {
        synchronized (lock) {
            runningTasks.remove(taskId);
            completedTasks.put(taskId, taskAnnouncement);

            try {
                FileUtils.writeAtomically(
                    new File(getCompletedTaskDir(), taskId),
                    getTmpTaskDir(),
                    os -> {
                        jsonMapper.writeValue(os, taskAnnouncement);
                        return null;
                    }
                );
            }
            catch (IOException ex) {
                log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId);
                // Keep the original exception as the cause so the full stack trace is not lost.
                throw new ISE(
                    ex,
                    "Persisting completed task[%s] announcement failed because [%s].",
                    taskId,
                    ex.getMessage()
                );
            }
        }
    }

    /**
     * Reloads completed-task announcements from the completed-tasks directory into memory.
     *
     * <p>Files that cannot be read are logged and skipped.
     *
     * @throws ISE if the directory cannot be created or listed, or if a file's name does not
     *             match the task id stored in it
     */
    private void initCompletedTasks() {
        final File completedTaskDir = getCompletedTaskDir();
        log.debug("Looking for any previously completed tasks on disk[%s].", completedTaskDir);

        completedTaskDir.mkdirs();
        if (!completedTaskDir.isDirectory()) {
            throw new ISE("Completed Tasks Dir [%s] does not exist/not-a-directory.", completedTaskDir);
        }

        // listFiles() returns null on an I/O error even for an existing directory; fail with a
        // clear message instead of a NullPointerException.
        final File[] taskFiles = completedTaskDir.listFiles();
        if (taskFiles == null) {
            throw new ISE("Failed to list files in Completed Tasks Dir [%s].", completedTaskDir);
        }

        for (File taskFile : taskFiles) {
            try {
                final String taskId = taskFile.getName();
                final TaskAnnouncement announcement = jsonMapper.readValue(taskFile, TaskAnnouncement.class);
                // Each file is named after the id of the task it describes.
                if (!taskId.equals(announcement.getTaskId())) {
                    throw new ISE("WTF! Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
                }
                completedTasks.put(taskId, announcement);
            }
            catch (IOException ex) {
                log.error(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
            }
        }

        if (!completedTasks.isEmpty()) {
            log.info(
                "Found %,d complete tasks from previous run: %s",
                completedTasks.size(),
                completedTasks.values()
                              .stream()
                              .map(completed -> StringUtils.format("%s (%s)", completed.getTaskId(), completed.getStatus()))
                              .collect(Collectors.joining(", "))
            );
        }
    }

    /**
     * Schedules the periodic cleanup of completed tasks (first run after 1 minute, then every
     * 5 minutes).
     *
     * <p>Each run posts the ids of all completed tasks to the overlord's task-status endpoint.
     * A 200 response yields the statuses to compare against; a 404 is treated as "no statuses",
     * so every completed task is dropped; any other code or an exception skips the run. A
     * completed task is dropped from memory and disk when the overlord either does not report
     * it or reports it as complete. A removal is recorded in the change history only when the
     * file delete did not fail.
     */
    private void scheduleCompletedTasksCleanup() {
        final Runnable cleanup = () -> {
            try {
                if (completedTasks.isEmpty()) {
                    log.debug("Skipping completed tasks cleanup. Its empty.");
                    return;
                }

                final ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
                Map<String, TaskStatus> taskStatusesFromOverlord = null;

                try {
                    final StringFullResponseHolder response = overlordClient.go(
                        overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
                                      .setContent(jsonMapper.writeValueAsBytes(taskIds))
                                      .addHeader("Accept", "application/json")
                                      .addHeader("Content-Type", "application/json")
                    );
                    final int code = response.getStatus().getCode();

                    if (code == 200) {
                        final String responseContent = response.getContent();
                        taskStatusesFromOverlord = jsonMapper.readValue(
                            responseContent,
                            new TypeReference<Map<String, TaskStatus>>() {}
                        );
                        log.debug("Received completed task status response [%s].", responseContent);
                    } else if (code == 404) {
                        // An empty map makes every completed task eligible for deletion below.
                        log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
                        taskStatusesFromOverlord = ImmutableMap.of();
                    } else {
                        log.info(
                            "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
                            code
                        );
                    }
                }
                catch (Exception ex) {
                    log.info(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
                    if (ex instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                }

                // Nothing trustworthy came back from the overlord; try again next time.
                if (taskStatusesFromOverlord == null) {
                    return;
                }

                for (String taskId : taskIds) {
                    final TaskStatus status = taskStatusesFromOverlord.get(taskId);
                    if (status == null || status.isComplete()) {
                        log.debug(
                            "Deleting completed task[%s] information, overlord task status[%s].",
                            taskId,
                            status == null ? "unknown" : status.getStatusCode()
                        );

                        completedTasks.remove(taskId);
                        final File taskFile = new File(getCompletedTaskDir(), taskId);
                        try {
                            Files.deleteIfExists(taskFile.toPath());
                            changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(taskId));
                        }
                        catch (IOException ex) {
                            log.error(ex, "Failed to delete completed task from disk [%s].", taskFile);
                        }
                    }
                }
            }
            catch (Throwable th) {
                // Never let an exception escape, or the scheduler would cancel future runs.
                log.error(th, "WTF! Got unknown exception while running the scheduled cleanup.");
            }
        };

        completedTasksCleanupExecutor.scheduleAtFixedRate(cleanup, 1L, 5L, TimeUnit.MINUTES);
    }

    /**
     * Marks the worker as enabled. A metadata change is recorded in the change history only if
     * the worker was disabled before the call.
     *
     * @throws IllegalStateException if the manager has not started within one second
     */
    public void workerEnabled() {
        Preconditions.checkState(lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");

        final boolean wasDisabled = disabled.compareAndSet(true, false);
        if (!wasDisabled) {
            return;
        }
        changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(false));
    }

    /**
     * Marks the worker as disabled. A metadata change is recorded in the change history only if
     * the worker was enabled before the call.
     *
     * @throws IllegalStateException if the manager has not started within one second
     */
    public void workerDisabled() {
        Preconditions.checkState(lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS), "not started");

        final boolean wasEnabled = disabled.compareAndSet(false, true);
        if (!wasEnabled) {
            return;
        }
        changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(true));
    }

    /**
     * Hook invoked from the run-notice handler with the id of the task the notice was for.
     *
     * <p>It is called after the task has been handed to the task runner (outside the manager's
     * lock), and also when a run notice arrives for a task that is already running or completed
     * (in that case while the lock is held).
     *
     * @param var1 id of the task
     */
    protected abstract void taskStarted(String var1);

    /**
     * Hook invoked with the latest announcement of a task whenever the manager produces a new
     * one: when the task is started, when its location changes, and when it completes.
     *
     * <p>For location changes and completion the call is made while the manager's lock is held;
     * for task start it is made after the lock has been released.
     *
     * @param var1 the most recent announcement for the task
     */
    protected abstract void taskAnnouncementChanged(TaskAnnouncement var1);

    /**
     * Notice that a task's location was reported by the task runner.
     */
    private class LocationNotice implements Notice {
        private final String taskId;
        private final TaskLocation location;

        public LocationNotice(String taskId, TaskLocation location) {
            this.taskId = taskId;
            this.location = location;
        }

        @Override
        public String getTaskId() {
            return taskId;
        }

        /**
         * Updates the running task's location if it actually changed, then records the new
         * announcement in the change history and notifies the subclass hook. Notices for
         * tasks that are not running are logged and dropped.
         */
        @Override
        public void handle() {
            synchronized (lock) {
                final TaskDetails details = runningTasks.get(taskId);
                if (details == null) {
                    log.warn("Got location notice for task [%s] that isn't running...", taskId);
                    return;
                }

                // Same location as before: nothing to announce.
                if (Objects.equals(details.location, location)) {
                    return;
                }

                details.location = location;
                final TaskAnnouncement latest = TaskAnnouncement.create(details.task, details.status, details.location);
                changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(latest));
                taskAnnouncementChanged(latest);
            }
        }
    }

    /**
     * Notice that a task's future finished with the given status.
     */
    private class StatusNotice implements Notice {
        private final Task task;
        private final TaskStatus status;

        public StatusNotice(Task task, TaskStatus status) {
            this.task = task;
            this.status = status;
        }

        @Override
        public String getTaskId() {
            return task.getId();
        }

        /**
         * Finalizes a running task: stamps the status with the elapsed time since the task was
         * tracked, moves the task to the completed set, records the announcement in the change
         * history and notifies the subclass hook. Notices for tasks that are not running, or
         * whose status is not complete, are logged and dropped.
         */
        @Override
        public void handle() {
            final String taskId = task.getId();
            synchronized (lock) {
                final TaskDetails details = runningTasks.get(taskId);
                if (details == null) {
                    log.warn("Got status notice for task [%s] that isn't running...", taskId);
                    return;
                }
                if (!status.isComplete()) {
                    log.warn(
                        "WTF?! Got status notice for task [%s] that isn't complete (status = [%s])...",
                        taskId,
                        status.getStatusCode()
                    );
                    return;
                }

                final long elapsedMillis = System.currentTimeMillis() - details.startTime;
                details.status = status.withDuration(elapsedMillis);

                final TaskAnnouncement latest = TaskAnnouncement.create(details.task, details.status, details.location);
                moveFromRunningToCompleted(taskId, latest);
                changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(latest));
                taskAnnouncementChanged(latest);
                log.info("Task [%s] completed with status [%s].", taskId, status.getStatusCode());
            }
        }
    }

    /**
     * Notice that an assigned task should be handed to the task runner.
     */
    private class RunNotice implements Notice {
        private final Task task;

        public RunNotice(Task task) {
            this.task = task;
        }

        @Override
        public String getTaskId() {
            return task.getId();
        }

        /**
         * Starts the task unless it is already running or completed. On a fresh start the task
         * is submitted to the task runner, tracked as running, announced in the change history
         * and removed from the assigned set; the subclass hooks are then invoked after the lock
         * has been released. For an already known task only {@code taskStarted} is invoked.
         */
        @Override
        public void handle() {
            final String taskId = task.getId();
            final TaskAnnouncement announcement;

            synchronized (lock) {
                final boolean alreadyKnown = runningTasks.containsKey(taskId) || completedTasks.containsKey(taskId);
                if (alreadyKnown) {
                    log.warn("Got run notice for task [%s] that I am already running or completed...", taskId);
                    taskStarted(taskId);
                    return;
                }

                addRunningTask(task, taskRunner.run(task));

                announcement = TaskAnnouncement.create(task, TaskStatus.running(taskId), TaskLocation.unknown());
                changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(announcement));

                cleanupAssignedTask(task);
                log.info("Task[%s] started.", taskId);
            }

            taskAnnouncementChanged(announcement);
            taskStarted(taskId);
        }
    }

    private static interface Notice {
        public String getTaskId();

        public void handle();
    }

    /**
     * Mutable bookkeeping for a running task: the task itself, the wall-clock time at which it
     * started being tracked, and its latest known status and location.
     */
    private static class TaskDetails {
        private final Task task;
        private final long startTime;
        private TaskStatus status;
        // A task's location is unknown until a location notice reports it.
        private TaskLocation location = TaskLocation.unknown();

        public TaskDetails(Task task) {
            this.task = task;
            this.startTime = System.currentTimeMillis();
            this.status = TaskStatus.running(task.getId());
        }
    }
}

