/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexingPhaseProgress;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;

/**
 * Base runner for one phase of parallel indexing.
 *
 * <p>The runner pulls {@link SubTaskSpec}s from {@link #subTaskSpecIterator()}, submits them to a
 * {@link TaskMonitor} while keeping at most {@code maxNumConcurrentSubTasks} of them running, and
 * consumes completion events from an internal queue until every spec has succeeded, one of them has
 * failed, or the runner is stopped/interrupted.
 *
 * <p>Reports pushed by sub tasks are collected through {@link #collectReport} and exposed through
 * {@link #getReports()}.
 *
 * @param <SubTaskType>       type of the sub tasks created by this runner
 * @param <SubTaskReportType> type of the reports sent back by the sub tasks
 */
public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTaskReportType extends SubTaskReport>
implements ParallelIndexTaskRunner<SubTaskType, SubTaskReportType> {
    private static final Logger LOG = new Logger(ParallelIndexPhaseRunner.class);

    private final TaskToolbox toolbox;
    private final String taskId;
    private final String groupId;
    private final ParallelIndexTuningConfig tuningConfig;
    private final Map<String, Object> context;
    private final int maxNumConcurrentSubTasks;
    private final IndexingServiceClient indexingServiceClient;

    /** Completion events published by the callbacks registered in {@link #submitNewTask}. */
    private final BlockingQueue<TaskMonitor.SubTaskCompleteEvent<SubTaskType>> taskCompleteEvents = new LinkedBlockingDeque<>();

    /** Reports received so far, keyed by the id of the reporting sub task. */
    private final ConcurrentHashMap<String, SubTaskReportType> reportsMap = new ConcurrentHashMap<>();

    /** Set once scheduling/monitoring must stop; checked by {@link #isRunning()}. */
    private volatile boolean subTaskScheduleAndMonitorStopped;

    /** Created lazily in {@link #run()}; {@code null} until then. */
    private volatile TaskMonitor<SubTaskType> taskMonitor;

    private int nextSpecId = 0;

    /**
     * @param toolbox               toolbox returned as-is by {@link #getToolbox()}
     * @param taskId                id returned by {@link #getTaskId()}
     * @param groupId               id returned by {@link #getGroupId()}
     * @param tuningConfig          source of the concurrency limit, retry count and status check period
     * @param context               context returned by {@link #getContext()}
     * @param indexingServiceClient client handed to the {@link TaskMonitor}; must not be null
     * @throws NullPointerException if {@code indexingServiceClient} is null
     */
    ParallelIndexPhaseRunner(TaskToolbox toolbox, String taskId, String groupId, ParallelIndexTuningConfig tuningConfig, Map<String, Object> context, IndexingServiceClient indexingServiceClient) {
        this.toolbox = toolbox;
        this.taskId = taskId;
        this.groupId = groupId;
        this.tuningConfig = tuningConfig;
        this.context = context;
        this.maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
        this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
    }

    /**
     * Returns the specs of the sub tasks to run in this phase.
     */
    abstract Iterator<SubTaskSpec<SubTaskType>> subTaskSpecIterator() throws IOException;

    /**
     * Returns an estimate of the total number of sub tasks; passed to the {@link TaskMonitor} constructor.
     */
    abstract int estimateTotalNumSubTasks() throws IOException;

    /**
     * Runs the phase to completion.
     *
     * <p>Returns {@link TaskState#SUCCESS} immediately when there is no spec to process. Otherwise
     * submits specs up to the concurrency limit and then waits for completion events, submitting a
     * further spec for each success while specs remain.
     *
     * @return {@link TaskState#SUCCESS} when every submitted spec succeeded, otherwise
     *         {@link TaskState#FAILED} (also when the loop ends without reaching a complete state,
     *         e.g. after {@link #stopGracefully()} or an interrupt)
     * @throws ISE when a succeeded task has no status or no report, when the number of succeeded
     *             tasks does not match the number of submitted specs, or when an event carries a
     *             state other than SUCCESS or FAILED
     * @throws Exception anything thrown while creating specs or while waiting on the event queue
     */
    @Override
    public TaskState run() throws Exception {
        final CountingSubTaskSpecIterator specIterator = new CountingSubTaskSpecIterator(subTaskSpecIterator());
        if (!specIterator.hasNext()) {
            LOG.warn("There's no input split to process");
            return TaskState.SUCCESS;
        }

        final long taskStatusCheckingPeriod = tuningConfig.getTaskStatusCheckPeriodMs();
        final TaskMonitor<SubTaskType> monitor = new TaskMonitor<>(
            Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"),
            tuningConfig.getMaxRetry(),
            estimateTotalNumSubTasks()
        );
        taskMonitor = monitor;

        TaskState state = TaskState.RUNNING;
        monitor.start(taskStatusCheckingPeriod);
        try {
            LOG.info("Submitting initial tasks");
            while (isRunning() && specIterator.hasNext() && monitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
                submitNewTask(monitor, specIterator.next());
            }

            LOG.info("Waiting for subTasks to be completed");
            while (isRunning()) {
                final TaskMonitor.SubTaskCompleteEvent<SubTaskType> taskCompleteEvent =
                    taskCompleteEvents.poll(taskStatusCheckingPeriod, TimeUnit.MILLISECONDS);
                if (taskCompleteEvent == null) {
                    // Timed out without an event; re-check isRunning() and poll again.
                    continue;
                }

                final TaskState completeState = taskCompleteEvent.getLastState();
                switch (completeState) {
                    case SUCCESS: {
                        final TaskStatusPlus completeStatus = taskCompleteEvent.getLastStatus();
                        if (completeStatus == null) {
                            throw new ISE("Last status of complete task is missing!");
                        }
                        if (!reportsMap.containsKey(completeStatus.getId())) {
                            throw new ISE("Missing reports from task[%s]!", completeStatus.getId());
                        }

                        if (!specIterator.hasNext()) {
                            // Nothing left to submit. Finish only once no task is running and no
                            // completion event is still waiting in the queue.
                            if (monitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) {
                                subTaskScheduleAndMonitorStopped = true;
                                if (specIterator.count == monitor.getNumSucceededTasks()) {
                                    state = TaskState.SUCCESS;
                                } else {
                                    final ParallelIndexingPhaseProgress monitorStatus = monitor.getProgress();
                                    throw new ISE(
                                        "Expected [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
                                        specIterator.count,
                                        monitorStatus.getSucceeded(),
                                        monitorStatus.getFailed()
                                    );
                                }
                            }
                        } else if (monitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
                            submitNewTask(monitor, specIterator.next());
                        }
                        break;
                    }
                    case FAILED: {
                        state = TaskState.FAILED;
                        subTaskScheduleAndMonitorStopped = true;
                        final TaskStatusPlus lastStatus = taskCompleteEvent.getLastStatus();
                        if (lastStatus != null) {
                            LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
                        } else {
                            // Only SubTaskSpec-level accessors are used here: this class is generic over the
                            // sub task type, so the spec must not be cast to a concrete spec class, and
                            // building the log message must not be able to throw and hide the failure.
                            final SubTaskSpec<SubTaskType> failedSpec = taskCompleteEvent.getSpec();
                            LOG.error(
                                "Failed to run sub tasks for spec[%s] with inputSplit[%s]",
                                failedSpec.getId(),
                                failedSpec.getInputSplit()
                            );
                        }
                        break;
                    }
                    default:
                        throw new ISE("spec[%s] is in an invalid state[%s]", taskCompleteEvent.getSpec().getId(), completeState);
                }
            }
        }
        finally {
            stopInternal();
            if (!state.isComplete()) {
                state = TaskState.FAILED;
            }
        }
        return state;
    }

    /**
     * Returns true while the runner has not been stopped and the current thread is not interrupted.
     */
    private boolean isRunning() {
        return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted();
    }

    /**
     * Submits {@code spec} to {@code taskMonitor} and registers a callback which publishes the
     * outcome (the completion event, or a failure event built from the thrown error) to
     * {@link #taskCompleteEvents}.
     */
    private void submitNewTask(TaskMonitor<SubTaskType> taskMonitor, final SubTaskSpec<SubTaskType> spec) {
        LOG.info("Submit a new task for spec[%s] and inputSplit[%s]", spec.getId(), spec.getInputSplit());
        final ListenableFuture<TaskMonitor.SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);
        Futures.addCallback(future, new FutureCallback<TaskMonitor.SubTaskCompleteEvent<SubTaskType>>() {
            @Override
            public void onSuccess(TaskMonitor.SubTaskCompleteEvent<SubTaskType> completeEvent) {
                taskCompleteEvents.offer(completeEvent);
            }

            @Override
            public void onFailure(Throwable t) {
                LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
                taskCompleteEvents.offer(TaskMonitor.SubTaskCompleteEvent.fail(spec, t));
            }
        });
    }

    /**
     * Marks the runner as stopped, which ends the loop in {@link #run()}, and releases resources.
     */
    @Override
    public void stopGracefully() {
        subTaskScheduleAndMonitorStopped = true;
        stopInternal();
    }

    /**
     * Drops pending completion events and stops the task monitor if it has been created.
     */
    private void stopInternal() {
        LOG.info("Cleaning up resources");
        taskCompleteEvents.clear();
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        if (monitor != null) {
            monitor.stop();
        }
    }

    /**
     * Stores {@code report} under its task id.
     *
     * @throws IllegalStateException if a different report was already stored for the same task id
     */
    @Override
    public void collectReport(SubTaskReportType report) {
        reportsMap.compute(report.getTaskId(), (reportingTaskId, prevReport) -> {
            if (prevReport != null) {
                Preconditions.checkState(
                    prevReport.equals(report),
                    "task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]",
                    reportingTaskId,
                    prevReport,
                    report
                );
            }
            return report;
        });
    }

    /**
     * Returns the live map of collected reports keyed by sub task id (not a copy).
     */
    @Override
    public Map<String, SubTaskReportType> getReports() {
        return reportsMap;
    }

    /**
     * Returns the monitor's progress, or a "not running" progress before {@link #run()} created the monitor.
     */
    @Override
    public ParallelIndexingPhaseProgress getProgress() {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        return monitor == null ? ParallelIndexingPhaseProgress.notRunning() : monitor.getProgress();
    }

    /**
     * Returns the ids of the running sub tasks, or an empty set when the monitor does not exist yet.
     */
    @Override
    public Set<String> getRunningTaskIds() {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        return monitor == null ? Collections.emptySet() : monitor.getRunningTaskIds();
    }

    /**
     * Returns running and complete specs merged by spec id; when an id appears in both lists the
     * complete spec wins. Empty when the monitor does not exist yet.
     */
    @Override
    public List<SubTaskSpec<SubTaskType>> getSubTaskSpecs() {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        if (monitor == null) {
            return Collections.emptyList();
        }
        final List<SubTaskSpec<SubTaskType>> runningSubTaskSpecs = monitor.getRunningSubTaskSpecs();
        final List<SubTaskSpec<SubTaskType>> completeSubTaskSpecs = monitor.getCompleteSubTaskSpecs();
        final Map<String, SubTaskSpec<SubTaskType>> subTaskSpecMap =
            new HashMap<>(runningSubTaskSpecs.size() + completeSubTaskSpecs.size());
        // Running specs first, so that a complete spec with the same id overwrites the running one.
        runningSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
        completeSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
        return new ArrayList<>(subTaskSpecMap.values());
    }

    /**
     * Returns the specs of running sub tasks, or an empty list when the monitor does not exist yet.
     */
    @Override
    public List<SubTaskSpec<SubTaskType>> getRunningSubTaskSpecs() {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        return monitor == null ? Collections.emptyList() : monitor.getRunningSubTaskSpecs();
    }

    /**
     * Returns the specs of complete sub tasks, or an empty list when the monitor does not exist yet.
     */
    @Override
    public List<SubTaskSpec<SubTaskType>> getCompleteSubTaskSpecs() {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        return monitor == null ? Collections.emptyList() : monitor.getCompleteSubTaskSpecs();
    }

    /**
     * Looks up a spec by id, preferring the running entry over the completed history.
     *
     * @return the spec, or {@code null} when it is unknown or the monitor does not exist yet
     */
    @Override
    @Nullable
    public SubTaskSpec<SubTaskType> getSubTaskSpec(String subTaskSpecId) {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        if (monitor == null) {
            return null;
        }
        // The running entry is queried before the history, in the same order as before.
        final TaskMonitor.MonitorEntry monitorEntry = monitor.getRunningTaskMonitorEntry(subTaskSpecId);
        final TaskHistory<SubTaskType> taskHistory = monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
        if (monitorEntry != null) {
            // MonitorEntry is referenced without type arguments, so its spec needs an unchecked conversion.
            @SuppressWarnings("unchecked")
            final SubTaskSpec<SubTaskType> runningSpec = monitorEntry.getSpec();
            return runningSpec;
        }
        return taskHistory == null ? null : taskHistory.getSpec();
    }

    /**
     * Builds the status of a spec from its running entry if present, otherwise from its non-empty
     * completed history.
     *
     * @return the status, or {@code null} when the spec is unknown, its history is empty, or the
     *         monitor does not exist yet
     */
    @Override
    @Nullable
    public ParallelIndexTaskRunner.SubTaskSpecStatus getSubTaskState(String subTaskSpecId) {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        if (monitor == null) {
            return null;
        }
        final TaskMonitor.MonitorEntry monitorEntry = monitor.getRunningTaskMonitorEntry(subTaskSpecId);
        final TaskHistory<SubTaskType> taskHistory = monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
        if (monitorEntry != null) {
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (SinglePhaseSubTaskSpec) monitorEntry.getSpec(),
                monitorEntry.getRunningStatus(),
                monitorEntry.getTaskHistory()
            );
        }
        if (taskHistory != null && !taskHistory.isEmpty()) {
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (SinglePhaseSubTaskSpec) taskHistory.getSpec(),
                null,
                taskHistory.getAttemptHistory()
            );
        }
        return null;
    }

    /**
     * Returns the completed attempt history of a spec, or {@code null} when the monitor does not exist yet.
     */
    @Override
    @Nullable
    public TaskHistory<SubTaskType> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId) {
        final TaskMonitor<SubTaskType> monitor = taskMonitor;
        return monitor == null ? null : monitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
    }

    String getTaskId() {
        return taskId;
    }

    String getGroupId() {
        return groupId;
    }

    Map<String, Object> getContext() {
        return context;
    }

    ParallelIndexTuningConfig getTuningConfig() {
        return tuningConfig;
    }

    @VisibleForTesting
    TaskToolbox getToolbox() {
        return toolbox;
    }

    @Nullable
    @VisibleForTesting
    TaskMonitor<SubTaskType> getTaskMonitor() {
        return taskMonitor;
    }

    /**
     * Returns the current spec id counter and then increments it. The counter is a plain
     * {@code int}, so this method is not safe for concurrent callers.
     */
    @VisibleForTesting
    int getAndIncrementNextSpecId() {
        return nextSpecId++;
    }

    @VisibleForTesting
    IndexingServiceClient getIndexingServiceClient() {
        return indexingServiceClient;
    }

    /**
     * Iterator wrapper that counts how many specs have been handed out through {@link #next()}.
     * {@link #run()} compares that count with the number of succeeded tasks.
     */
    private class CountingSubTaskSpecIterator
    implements Iterator<SubTaskSpec<SubTaskType>> {
        private final Iterator<SubTaskSpec<SubTaskType>> delegate;
        private int count;

        private CountingSubTaskSpecIterator(Iterator<SubTaskSpec<SubTaskType>> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public SubTaskSpec<SubTaskType> next() {
            // Checked up front so that the counter is only bumped when an element is really returned.
            if (!delegate.hasNext()) {
                throw new NoSuchElementException();
            }
            ++count;
            return delegate.next();
        }
    }
}

