/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.DimensionDistributionReport;
import org.apache.druid.indexing.common.task.batch.parallel.GeneratedHashPartitionsReport;
import org.apache.druid.indexing.common.task.batch.parallel.GeneratedPartitionsReport;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
import org.apache.druid.indexing.common.task.batch.parallel.HashPartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.HashPartitionStat;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentMergeParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionStat;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskSpec;
import org.apache.druid.indexing.common.task.batch.parallel.TaskHistory;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.PartitionBoundaries;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;

public class ParallelIndexSupervisorTask
extends AbstractBatchIndexTask
implements ChatHandler {
    /** Task type name; returned by {@code getType()} and used when generating the task id in the constructor. */
    public static final String TYPE = "index_parallel";
    private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
    /** Ingestion spec passed in as the "spec" JSON property; returned by {@code getIngestionSchema()}. */
    private final ParallelIndexIngestionSpec ingestionSchema;
    /** Input source resolved once in the constructor from the IO config; its splittability is consulted by {@code isParallelMode()}. */
    private final InputSource baseInputSource;
    /** Injected (nullable) client that is handed to every phase runner this task creates. */
    private final IndexingServiceClient indexingServiceClient;
    /** Injected (nullable) provider; {@code runTask} requires it to be non-null and registers this task with it. */
    private final ChatHandlerProvider chatHandlerProvider;
    /** Used for the authorization check in {@code allocateSegment} and passed to the sequential {@code IndexTask}. */
    private final AuthorizerMapper authorizerMapper;
    /** Passed through to the {@code IndexTask} created by {@code runSequential}. */
    private final RowIngestionMetersFactory rowIngestionMetersFactory;
    /** Passed through to the {@code IndexTask} created by {@code runSequential}. */
    private final AppenderatorsManager appenderatorsManager;
    /** True when the task is not appending to existing data and the granularitySpec has no bucket intervals (computed in the constructor). */
    private final boolean missingIntervalsInOverwriteMode;
    // NOTE(review): not referenced in the visible part of this file; presumably holds the next partition
    // number per interval for segment allocation — confirm against the rest of the class.
    private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap();
    /** Holder of the currently running runner/sub task; assigned by {@code initializeSubTaskCleaner()}. */
    private volatile @MonotonicNonNull CurrentSubTaskHolder currentSubTaskHolder;
    /** Assigned by {@code runTask} in parallel mode; {@code allocateSegment} answers SERVICE_UNAVAILABLE while it is still null. */
    private volatile @MonotonicNonNull TaskToolbox toolbox;

    @JsonCreator
    public ParallelIndexSupervisorTask(@JsonProperty(value="id") String id, @JsonProperty(value="groupId") @Nullable String groupId, @JsonProperty(value="resource") TaskResource taskResource, @JsonProperty(value="spec") ParallelIndexIngestionSpec ingestionSchema, @JsonProperty(value="context") Map<String, Object> context, @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject AppenderatorsManager appenderatorsManager) {
        super(ParallelIndexSupervisorTask.getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), groupId, taskResource, ingestionSchema.getDataSchema().getDataSource(), context);
        this.ingestionSchema = ingestionSchema;
        if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
            ParallelIndexSupervisorTask.checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
            if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
                throw new ISE("forceGuaranteedRollup is set but intervals is missing in granularitySpec", new Object[0]);
            }
        }
        this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(ingestionSchema.getDataSchema().getParser());
        this.indexingServiceClient = indexingServiceClient;
        this.chatHandlerProvider = chatHandlerProvider;
        this.authorizerMapper = authorizerMapper;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        this.appenderatorsManager = appenderatorsManager;
        boolean bl = this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() && !ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
        if (this.missingIntervalsInOverwriteMode) {
            this.addToContext("forceTimeChunkLock", true);
        }
    }

    /**
     * Verifies that the given partitionsSpec can be used together with forceGuaranteedRollup.
     *
     * @throws ISE if the spec reports itself as incompatible; the message carries the spec's own reason
     */
    private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec) {
        if (partitionsSpec.isForceGuaranteedRollupCompatible()) {
            return;
        }
        final String reason = partitionsSpec.getForceGuaranteedRollupIncompatiblityReason();
        throw new ISE("forceGuaranteedRollup is incompatible with partitionsSpec: " + reason);
    }

    /** Returns the task type name, {@link #TYPE}. */
    @Override
    public String getType() {
        return ParallelIndexSupervisorTask.TYPE;
    }

    /** Returns the ingestion spec this task was created with; serialized as the "spec" JSON property. */
    @JsonProperty(value="spec")
    public ParallelIndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Returns the runner currently stored in the sub task holder.
     *
     * @return the current runner, or {@code null} when the task is not in parallel mode or the holder has
     *         not been initialized yet
     */
    @Nullable
    @VisibleForTesting
    ParallelIndexTaskRunner getCurrentRunner() {
        if (!isParallelMode()) {
            return null;
        }
        final CurrentSubTaskHolder holder = currentSubTaskHolder;
        if (holder == null) {
            return null;
        }
        return (ParallelIndexTaskRunner) holder.getTask();
    }

    /** Exposes the injected indexing service client (may be {@code null}) for tests. */
    @VisibleForTesting
    IndexingServiceClient getIndexingServiceClient() {
        return indexingServiceClient;
    }

    /**
     * Builds a runner with {@code runnerCreator} and installs it as the current sub task.
     *
     * @return the new runner, or {@code null} when the sub task holder rejects it ({@code setTask} returned
     *         false); callers treat {@code null} as "task is asked to stop"
     */
    @Nullable
    private <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(TaskToolbox toolbox, Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator) {
        final ParallelIndexTaskRunner<T, R> candidate = runnerCreator.apply(toolbox);
        return currentSubTaskHolder.setTask(candidate) ? candidate : null;
    }

    /**
     * Runs the given phase runner and returns its final state.
     *
     * @param nextPhaseRunner runner to execute; {@code null} means no runner could be installed
     * @return the runner's result, or {@link TaskState#FAILED} when {@code nextPhaseRunner} is {@code null}
     */
    private static TaskState runNextPhase(@Nullable ParallelIndexTaskRunner nextPhaseRunner) throws Exception {
        if (nextPhaseRunner != null) {
            return nextPhaseRunner.run();
        }
        LOG.info("Task is asked to stop. Finish as failed");
        return TaskState.FAILED;
    }

    /** Creates the runner for single-phase parallel indexing, using this task's id, group id, spec and context. */
    @VisibleForTesting
    SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox) {
        return new SinglePhaseParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            ingestionSchema,
            getContext(),
            indexingServiceClient
        );
    }

    /** Creates the runner for the partial hash segment generation phase. */
    @VisibleForTesting
    PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(TaskToolbox toolbox) {
        return new PartialHashSegmentGenerateParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            ingestionSchema,
            getContext(),
            indexingServiceClient
        );
    }

    /** Creates the runner for the partial dimension distribution phase. */
    @VisibleForTesting
    PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox) {
        return new PartialDimensionDistributionParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            ingestionSchema,
            getContext(),
            indexingServiceClient
        );
    }

    /**
     * Creates the runner for the partial range segment generation phase.
     *
     * @param intervalToPartitions partition boundaries per interval, forwarded to the runner as-is
     */
    @VisibleForTesting
    PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(TaskToolbox toolbox, Map<Interval, PartitionBoundaries> intervalToPartitions) {
        return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            ingestionSchema,
            getContext(),
            indexingServiceClient,
            intervalToPartitions
        );
    }

    /**
     * Creates the runner for the partial hash segment merge phase.
     *
     * @param ioConfigs one IO config per merge sub task, forwarded to the runner as-is
     */
    @VisibleForTesting
    PartialHashSegmentMergeParallelIndexTaskRunner createPartialHashSegmentMergeRunner(TaskToolbox toolbox, List<PartialHashSegmentMergeIOConfig> ioConfigs) {
        return new PartialHashSegmentMergeParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            getIngestionSchema().getDataSchema(),
            ioConfigs,
            getIngestionSchema().getTuningConfig(),
            getContext(),
            indexingServiceClient
        );
    }

    /**
     * Creates the runner for the partial generic segment merge phase.
     *
     * @param ioConfigs one IO config per merge sub task, forwarded to the runner as-is
     */
    @VisibleForTesting
    PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(TaskToolbox toolbox, List<PartialGenericSegmentMergeIOConfig> ioConfigs) {
        return new PartialGenericSegmentMergeParallelIndexTaskRunner(
            toolbox,
            getId(),
            getGroupId(),
            getIngestionSchema().getDataSchema(),
            ioConfigs,
            getIngestionSchema().getTuningConfig(),
            getContext(),
            indexingServiceClient
        );
    }

    /**
     * Delegates readiness to {@code determineLockGranularityAndTryLock}, using the granularitySpec of this
     * task's data schema.
     */
    @Override
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
        return determineLockGranularityAndTryLock(taskActionClient, granularitySpec);
    }

    /**
     * Finds the segments to lock for the given intervals by delegating to {@code findInputSegments} with this
     * task's data source and the firehose factory from the IO config.
     */
    @Override
    public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals) throws IOException {
        return findInputSegments(
            getDataSource(),
            taskActionClient,
            intervals,
            ingestionSchema.getIOConfig().getFirehoseFactory()
        );
    }

    /** Existing segments need to be locked unless the task appends to existing data. */
    @Override
    public boolean requireLockExistingSegments() {
        final boolean appending = ingestionSchema.getIOConfig().isAppendToExisting();
        return !appending;
    }

    /** Reports whether rollup is guaranteed, as decided by {@code isGuaranteedRollup} from the IO and tuning configs. */
    @Override
    public boolean isPerfectRollup() {
        final ParallelIndexIngestionSpec spec = getIngestionSchema();
        return isGuaranteedRollup(spec.getIOConfig(), spec.getTuningConfig());
    }

    /**
     * Returns the segment granularity from the granularitySpec.
     *
     * @return the configured granularity, or {@code null} when the spec is an {@link ArbitraryGranularitySpec}
     */
    @Override
    @Nullable
    public Granularity getSegmentGranularity() {
        final GranularitySpec spec = ingestionSchema.getDataSchema().getGranularitySpec();
        return spec instanceof ArbitraryGranularitySpec ? null : spec.getSegmentGranularity();
    }

    /**
     * Entry point of the task: registers this task as a chat handler, then runs in parallel mode
     * (multi-phase when {@code forceGuaranteedRollup} is set, single-phase otherwise) or falls back to
     * sequential mode. The chat handler is always unregistered on exit.
     *
     * @param toolbox toolbox for this run; stored in the {@code toolbox} field only in parallel mode
     * @return the final status of whichever mode was executed
     * @throws NullPointerException if no chat handler provider was injected
     * @throws ISE if the task is not in parallel mode for a reason this method does not recognise
     */
    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();

        // These settings are accepted in the spec but have no effect here; tell the user.
        if (tuningConfig.getMaxSavedParseExceptions() != 0) {
            LOG.warn("maxSavedParseExceptions is not supported yet");
        }
        if (tuningConfig.getMaxParseExceptions() != Integer.MAX_VALUE) {
            LOG.warn("maxParseExceptions is not supported yet");
        }
        if (tuningConfig.isLogParseExceptions()) {
            LOG.warn("logParseExceptions is not supported yet");
        }
        if (missingIntervalsInOverwriteMode) {
            LOG.warn("Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. Forced to use timeChunk lock.");
        }

        Preconditions.checkNotNull(chatHandlerProvider, "chatHandlerProvider");
        LOG.info("Found chat handler of class[%s]", chatHandlerProvider.getClass().getName());
        chatHandlerProvider.register(getId(), (ChatHandler) this, false);
        try {
            initializeSubTaskCleaner();

            if (isParallelMode()) {
                // Publish the toolbox so allocateSegment can serve requests from sub tasks.
                this.toolbox = toolbox;
                return getIngestionSchema().getTuningConfig().isForceGuaranteedRollup()
                    ? runMultiPhaseParallel(toolbox)
                    : runSinglePhaseParallel(toolbox);
            }

            // Explain why we are falling back to sequential mode.
            if (!baseInputSource.isSplittable()) {
                LOG.warn("firehoseFactory[%s] is not splittable. Running sequentially.", baseInputSource.getClass().getSimpleName());
            } else if (tuningConfig.getMaxNumConcurrentSubTasks() <= 1) {
                LOG.warn("maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode.", tuningConfig.getMaxNumConcurrentSubTasks());
            } else {
                throw new ISE("Unknown reason for sequential mode. Failing this task.");
            }
            return runSequential(toolbox);
        }
        finally {
            chatHandlerProvider.unregister(getId());
        }
    }

    /**
     * Creates the sub task holder and registers it as a resource closer for abnormal exit.
     * In parallel mode the cleaner stops the current {@link ParallelIndexTaskRunner} gracefully; otherwise
     * it stops the current {@link IndexTask} gracefully.
     */
    private void initializeSubTaskCleaner() {
        final CurrentSubTaskHolder holder;
        if (isParallelMode()) {
            holder = new CurrentSubTaskHolder((runnerObject, config) -> {
                ((ParallelIndexTaskRunner) runnerObject).stopGracefully();
            });
        } else {
            holder = new CurrentSubTaskHolder((taskObject, config) -> {
                ((IndexTask) taskObject).stopGracefully((TaskConfig) config);
            });
        }
        this.currentSubTaskHolder = holder;
        registerResourceCloserOnAbnormalExit(holder);
    }

    /**
     * Parallel mode requires a splittable input source and enough concurrent sub tasks: at least 1 when
     * range partitioning is used, at least 2 otherwise.
     */
    private boolean isParallelMode() {
        if (!baseInputSource.isSplittable()) {
            return false;
        }
        final int required = useRangePartitions() ? 1 : 2;
        return ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= required;
    }

    /** True when the given-or-default partitionsSpec is a {@link SingleDimensionPartitionsSpec}. */
    private boolean useRangePartitions() {
        final PartitionsSpec spec = ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
        return spec instanceof SingleDimensionPartitionsSpec;
    }

    /**
     * Runs the single-phase parallel runner and publishes its reported segments on success.
     *
     * @return a status built from the runner's final state
     */
    private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception {
        ParallelIndexTaskRunner singlePhaseRunner = this.createRunner(toolbox, this::createSinglePhaseTaskRunner);
        final TaskState finalState = runNextPhase(singlePhaseRunner);
        if (finalState.isSuccess()) {
            publishSegments(toolbox, singlePhaseRunner.getReports());
        }
        return TaskStatus.fromCode(getId(), finalState);
    }

    /** Dispatches to the range-partition or the hash-partition multi-phase flow, depending on the partitionsSpec. */
    private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        if (useRangePartitions()) {
            return runRangePartitionMultiPhaseParallel(toolbox);
        }
        return runHashPartitionMultiPhaseParallel(toolbox);
    }

    /**
     * Two-phase hash-partitioned ingestion: (1) generate partial hash segments, (2) merge them.
     * Segments reported by the merge phase are published when that phase succeeds.
     *
     * @return failure if the generate phase fails; otherwise a status built from the merge phase's state
     */
    private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        // Phase 1: partial segment generation.
        ParallelIndexTaskRunner generateRunner = this.createRunner(toolbox, this::createPartialHashSegmentGenerateRunner);
        final TaskState generateState = runNextPhase(generateRunner);
        if (generateState.isFailure()) {
            return TaskStatus.failure(getId());
        }

        // Phase 2: group the generated partitions and hand them to the merge tasks.
        Map<Pair<Interval, Integer>, List<HashPartitionLocation>> locationsByPartition = groupHashPartitionLocationsPerPartition(generateRunner.getReports());
        List<PartialHashSegmentMergeIOConfig> ioConfigs = createHashMergeIOConfigs(
            ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
            locationsByPartition
        );
        ParallelIndexTaskRunner mergeRunner = this.createRunner(toolbox, tb -> this.createPartialHashSegmentMergeRunner((TaskToolbox)tb, ioConfigs));
        final TaskState mergeState = runNextPhase(mergeRunner);
        if (mergeState.isSuccess()) {
            publishSegments(toolbox, mergeRunner.getReports());
        }
        return TaskStatus.fromCode(getId(), mergeState);
    }

    /**
     * Three-phase range-partitioned ingestion: (1) collect dimension distributions, (2) generate partial
     * range segments using the derived partition boundaries, (3) merge them. Segments reported by the merge
     * phase are published when that phase succeeds.
     *
     * @return failure (with a phase-specific message) if phase 1 or 2 fails; success with an explanatory
     *         message if no partition boundaries could be derived; otherwise a status built from the merge
     *         phase's state
     */
    private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception {
        // Phase 1: dimension value distributions.
        ParallelIndexTaskRunner distributionRunner = this.createRunner(toolbox, this::createPartialDimensionDistributionRunner);
        if (runNextPhase(distributionRunner).isFailure()) {
            return TaskStatus.failure(getId(), "partial_dimension_distribution failed");
        }

        Map<Interval, PartitionBoundaries> intervalToPartitions = this.determineAllRangePartitions(distributionRunner.getReports().values());
        if (intervalToPartitions.isEmpty()) {
            final String noRowsMessage = "No valid rows for single dimension partitioning. All rows may have invalid timestamps or multiple dimension values.";
            LOG.warn(noRowsMessage);
            return TaskStatus.success(getId(), noRowsMessage);
        }

        // Phase 2: partial segment generation with the computed boundaries.
        ParallelIndexTaskRunner generateRunner = this.createRunner(toolbox, tb -> this.createPartialRangeSegmentGenerateRunner((TaskToolbox)tb, intervalToPartitions));
        if (runNextPhase(generateRunner).isFailure()) {
            return TaskStatus.failure(getId(), "partial_range_index_generate failed");
        }

        // Phase 3: merge.
        Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> locationsByPartition = groupGenericPartitionLocationsPerPartition(generateRunner.getReports());
        List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
            ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
            locationsByPartition
        );
        ParallelIndexTaskRunner mergeRunner = this.createRunner(toolbox, tb -> this.createPartialGenericSegmentMergeRunner((TaskToolbox)tb, ioConfigs));
        final TaskState mergeState = runNextPhase(mergeRunner);
        if (mergeState.isSuccess()) {
            publishSegments(toolbox, mergeRunner.getReports());
        }
        return TaskStatus.fromCode(getId(), mergeState);
    }

    /**
     * Fills a multimap once per report and then maps each key's collection of values through
     * {@code determineRangePartition}, returning the resulting interval-to-boundaries map.
     *
     * @param reports dimension distribution reports from the sub tasks
     */
    private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports) {
        ArrayListMultimap intervalToDistributions = ArrayListMultimap.create();
        // NOTE(review): the accumulation logic is a compiler-generated lambda method that the decompiler did
        // not inline and that is not visible here. Presumably it puts every (interval, distribution) pair of
        // the report into the multimap — confirm against the original source.
        reports.forEach(arg_0 -> ParallelIndexSupervisorTask.lambda$determineAllRangePartitions$5((Multimap)intervalToDistributions, arg_0));
        return CollectionUtils.mapValues((Map)intervalToDistributions.asMap(), this::determineRangePartition);
    }

    /**
     * Merges the given distributions into one and splits it into partition boundaries.
     * Uses {@code targetRowsPerSegment} from the single-dimension partitionsSpec when it is set, and
     * {@code maxRowsPerSegment} otherwise.
     */
    private PartitionBoundaries determineRangePartition(Collection<StringDistribution> distributions) {
        final StringSketchMerger merger = new StringSketchMerger();
        for (StringDistribution distribution : distributions) {
            merger.merge(distribution);
        }
        final StringDistribution merged = merger.getResult();

        final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
        final Integer targetRows = spec.getTargetRowsPerSegment();
        if (targetRows != null) {
            return merged.getEvenPartitionsByTargetSize(targetRows);
        }
        return merged.getEvenPartitionsByMaxSize(spec.getMaxRowsPerSegment());
    }

    /**
     * Groups the partitions reported by hash segment generation sub tasks per (interval, partition id),
     * describing each one as a {@link HashPartitionLocation} built from its stat and the reporting sub task id.
     */
    private static Map<Pair<Interval, Integer>, List<HashPartitionLocation>> groupHashPartitionLocationsPerPartition(Map<String, GeneratedHashPartitionsReport> subTaskIdToReport) {
        final BiFunction<String, HashPartitionStat, HashPartitionLocation> toLocation =
            (taskId, stat) -> new HashPartitionLocation(
                stat.getTaskExecutorHost(),
                stat.getTaskExecutorPort(),
                stat.isUseHttps(),
                taskId,
                stat.getInterval(),
                stat.getSecondaryPartition()
            );
        return groupPartitionLocationsPerPartition(subTaskIdToReport, toLocation);
    }

    /**
     * Groups the partitions reported by segment generation sub tasks per (interval, partition id),
     * describing each one as a {@link GenericPartitionLocation} built from its stat and the reporting sub task id.
     */
    private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> groupGenericPartitionLocationsPerPartition(Map<String, GeneratedPartitionsReport<GenericPartitionStat>> subTaskIdToReport) {
        final BiFunction<String, GenericPartitionStat, GenericPartitionLocation> toLocation =
            (taskId, stat) -> new GenericPartitionLocation(
                stat.getTaskExecutorHost(),
                stat.getTaskExecutorPort(),
                stat.isUseHttps(),
                taskId,
                stat.getInterval(),
                stat.getSecondaryPartition()
            );
        return groupPartitionLocationsPerPartition(subTaskIdToReport, toLocation);
    }

    /**
     * Groups every partition stat of every sub task report by its (interval, partition id) pair.
     *
     * @param subTaskIdToReport               reports keyed by the id of the sub task that produced them
     * @param createPartitionLocationFunction builds a location from a sub task id and one of its partition stats
     * @return a mutable map from (interval, partition id) to the locations of that partition, in the order
     *         the stats were encountered
     */
    private static <S extends PartitionStat, L extends PartitionLocation> Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(Map<String, ? extends GeneratedPartitionsReport<S>> subTaskIdToReport, BiFunction<String, S, L> createPartitionLocationFunction) {
        final Map<Pair<Interval, Integer>, List<L>> partitionToLocations = new HashMap<>();
        // The entry type must carry the map's wildcard, and the stat must be typed as S (not the PartitionStat
        // bound), otherwise neither the iteration nor the BiFunction call type-checks.
        for (Map.Entry<String, ? extends GeneratedPartitionsReport<S>> entry : subTaskIdToReport.entrySet()) {
            final String subTaskId = entry.getKey();
            final GeneratedPartitionsReport<S> report = entry.getValue();
            for (S partitionStat : report.getPartitionStats()) {
                final Pair<Interval, Integer> partitionKey = Pair.of(partitionStat.getInterval(), partitionStat.getPartitionId());
                final List<L> locationsOfSamePartition = partitionToLocations.computeIfAbsent(partitionKey, k -> new ArrayList<>());
                locationsOfSamePartition.add(createPartitionLocationFunction.apply(subTaskId, partitionStat));
            }
        }
        return partitionToLocations;
    }

    /** Distributes hash partitions over merge tasks; each task's locations are wrapped in a {@link PartialHashSegmentMergeIOConfig}. */
    private static List<PartialHashSegmentMergeIOConfig> createHashMergeIOConfigs(int totalNumMergeTasks, Map<Pair<Interval, Integer>, List<HashPartitionLocation>> partitionToLocations) {
        return createMergeIOConfigs(
            totalNumMergeTasks,
            partitionToLocations,
            PartialHashSegmentMergeIOConfig::new
        );
    }

    /** Distributes generic partitions over merge tasks; each task's locations are wrapped in a {@link PartialGenericSegmentMergeIOConfig}. */
    private static List<PartialGenericSegmentMergeIOConfig> createGenericMergeIOConfigs(int totalNumMergeTasks, Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations) {
        return createMergeIOConfigs(
            totalNumMergeTasks,
            partitionToLocations,
            PartialGenericSegmentMergeIOConfig::new
        );
    }

    /**
     * Splits the partitions into at most {@code totalNumMergeTasks} groups and creates one merge IO config
     * per group.
     *
     * <p>The partitions are shuffled and then divided into contiguous chunks of size
     * {@code floor(partitions / tasks)}, with the remainder spread one-by-one over the first chunks. Every
     * chunk is therefore non-empty and chunk sizes differ by at most one. (Sizing chunks with a rounded
     * average can round up, which makes later chunks empty or runs past the end of the list, e.g. for 11
     * partitions and 7 tasks.)
     *
     * @param totalNumMergeTasks                upper bound for the number of merge tasks
     * @param partitionToLocations              locations per (interval, partition id)
     * @param createPartialSegmentMergeIOConfig builds an IO config from all locations assigned to one task
     * @return one IO config per merge task; empty when there are no partitions
     * @throws IAE if there are partitions to merge but {@code totalNumMergeTasks} is not positive
     */
    private static <M extends PartialSegmentMergeIOConfig, L extends PartitionLocation> List<M> createMergeIOConfigs(int totalNumMergeTasks, Map<Pair<Interval, Integer>, List<L>> partitionToLocations, Function<List<L>, M> createPartialSegmentMergeIOConfig) {
        final int numPartitions = partitionToLocations.size();
        final int numMergeTasks = Math.min(totalNumMergeTasks, numPartitions);
        LOG.info("Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] and number of partitions[%d]", numMergeTasks, totalNumMergeTasks, numPartitions);

        if (numPartitions == 0) {
            // Nothing to merge, so no merge task is needed.
            return new ArrayList<>();
        }
        if (numMergeTasks < 1) {
            // Without this check partitions would be dropped silently.
            throw new IAE("totalNumMergeTasks[%d] must be positive", totalNumMergeTasks);
        }

        // Shuffle so that partitions of potentially different sizes are spread over the merge tasks.
        final List<Pair<Interval, Integer>> partitions = new ArrayList<>(partitionToLocations.keySet());
        Collections.shuffle(partitions, ThreadLocalRandom.current());

        final int baseChunkSize = numPartitions / numMergeTasks;
        final int remainder = numPartitions % numMergeTasks;
        final List<M> assignedPartitionLocations = new ArrayList<>(numMergeTasks);
        int start = 0;
        for (int i = 0; i < numMergeTasks; ++i) {
            // The first `remainder` tasks take one extra partition.
            final int stop = start + baseChunkSize + (i < remainder ? 1 : 0);
            final List<L> assignedToSameTask = partitions.subList(start, stop)
                .stream()
                .flatMap(intervalAndPartitionId -> partitionToLocations.get(intervalAndPartitionId).stream())
                .collect(Collectors.toList());
            assignedPartitionLocations.add(createPartialSegmentMergeIOConfig.apply(assignedToSameTask));
            start = stop;
        }
        return assignedPartitionLocations;
    }

    /**
     * Publishes all new segments found in the reports, overwriting the old segments they report.
     *
     * <p>Nothing is submitted when there are no new segments. If the publish action does not succeed, the
     * used segments are looked up; the call still completes normally when they equal the new segments.
     *
     * @throws ISE if publishing failed and the used segments do not match the new segments
     */
    private static void publishSegments(TaskToolbox toolbox, Map<String, PushedSegmentsReport> reportsMap) throws IOException {
        final ActionBasedUsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());

        // Union of what every sub task replaced and produced.
        final Set<DataSegment> oldSegments = new HashSet<>();
        final Set<DataSegment> newSegments = new HashSet<>();
        for (PushedSegmentsReport report : reportsMap.values()) {
            oldSegments.addAll(report.getOldSegments());
            newSegments.addAll(report.getNewSegments());
        }

        final TransactionalSegmentPublisher publisher = (toOverwrite, toPublish, metadata) ->
            toolbox.getTaskActionClient().submit(SegmentTransactionalInsertAction.overwriteAction(toOverwrite, toPublish));

        if (newSegments.isEmpty() || publisher.publishSegments(oldSegments, newSegments, null).isSuccess()) {
            LOG.info("Published [%d] segments", newSegments.size());
            return;
        }

        LOG.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
        final Set<SegmentIdWithShardSpec> segmentsIdentifiers = reportsMap
            .values()
            .stream()
            .flatMap(report -> report.getNewSegments().stream())
            .map(SegmentIdWithShardSpec::fromDataSegment)
            .collect(Collectors.toSet());
        if (!usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(newSegments)) {
            throw new ISE("Failed to publish segments[%s]", newSegments);
        }
        LOG.info("Our segments really do exist, awaiting handoff.");
    }

    /**
     * Runs the ingestion as a single {@link IndexTask} built from this task's id, spec and context.
     *
     * @return the index task's status, or failure when the sub task holder rejects the task or the index
     *         task reports that it is not ready
     */
    private TaskStatus runSequential(TaskToolbox toolbox) throws Exception {
        final IndexTask.IndexIngestionSpec sequentialSpec = new IndexTask.IndexIngestionSpec(
            getIngestionSchema().getDataSchema(),
            getIngestionSchema().getIOConfig(),
            convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
        );
        final IndexTask indexTask = new IndexTask(
            getId(),
            getGroupId(),
            getTaskResource(),
            getDataSource(),
            sequentialSpec,
            getContext(),
            authorizerMapper,
            chatHandlerProvider,
            rowIngestionMetersFactory,
            appenderatorsManager
        );

        if (!currentSubTaskHolder.setTask(indexTask) || !indexTask.isReady(toolbox.getTaskActionClient())) {
            LOG.info("Task is asked to stop. Finish as failed");
            return TaskStatus.failure(getId());
        }
        return indexTask.run(toolbox);
    }

    /**
     * Builds the tuning config for the sequential {@link IndexTask} from the parallel tuning config.
     * Arguments are positional; the {@code null}s leave the corresponding constructor parameters unset
     * (their meaning is defined by the {@code IndexTuningConfig} constructor, which is not part of this file).
     */
    private static IndexTask.IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig) {
        return new IndexTask.IndexTuningConfig(
            null,
            null,
            tuningConfig.getMaxRowsInMemory(),
            tuningConfig.getMaxBytesInMemory(),
            null,
            null,
            null,
            null,
            tuningConfig.getPartitionsSpec(),
            tuningConfig.getIndexSpec(),
            tuningConfig.getIndexSpecForIntermediatePersists(),
            tuningConfig.getMaxPendingPersists(),
            tuningConfig.isForceGuaranteedRollup(),
            tuningConfig.isReportParseExceptions(),
            null,
            tuningConfig.getPushTimeout(),
            tuningConfig.getSegmentWriteOutMediumFactory(),
            tuningConfig.isLogParseExceptions(),
            tuningConfig.getMaxParseExceptions(),
            tuningConfig.getMaxSavedParseExceptions()
        );
    }

    /**
     * HTTP endpoint that allocates a new segment id for the given timestamp and returns it serialized
     * with the toolbox's JSON mapper.
     *
     * <p>Responses: 503 when the toolbox is not set yet, 400 when allocation throws an
     * {@link IllegalArgumentException}, 500 (with the stack trace as the entity) when it throws an
     * {@link IOException} or {@link IllegalStateException}, otherwise 200 with the serialized id.
     *
     * @param timestamp timestamp the new segment must cover
     * @param req       incoming request, used for the READ authorization check on this datasource
     * @return the HTTP response described above
     */
    @POST
    @Path(value="/segment/allocate")
    @Produces(value={"application/x-jackson-smile"})
    @Consumes(value={"application/x-jackson-smile"})
    public Response allocateSegment(DateTime timestamp, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (this.toolbox == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        try {
            final SegmentIdWithShardSpec allocatedId = this.allocateNewSegment(timestamp);
            return Response.ok(this.toolbox.getJsonMapper().writeValueAsBytes(allocatedId)).build();
        }
        catch (IllegalArgumentException badRequest) {
            return Response.status(Response.Status.BAD_REQUEST).entity(Throwables.getStackTraceAsString(badRequest)).build();
        }
        catch (IOException | IllegalStateException failure) {
            return Response.serverError().entity(Throwables.getStackTraceAsString(failure)).build();
        }
    }

    /**
     * Allocates a new segment id for the bucket interval that contains {@code timestamp}.
     *
     * <p>The version comes from the first currently held lock whose interval contains the bucket
     * interval. When the granularity spec declares explicit bucket intervals, the bucket must be one
     * of them and a matching lock version must already exist. Otherwise, a missing version is obtained
     * by trying to acquire an exclusive time-chunk lock for the bucket. The partition number is taken
     * from the per-interval counter, which is incremented by this call.
     *
     * @param timestamp timestamp the new segment must cover
     * @return a segment id with a {@link NumberedShardSpec} for the next partition number
     * @throws IOException if submitting a task action fails
     */
    @VisibleForTesting
    SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException {
        final String dataSource = this.getDataSource();
        final GranularitySpec granularitySpec = this.getIngestionSchema().getDataSchema().getGranularitySpec();
        final Optional explicitIntervals = granularitySpec.bucketIntervals();

        final List<TaskLock> heldLocks = this.toolbox.getTaskActionClient().submit(new LockListAction());
        // A revoked lock means this task must not allocate anything further.
        heldLocks.stream().filter(TaskLock::isRevoked).findAny().ifPresent(revoked -> {
            throw new ISE("Lock revoked: [%s]", revoked);
        });
        final Map<Interval, String> lockVersions = heldLocks.stream().collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));

        final Interval segmentInterval;
        String segmentVersion;
        if (!explicitIntervals.isPresent()) {
            // No explicit intervals: derive the bucket from the segment granularity and lock it on demand.
            segmentInterval = granularitySpec.getSegmentGranularity().bucket(timestamp);
            segmentVersion = ParallelIndexSupervisorTask.findVersion(lockVersions, segmentInterval);
            if (segmentVersion == null) {
                final TaskLock acquiredLock = (TaskLock)Preconditions.checkNotNull(
                    (Object)this.toolbox.getTaskActionClient().submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, segmentInterval)),
                    "Cannot acquire a lock for interval[%s]",
                    new Object[]{segmentInterval}
                );
                segmentVersion = acquiredLock.getVersion();
            }
        } else {
            final Optional bucket = granularitySpec.bucketInterval(timestamp);
            if (!bucket.isPresent()) {
                throw new IAE("Could not find interval for timestamp [%s]", timestamp);
            }
            segmentInterval = (Interval)bucket.get();
            if (!((SortedSet)explicitIntervals.get()).contains(segmentInterval)) {
                throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", segmentInterval, granularitySpec);
            }
            segmentVersion = ParallelIndexSupervisorTask.findVersion(lockVersions, segmentInterval);
            if (segmentVersion == null) {
                throw new ISE("Cannot find a version for interval[%s]", segmentInterval);
            }
        }

        final int nextPartitionNum = Counters.getAndIncrementInt(this.partitionNumCountersPerInterval, segmentInterval);
        final ShardSpec shardSpec = new NumberedShardSpec(nextPartitionNum, 0);
        return new SegmentIdWithShardSpec(dataSource, segmentInterval, segmentVersion, shardSpec);
    }

    /**
     * Looks up the version of the first entry whose key interval contains {@code interval}.
     *
     * @param versions lock intervals mapped to their versions
     * @param interval interval that must be contained by a key of {@code versions}
     * @return the matching version, or {@code null} when no key interval contains {@code interval}
     */
    @Nullable
    private static String findVersion(Map<Interval, String> versions, Interval interval) {
        return versions.entrySet()
            .stream()
            .filter(lockedEntry -> lockedEntry.getKey().contains(interval))
            .map(Map.Entry::getValue)
            .findFirst()
            .orElse(null);
    }

    /**
     * Resolves the input format of an ingestion spec via the IO config's
     * {@code getNonNullInputFormat}, handing it the parse spec of the data schema's parser, or
     * {@code null} when the schema has no parser.
     *
     * @param ingestionSchema ingestion spec to read the data schema and IO config from
     * @return the input format returned by the IO config
     */
    static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema) {
        final InputRowParser rowParser = ingestionSchema.getDataSchema().getParser();
        return ingestionSchema
            .getIOConfig()
            .getNonNullInputFormat(rowParser == null ? null : rowParser.getParseSpec());
    }

    /**
     * HTTP endpoint through which a sub task hands its report to the currently running
     * {@link ParallelIndexTaskRunner}.
     *
     * <p>Responses: 503 when no task is held yet, 400 when the held task is not a
     * {@link ParallelIndexTaskRunner} (the sequential path stores an {@code IndexTask} in the same
     * holder), otherwise 200 after the runner collected the report.
     *
     * @param report report sent by a sub task
     * @param req    incoming request, used for the WRITE authorization check on this datasource
     * @return the HTTP response described above
     */
    @POST
    @Path(value="/report")
    @Consumes(value={"application/x-jackson-smile"})
    public Response report(SubTaskReport report, @Context HttpServletRequest req) {
        ChatHandlers.authorizationCheck(req, Action.WRITE, this.getDataSource(), this.authorizerMapper);
        if (this.currentSubTaskHolder == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        // Read the held task once so the null check and the type check see the same object.
        final Object currentTask = this.currentSubTaskHolder.getTask();
        if (currentTask == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        // The holder is shared with the sequential path; reject instead of failing with a ClassCastException.
        if (!(currentTask instanceof ParallelIndexTaskRunner)) {
            return Response.status(Response.Status.BAD_REQUEST).entity("task is running in the sequential mode").build();
        }
        ((ParallelIndexTaskRunner)currentTask).collectReport(report);
        return Response.ok().build();
    }

    /**
     * HTTP endpoint reporting whether this task runs in parallel or sequential mode.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 200 with {@code "parallel"} or {@code "sequential"} as the entity
     */
    @GET
    @Path(value="/mode")
    @Produces(value={"application/json"})
    public Response getMode(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final String mode;
        if (this.isParallelMode()) {
            mode = "parallel";
        } else {
            mode = "sequential";
        }
        return Response.ok(mode).build();
    }

    /**
     * HTTP endpoint returning the name of the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 400 when the task is not in parallel mode, 503 when there is no current runner,
     *         otherwise 200 with the runner's name
     */
    @GET
    @Path(value="/phase")
    @Produces(value={"application/json"})
    public Response getPhaseName(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        if (!this.isParallelMode()) {
            return Response.status(Response.Status.BAD_REQUEST).entity("task is running in the sequential mode").build();
        }
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running").build();
        }
        return Response.ok(activeRunner.getName()).build();
    }

    /**
     * HTTP endpoint returning the progress reported by the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, otherwise 200 with the runner's progress
     */
    @GET
    @Path(value="/progress")
    @Produces(value={"application/json"})
    public Response getProgress(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner != null) {
            return Response.ok(activeRunner.getProgress()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint returning the ids of the running tasks known to the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, otherwise 200 with the running task ids
     */
    @GET
    @Path(value="/subtasks/running")
    @Produces(value={"application/json"})
    public Response getRunningTasks(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner != null) {
            return Response.ok(activeRunner.getRunningTaskIds()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint returning the sub task specs of the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, otherwise 200 with the sub task specs
     */
    @GET
    @Path(value="/subtaskspecs")
    @Produces(value={"application/json"})
    public Response getSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner != null) {
            return Response.ok(activeRunner.getSubTaskSpecs()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint returning the running sub task specs of the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, otherwise 200 with the running sub task specs
     */
    @GET
    @Path(value="/subtaskspecs/running")
    @Produces(value={"application/json"})
    public Response getRunningSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner != null) {
            return Response.ok(activeRunner.getRunningSubTaskSpecs()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint returning the complete sub task specs of the current runner.
     *
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, otherwise 200 with the complete sub task specs
     */
    @GET
    @Path(value="/subtaskspecs/complete")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecs(@Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner != null) {
            return Response.ok(activeRunner.getCompleteSubTaskSpecs()).build();
        }
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
    }

    /**
     * HTTP endpoint returning a single sub task spec looked up by id on the current runner.
     *
     * @param id  id of the sub task spec, taken from the request path
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, 404 when the runner returns no spec for
     *         {@code id}, otherwise 200 with the spec
     */
    @GET
    @Path(value="/subtaskspec/{id}")
    @Produces(value={"application/json"})
    public Response getSubTaskSpec(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final SubTaskSpec requestedSpec = activeRunner.getSubTaskSpec(id);
        return requestedSpec == null
            ? Response.status(Response.Status.NOT_FOUND).build()
            : Response.ok(requestedSpec).build();
    }

    /**
     * HTTP endpoint returning the state of a single sub task spec looked up by id on the current
     * runner.
     *
     * @param id  id of the sub task spec, taken from the request path
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, 404 when the runner returns no state for
     *         {@code id}, otherwise 200 with the state
     */
    @GET
    @Path(value="/subtaskspec/{id}/state")
    @Produces(value={"application/json"})
    public Response getSubTaskState(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final ParallelIndexTaskRunner.SubTaskSpecStatus specStatus = activeRunner.getSubTaskState(id);
        return specStatus == null
            ? Response.status(Response.Status.NOT_FOUND).build()
            : Response.ok(specStatus).build();
    }

    /**
     * HTTP endpoint returning the attempt history of a complete sub task spec looked up by id on the
     * current runner.
     *
     * @param id  id of the sub task spec, taken from the request path
     * @param req incoming request, used for the READ authorization check on this datasource
     * @return 503 when there is no current runner, 404 when the runner returns no history for
     *         {@code id}, otherwise 200 with the attempt history
     */
    @GET
    @Path(value="/subtaskspec/{id}/history")
    @Produces(value={"application/json"})
    public Response getCompleteSubTaskSpecAttemptHistory(@PathParam(value="id") String id, @Context HttpServletRequest req) {
        IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, this.getDataSource(), this.authorizerMapper);
        final ParallelIndexTaskRunner activeRunner = this.getCurrentRunner();
        if (activeRunner == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
        }
        final TaskHistory specHistory = activeRunner.getCompleteSubTaskSpecAttemptHistory(id);
        return specHistory == null
            ? Response.status(Response.Status.NOT_FOUND).build()
            : Response.ok(specHistory.getAttemptHistory()).build();
    }

    /**
     * Decompiler-emitted lambda body (named after {@code determineAllRangePartitions}): puts every
     * interval/distribution pair of the given report into the given multimap.
     *
     * @param intervalToDistributions multimap receiving one entry per pair in the report
     * @param report                  report whose interval-to-distribution map is copied
     */
    private static /* synthetic */ void lambda$determineAllRangePartitions$5(Multimap intervalToDistributions, DimensionDistributionReport report) {
        report.getIntervalToDistribution().forEach(
            (interval, distribution) -> intervalToDistributions.put(interval, distribution)
        );
    }
}

