/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.AbstractStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalDeltaStartingScanner
extends AbstractStartingScanner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalDeltaStartingScanner.class);
    private final long endingSnapshotId;
    private final ScanMode scanMode;

    public IncrementalDeltaStartingScanner(SnapshotManager snapshotManager, long start, long end, ScanMode scanMode) {
        super(snapshotManager);
        this.startingSnapshotId = start;
        this.endingSnapshotId = end;
        this.scanMode = scanMode;
    }

    @Override
    public StartingScanner.Result scan(SnapshotReader reader) {
        ConcurrentHashMap<Pair, List> grouped = new ConcurrentHashMap<Pair, List>();
        ManifestsReader manifestsReader = reader.manifestsReader();
        List snapshots2 = LongStream.range(this.startingSnapshotId + 1L, this.endingSnapshotId + 1L).boxed().collect(Collectors.toList());
        Iterator manifests = ManifestReadThreadPool.randomlyExecuteSequentialReturn(id -> {
            Snapshot snapshot = this.snapshotManager.snapshot((long)id);
            switch (this.scanMode) {
                case DELTA: {
                    if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) break;
                    return Collections.emptyList();
                }
                case CHANGELOG: {
                    if (snapshot.commitKind() != Snapshot.CommitKind.OVERWRITE) break;
                    return Collections.emptyList();
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported scan mode: " + (Object)((Object)this.scanMode));
                }
            }
            return manifestsReader.read((Snapshot)snapshot, (ScanMode)this.scanMode).filteredManifests;
        }, snapshots2, reader.parallelism());
        Iterator entries = ManifestReadThreadPool.randomlyExecuteSequentialReturn(reader::readManifest, Lists.newArrayList(manifests), reader.parallelism());
        while (entries.hasNext()) {
            ManifestEntry entry = (ManifestEntry)entries.next();
            Preconditions.checkArgument((entry.kind() == FileKind.ADD ? 1 : 0) != 0, (Object)"Delta or changelog should only have ADD files.");
            grouped.computeIfAbsent(Pair.of((Object)entry.partition(), (Object)entry.bucket()), ignore -> new ArrayList()).add(entry);
        }
        ArrayList<Split> result = new ArrayList<Split>();
        for (Map.Entry entry : grouped.entrySet()) {
            BinaryRow partition = (BinaryRow)((Pair)entry.getKey()).getLeft();
            int bucket = (Integer)((Pair)entry.getKey()).getRight();
            String bucketPath = reader.pathFactory().bucketPath(partition, bucket).toString();
            for (SplitGenerator.SplitGroup splitGroup : reader.splitGenerator().splitForBatch(((List)entry.getValue()).stream().map(ManifestEntry::file).collect(Collectors.toList()))) {
                DataSplit.Builder dataSplitBuilder = DataSplit.builder().isStreaming(true).withSnapshot(this.endingSnapshotId).withPartition(partition).withBucket(bucket).withTotalBuckets(((ManifestEntry)((List)entry.getValue()).get(0)).totalBuckets()).withDataFiles(splitGroup.files).rawConvertible(splitGroup.rawConvertible).withBucketPath(bucketPath);
                result.add(dataSplitBuilder.build());
            }
        }
        return StartingScanner.fromPlan(new PlanImpl(null, this.endingSnapshotId, result));
    }

    public static StartingScanner betweenSnapshotIds(long startId, long endId, SnapshotManager snapshotManager, ScanMode scanMode) {
        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
        long latestSnapshotId = snapshotManager.latestSnapshotId();
        Preconditions.checkArgument((startId >= earliestSnapshotId - 1L && endId <= latestSnapshotId ? 1 : 0) != 0, (String)"The specified scan snapshotId range [%s, %s] is out of available snapshotId range [%s, %s].", (Object[])new Object[]{startId, endId, earliestSnapshotId, latestSnapshotId});
        return new IncrementalDeltaStartingScanner(snapshotManager, startId, endId, scanMode);
    }

    public static IncrementalDeltaStartingScanner betweenTimestamps(long startTimestamp, long endTimestamp, SnapshotManager snapshotManager, ScanMode scanMode) {
        Snapshot startingSnapshot = snapshotManager.earlierOrEqualTimeMills(startTimestamp);
        Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
        long startId = startingSnapshot == null || earliestSnapshot.timeMillis() > startTimestamp ? earliestSnapshot.id() - 1L : startingSnapshot.id();
        Snapshot endSnapshot = snapshotManager.earlierOrEqualTimeMills(endTimestamp);
        long endId = endSnapshot == null ? snapshotManager.latestSnapshot().id() : endSnapshot.id();
        return new IncrementalDeltaStartingScanner(snapshotManager, startId, endId, scanMode);
    }
}

