/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.PartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.PerfectRollupWorkerTask;
import org.apache.druid.indexing.common.task.batch.parallel.PushedSegmentsReport;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;

abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionLocation>
extends PerfectRollupWorkerTask {
    private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);
    private static final int BUFFER_SIZE = 4096;
    private static final int NUM_FETCH_RETRIES = 3;
    private final PartialSegmentMergeIOConfig<P> ioConfig;
    private final int numAttempts;
    private final String supervisorTaskId;
    private final IndexingServiceClient indexingServiceClient;
    private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
    private final HttpClient shuffleClient;
    private final byte[] buffer;

    PartialSegmentMergeTask(@Nullable String id, String groupId, TaskResource taskResource, String supervisorTaskId, DataSchema dataSchema, PartialSegmentMergeIOConfig<P> ioConfig, ParallelIndexTuningConfig tuningConfig, int numAttempts, Map<String, Object> context, IndexingServiceClient indexingServiceClient, IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory, HttpClient shuffleClient) {
        super(id, groupId, taskResource, dataSchema, tuningConfig, context);
        this.ioConfig = ioConfig;
        this.numAttempts = numAttempts;
        this.supervisorTaskId = supervisorTaskId;
        this.indexingServiceClient = indexingServiceClient;
        this.taskClientFactory = taskClientFactory;
        this.shuffleClient = shuffleClient;
        this.buffer = new byte[4096];
    }

    @JsonProperty
    public int getNumAttempts() {
        return this.numAttempts;
    }

    @JsonProperty
    public String getSupervisorTaskId() {
        return this.supervisorTaskId;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox) throws Exception {
        HashMap<Interval, Int2ObjectMap<List<P>>> intervalToPartitions = new HashMap<Interval, Int2ObjectMap<List<P>>>();
        for (PartitionLocation location : this.ioConfig.getPartitionLocations()) {
            ((List)intervalToPartitions.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap()).computeIfAbsent(location.getPartitionId(), k -> new ArrayList())).add(location);
        }
        List locks = (List)toolbox.getTaskActionClient().submit(new SurrogateAction(this.supervisorTaskId, new LockListAction()));
        HashMap<Interval, String> intervalToVersion = new HashMap<Interval, String>(locks.size());
        locks.forEach(lock -> {
            if (lock.isRevoked()) {
                throw new ISE("Lock[%s] is revoked", new Object[]{lock});
            }
            String mustBeNull = intervalToVersion.put(lock.getInterval(), lock.getVersion());
            if (mustBeNull != null) {
                throw new ISE("WTH? Two versions([%s], [%s]) for the same interval[%s]?", new Object[]{lock.getVersion(), mustBeNull, lock.getInterval()});
            }
        });
        Stopwatch fetchStopwatch = Stopwatch.createStarted();
        Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = this.fetchSegmentFiles(toolbox, intervalToPartitions);
        long fetchTime = fetchStopwatch.elapsed(TimeUnit.SECONDS);
        fetchStopwatch.stop();
        LOG.info("Fetch took [%s] seconds", new Object[]{fetchTime});
        ParallelIndexSupervisorTaskClient taskClient = this.taskClientFactory.build(new ClientBasedTaskInfoProvider(this.indexingServiceClient), this.getId(), 1, this.getTuningConfig().getChatHandlerTimeout(), this.getTuningConfig().getChatHandlerNumRetries());
        File persistDir = toolbox.getPersistDir();
        org.apache.commons.io.FileUtils.deleteQuietly((File)persistDir);
        org.apache.commons.io.FileUtils.forceMkdir((File)persistDir);
        Set<DataSegment> pushedSegments = this.mergeAndPushSegments(toolbox, this.getDataSchema(), this.getTuningConfig(), persistDir, intervalToVersion, intervalToUnzippedFiles);
        taskClient.report(this.supervisorTaskId, new PushedSegmentsReport(this.getId(), Collections.emptySet(), pushedSegments));
        return TaskStatus.success((String)this.getId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(TaskToolbox toolbox, Map<Interval, Int2ObjectMap<List<P>>> intervalToPartitions) throws IOException {
        File tempDir = toolbox.getIndexingTmpDir();
        org.apache.commons.io.FileUtils.deleteQuietly((File)tempDir);
        org.apache.commons.io.FileUtils.forceMkdir((File)tempDir);
        HashMap<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = new HashMap<Interval, Int2ObjectMap<List<File>>>();
        for (Map.Entry<Interval, Int2ObjectMap<List<P>>> entryPerInterval : intervalToPartitions.entrySet()) {
            Interval interval = entryPerInterval.getKey();
            for (Int2ObjectMap.Entry entryPerPartitionId : entryPerInterval.getValue().int2ObjectEntrySet()) {
                int partitionId = entryPerPartitionId.getIntKey();
                File partitionDir = org.apache.commons.io.FileUtils.getFile((File)tempDir, (String[])new String[]{interval.getStart().toString(), interval.getEnd().toString(), Integer.toString(partitionId)});
                org.apache.commons.io.FileUtils.forceMkdir((File)partitionDir);
                for (PartitionLocation location : (List)entryPerPartitionId.getValue()) {
                    File zippedFile = this.fetchSegmentFile(partitionDir, location);
                    try {
                        File unzippedDir = new File(partitionDir, StringUtils.format((String)"unzipped_%s", (Object[])new Object[]{location.getSubTaskId()}));
                        org.apache.commons.io.FileUtils.forceMkdir((File)unzippedDir);
                        CompressionUtils.unzip((File)zippedFile, (File)unzippedDir);
                        ((List)intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap()).computeIfAbsent(partitionId, k -> new ArrayList())).add(unzippedDir);
                        if (zippedFile.delete()) continue;
                    }
                    catch (Throwable throwable) {
                        if (!zippedFile.delete()) {
                            LOG.warn("Failed to delete temp file[%s]", new Object[]{zippedFile});
                        }
                        throw throwable;
                    }
                    LOG.warn("Failed to delete temp file[%s]", new Object[]{zippedFile});
                }
            }
        }
        return intervalToUnzippedFiles;
    }

    @VisibleForTesting
    File fetchSegmentFile(File partitionDir, P location) throws IOException {
        File zippedFile = new File(partitionDir, StringUtils.format((String)"temp_%s", (Object[])new Object[]{((PartitionLocation)location).getSubTaskId()}));
        URI uri = ((PartitionLocation)location).toIntermediaryDataServerURI(this.supervisorTaskId);
        FileUtils.copyLarge((Object)uri, u -> {
            try {
                return (InputStream)this.shuffleClient.go(new Request(HttpMethod.GET, u.toURL()), (HttpResponseHandler)new InputStreamResponseHandler()).get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, (File)zippedFile, (byte[])this.buffer, t -> t instanceof IOException, (int)3, (String)StringUtils.format((String)"Failed to fetch file[%s]", (Object[])new Object[]{uri}));
        return zippedFile;
    }

    abstract S createShardSpec(TaskToolbox var1, Interval var2, int var3);

    private Set<DataSegment> mergeAndPushSegments(TaskToolbox toolbox, DataSchema dataSchema, ParallelIndexTuningConfig tuningConfig, File persistDir, Map<Interval, String> intervalToVersion, Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles) throws Exception {
        DataSegmentPusher segmentPusher = toolbox.getSegmentPusher();
        HashSet<DataSegment> pushedSegments = new HashSet<DataSegment>();
        for (Map.Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval : intervalToUnzippedFiles.entrySet()) {
            Interval interval = entryPerInterval.getKey();
            for (Int2ObjectMap.Entry entryPerPartitionId : entryPerInterval.getValue().int2ObjectEntrySet()) {
                int partitionId = entryPerPartitionId.getIntKey();
                List segmentFilesToMerge = (List)entryPerPartitionId.getValue();
                Pair<File, List<String>> mergedFileAndDimensionNames = PartialSegmentMergeTask.mergeSegmentsInSamePartition(dataSchema, tuningConfig, toolbox.getIndexIO(), toolbox.getIndexMergerV9(), segmentFilesToMerge, tuningConfig.getMaxNumSegmentsToMerge(), persistDir, 0);
                List metricNames = Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList());
                DataSegment segment = (DataSegment)RetryUtils.retry(() -> segmentPusher.push((File)mergedFileAndDimensionNames.lhs, new DataSegment(this.getDataSource(), interval, (String)Preconditions.checkNotNull(intervalToVersion.get(interval), (String)"version for interval[%s]", (Object[])new Object[]{interval}), null, (List)mergedFileAndDimensionNames.rhs, metricNames, this.createShardSpec(toolbox, interval, partitionId), null, 0L), false), exception -> exception instanceof Exception, (int)5);
                pushedSegments.add(segment);
            }
        }
        return pushedSegments;
    }

    private static Pair<File, List<String>> mergeSegmentsInSamePartition(DataSchema dataSchema, ParallelIndexTuningConfig tuningConfig, IndexIO indexIO, IndexMergerV9 merger, List<File> indexes, int maxNumSegmentsToMerge, File baseOutDir, int outDirSuffix) throws IOException {
        int suffix = outDirSuffix;
        ArrayList<File> mergedFiles = new ArrayList<File>();
        List dimensionNames = null;
        for (int i = 0; i < indexes.size(); i += maxNumSegmentsToMerge) {
            List<File> filesToMerge = indexes.subList(i, Math.min(i + maxNumSegmentsToMerge, indexes.size()));
            ArrayList<QueryableIndex> indexesToMerge = new ArrayList<QueryableIndex>(filesToMerge.size());
            Closer indexCleaner = Closer.create();
            for (File file : filesToMerge) {
                QueryableIndex queryableIndex = indexIO.loadIndex(file);
                indexesToMerge.add(queryableIndex);
                indexCleaner.register(() -> {
                    queryableIndex.close();
                    file.delete();
                });
            }
            if (maxNumSegmentsToMerge >= indexes.size()) {
                dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(indexesToMerge);
            }
            File outDir = new File(baseOutDir, StringUtils.format((String)"merged_%d", (Object[])new Object[]{suffix++}));
            mergedFiles.add(merger.mergeQueryableIndex(indexesToMerge, dataSchema.getGranularitySpec().isRollup(), dataSchema.getAggregators(), outDir, tuningConfig.getIndexSpec(), tuningConfig.getSegmentWriteOutMediumFactory()));
            indexCleaner.close();
        }
        if (mergedFiles.size() == 1) {
            return Pair.of(mergedFiles.get(0), (Object)Preconditions.checkNotNull(dimensionNames, (Object)"dimensionNames"));
        }
        return PartialSegmentMergeTask.mergeSegmentsInSamePartition(dataSchema, tuningConfig, indexIO, merger, mergedFiles, maxNumSegmentsToMerge, baseOutDir, suffix);
    }
}

