/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;

/**
 * {@link SnapshotReader} implementation backed by a {@link FileStoreScan}.
 *
 * <p>The {@code with*} methods forward their configuration to the wrapped scan and return this
 * reader so that calls can be chained. The {@code read*} methods execute the scan and convert the
 * resulting data files, grouped by partition and bucket, into {@link DataSplit}s.
 *
 * <p>Instances are stateful: they remember the last {@link ScanKind} that was set and lazily
 * cache the partition comparator. No synchronization is performed in this class.
 */
public class SnapshotReaderImpl implements SnapshotReader {
    private final FileStoreScan scan;
    private final TableSchema tableSchema;
    private final CoreOptions options;
    private final SnapshotManager snapshotManager;
    private final ConsumerManager consumerManager;
    private final SplitGenerator splitGenerator;
    private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;
    private final DefaultValueAssigner defaultValueAssigner;
    /** Last kind passed to {@link #withKind(ScanKind)}; anything but ALL makes read() produce streaming splits. */
    private ScanKind scanKind = ScanKind.ALL;
    /** Created on first use by {@link #partitionComparator()}. */
    private RecordComparator lazyPartitionComparator;

    /**
     * Creates a reader over the given scan.
     *
     * @param scan the scan that is configured by the {@code with*} methods and executed on read
     * @param tableSchema schema used to resolve partition keys and the partition row type
     * @param options table options; consulted for partition sorting of the plan
     * @param snapshotManager used to look up snapshots and to build the {@link ConsumerManager}
     * @param splitGenerator strategy that groups the files of one bucket into splits
     * @param nonPartitionFilterConsumer callback that applies the non-partition part of a filter
     *     to the scan
     * @param defaultValueAssigner pre-processes predicates before they are split
     */
    public SnapshotReaderImpl(FileStoreScan scan, TableSchema tableSchema, CoreOptions options, SnapshotManager snapshotManager, SplitGenerator splitGenerator, BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer, DefaultValueAssigner defaultValueAssigner) {
        this.scan = scan;
        this.tableSchema = tableSchema;
        this.options = options;
        this.snapshotManager = snapshotManager;
        this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
        this.splitGenerator = splitGenerator;
        this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
        this.defaultValueAssigner = defaultValueAssigner;
    }

    /** Returns the snapshot manager this reader was created with. */
    @Override
    public SnapshotManager snapshotManager() {
        return this.snapshotManager;
    }

    /** Returns the consumer manager built from the snapshot manager's file IO and table path. */
    @Override
    public ConsumerManager consumerManager() {
        return this.consumerManager;
    }

    /** Returns the split generator this reader was created with. */
    @Override
    public SplitGenerator splitGenerator() {
        return this.splitGenerator;
    }

    /** Forwards the snapshot id to the underlying scan and returns this reader. */
    @Override
    public SnapshotReader withSnapshot(long snapshotId) {
        this.scan.withSnapshot(snapshotId);
        return this;
    }

    /** Forwards the snapshot to the underlying scan and returns this reader. */
    @Override
    public SnapshotReader withSnapshot(Snapshot snapshot) {
        this.scan.withSnapshot(snapshot);
        return this;
    }

    /**
     * Applies a filter to the scan, split into a partition part and a non-partition part.
     *
     * <p>The predicate is first passed through the {@link DefaultValueAssigner} and then split
     * into its AND-conjuncts. Every conjunct that can be re-mapped onto partition field indexes is
     * collected as a partition filter; all others are handed to the non-partition filter consumer.
     * Each side is only applied when it is non-empty.
     *
     * @param predicate the filter expressed on table fields
     * @return this reader
     */
    @Override
    public SnapshotReader withFilter(Predicate predicate) {
        List<String> partitionKeys = this.tableSchema.partitionKeys();
        // For every table field: its position among the partition keys, or -1 if it is not one.
        int[] fieldIdxToPartitionIdx = this.tableSchema.fields().stream().mapToInt(f -> partitionKeys.indexOf(f.name())).toArray();
        List<Predicate> partitionFilters = new ArrayList<>();
        List<Predicate> nonPartitionFilters = new ArrayList<>();
        for (Predicate p : PredicateBuilder.splitAnd(this.defaultValueAssigner.handlePredicate(predicate))) {
            // NOTE(review): presumably the mapping is present only when the conjunct references
            // partition fields exclusively - confirm against PredicateBuilder.
            Optional<Predicate> mapped = PredicateBuilder.transformFieldMapping(p, fieldIdxToPartitionIdx);
            if (mapped.isPresent()) {
                partitionFilters.add(mapped.get());
            } else {
                nonPartitionFilters.add(p);
            }
        }
        if (!partitionFilters.isEmpty()) {
            this.scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
        }
        if (!nonPartitionFilters.isEmpty()) {
            this.nonPartitionFilterConsumer.accept(this.scan, PredicateBuilder.and(nonPartitionFilters));
        }
        return this;
    }

    /**
     * Remembers the scan kind (it decides between streaming and batch splitting in
     * {@link #read()}) and forwards it to the underlying scan.
     */
    @Override
    public SnapshotReader withKind(ScanKind scanKind) {
        this.scanKind = scanKind;
        this.scan.withKind(scanKind);
        return this;
    }

    /** Forwards the level filter to the underlying scan and returns this reader. */
    @Override
    public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
        this.scan.withLevelFilter(levelFilter);
        return this;
    }

    /** Forwards the bucket to the underlying scan and returns this reader. */
    @Override
    public SnapshotReader withBucket(int bucket) {
        this.scan.withBucket(bucket);
        return this;
    }

    /** Forwards the bucket filter to the underlying scan and returns this reader. */
    @Override
    public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
        this.scan.withBucketFilter(bucketFilter);
        return this;
    }

    /**
     * Executes the scan and builds one or more splits per (partition, bucket) from the ADD files.
     *
     * <p>When {@code options.scanPlanSortPartition()} is set, partitions are ordered with the
     * partition comparator before splits are generated. Splits are generated in streaming mode
     * whenever the configured scan kind is not {@link ScanKind#ALL}. If the plan carries no
     * snapshot id, {@code 0} is used as the snapshot id of the generated splits.
     *
     * @return a plan exposing the scan's watermark and snapshot id together with the splits
     */
    @Override
    public SnapshotReader.Plan read() {
        FileStoreScan.Plan plan = this.scan.plan();
        Long snapshotId = plan.snapshotId();
        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        if (this.options.scanPlanSortPartition()) {
            // LinkedHashMap keeps the sorted insertion order for the split generation below.
            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> sortedFiles = new LinkedHashMap<>();
            files.entrySet().stream()
                    // The comparator is fetched inside the lambda so it is only created when
                    // at least two partitions actually have to be compared.
                    .sorted((o1, o2) -> this.partitionComparator().compare(o1.getKey(), o2.getKey()))
                    .forEachOrdered(entry -> sortedFiles.put(entry.getKey(), entry.getValue()));
            files = sortedFiles;
        }
        List<DataSplit> splits = SnapshotReaderImpl.generateSplits(snapshotId == null ? 0L : snapshotId, this.scanKind != ScanKind.ALL, this.splitGenerator, files);
        return SnapshotReaderImpl.newPlan(plan, splits);
    }

    /**
     * Returns the distinct partitions of all manifest entries in the current scan plan, in the
     * order in which each partition is first encountered.
     */
    @Override
    public List<BinaryRow> partitions() {
        List<ManifestEntry> entryList = this.scan.plan().files();
        // distinct() on an ordered sequential stream keeps the first occurrence of each partition.
        return entryList.stream().map(ManifestEntry::partition).distinct().collect(Collectors.toList());
    }

    /**
     * Reads the changes of an OVERWRITE snapshot: DELETE files become the "before" files and ADD
     * files the data files of streaming splits.
     *
     * <p>As a side effect the scan kind of this reader is switched to {@link ScanKind#DELTA}.
     *
     * @return a plan with one split per affected (partition, bucket)
     * @throws IllegalStateException if the plan has no snapshot, or the snapshot is not an
     *     overwrite commit
     */
    @Override
    public SnapshotReader.Plan readOverwrittenChanges() {
        this.withKind(ScanKind.DELTA);
        FileStoreScan.Plan plan = this.scan.plan();
        // The snapshot id is nullable (see read()); fail with a clear message instead of an
        // anonymous NullPointerException from auto-unboxing.
        Long snapshotId = plan.snapshotId();
        if (snapshotId == null) {
            throw new IllegalStateException("Cannot read overwrite splits: the scan plan has no snapshot.");
        }
        Snapshot snapshot = this.snapshotManager.snapshot(snapshotId);
        if (snapshot.commitKind() != Snapshot.CommitKind.OVERWRITE) {
            throw new IllegalStateException("Cannot read overwrite splits from a non-overwrite snapshot.");
        }
        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.DELETE));
        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        return this.toChangesPlan(true, plan, beforeFiles, dataFiles);
    }

    /**
     * Builds a plan with exactly one split for every (partition, bucket) that occurs in either
     * {@code beforeFiles} or {@code dataFiles}.
     *
     * <p>Files present on both sides are removed from both lists, so each split only carries the
     * files that differ. The lists inside the two input maps are modified in place.
     *
     * @param isStreaming streaming flag set on every generated split
     * @param plan scan plan providing snapshot id and watermark
     * @param beforeFiles files of the "before" state, grouped by partition and bucket
     * @param dataFiles files of the "after" state, grouped by partition and bucket
     */
    private SnapshotReader.Plan toChangesPlan(boolean isStreaming, FileStoreScan.Plan plan, Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles, Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
        List<DataSplit> splits = new ArrayList<>();
        // Union of the buckets touched on either side, per partition.
        Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
        beforeFiles.forEach((part, bucketMap) -> buckets.computeIfAbsent(part, k -> new HashSet<>()).addAll(bucketMap.keySet()));
        dataFiles.forEach((part, bucketMap) -> buckets.computeIfAbsent(part, k -> new HashSet<>()).addAll(bucketMap.keySet()));
        for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
            BinaryRow partition = entry.getKey();
            for (Integer bucket : entry.getValue()) {
                List<DataFileMeta> before = beforeFiles.getOrDefault(partition, Collections.emptyMap()).getOrDefault(bucket, Collections.emptyList());
                List<DataFileMeta> data = dataFiles.getOrDefault(partition, Collections.emptyMap()).getOrDefault(bucket, Collections.emptyList());
                // data.remove(file) returns true iff the file was in 'data'; in that case it is
                // dropped from 'data' by the call itself and from 'before' by removeIf.
                before.removeIf(data::remove);
                DataSplit split = DataSplit.builder().withSnapshot(plan.snapshotId()).withPartition(partition).withBucket(bucket).withBeforeFiles(before).withDataFiles(data).isStreaming(isStreaming).build();
                splits.add(split);
            }
        }
        return SnapshotReaderImpl.newPlan(plan, splits);
    }

    /**
     * Reads the difference between the currently configured snapshot and {@code before}: ADD
     * files of the current plan are the data files, ADD files of {@code before} the "before"
     * files of non-streaming splits.
     *
     * <p>As a side effect the scan kind of this reader is switched to {@link ScanKind#ALL}.
     * NOTE(review): the underlying scan is also re-pointed to {@code before} via
     * {@code withSnapshot} and not reset afterwards - confirm callers do not rely on the
     * previous snapshot selection.
     *
     * @param before the earlier snapshot to diff against
     */
    @Override
    public SnapshotReader.Plan readIncrementalDiff(Snapshot before) {
        this.withKind(ScanKind.ALL);
        FileStoreScan.Plan plan = this.scan.plan();
        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles = FileStoreScan.Plan.groupByPartFiles(this.scan.withSnapshot(before).plan().files(FileKind.ADD));
        return this.toChangesPlan(false, plan, beforeFiles, dataFiles);
    }

    /** Returns the comparator for partition rows, generating it on first use. */
    private RecordComparator partitionComparator() {
        if (this.lazyPartitionComparator == null) {
            this.lazyPartitionComparator = CodeGenUtils.newRecordComparator(this.tableSchema.logicalPartitionType().getFieldTypes(), "PartitionComparator");
        }
        return this.lazyPartitionComparator;
    }

    /**
     * Wraps a scan plan and its splits into a {@link SnapshotReader.Plan}.
     *
     * <p>The splits are copied once into a {@code List<Split>}, which gives the declared return
     * type of {@code splits()} without an unchecked cast.
     */
    private static SnapshotReader.Plan newPlan(final FileStoreScan.Plan plan, List<? extends Split> splits) {
        final List<Split> planSplits = new ArrayList<>(splits);
        return new SnapshotReader.Plan(){

            @Override
            @Nullable
            public Long watermark() {
                return plan.watermark();
            }

            @Override
            @Nullable
            public Long snapshotId() {
                return plan.snapshotId();
            }

            @Override
            public List<Split> splits() {
                return planSplits;
            }
        };
    }

    /**
     * Generates splits for data files that are already grouped by partition and bucket.
     *
     * <p>For every bucket the split generator partitions the bucket's files into groups
     * (streaming or batch variant, depending on {@code isStreaming}); each group becomes one
     * {@link DataSplit}. Splits are emitted in the iteration order of the given maps.
     *
     * @param snapshotId snapshot id stamped on every split
     * @param isStreaming streaming flag stamped on every split; also selects the split strategy
     * @param splitGenerator strategy that groups the files of one bucket
     * @param groupedDataFiles data files keyed by partition, then by bucket
     * @return the generated splits
     */
    @VisibleForTesting
    public static List<DataSplit> generateSplits(long snapshotId, boolean isStreaming, SplitGenerator splitGenerator, Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
        List<DataSplit> splits = new ArrayList<>();
        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionEntry : groupedDataFiles.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : partitionEntry.getValue().entrySet()) {
                int bucket = bucketEntry.getKey();
                List<DataFileMeta> bucketFiles = bucketEntry.getValue();
                // One builder per bucket; only the data files change between its splits.
                DataSplit.Builder builder = DataSplit.builder().withSnapshot(snapshotId).withPartition(partition).withBucket(bucket).isStreaming(isStreaming);
                List<List<DataFileMeta>> splitGroups = isStreaming ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles);
                for (List<DataFileMeta> group : splitGroups) {
                    splits.add(builder.withDataFiles(group).build());
                }
            }
        }
        return splits;
    }
}

