/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

public class SnapshotReaderImpl
implements SnapshotReader {
    private final FileStoreScan scan;
    private final TableSchema tableSchema;
    private final CoreOptions options;
    private final boolean deletionVectors;
    private final SnapshotManager snapshotManager;
    private final ChangelogManager changelogManager;
    private final ConsumerManager consumerManager;
    private final SplitGenerator splitGenerator;
    private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;
    private final DefaultValueAssigner defaultValueAssigner;
    private final FileStorePathFactory pathFactory;
    private final String tableName;
    private final IndexFileHandler indexFileHandler;
    private ScanMode scanMode = ScanMode.ALL;
    private RecordComparator lazyPartitionComparator;

    public SnapshotReaderImpl(FileStoreScan scan, TableSchema tableSchema, CoreOptions options, SnapshotManager snapshotManager, ChangelogManager changelogManager, SplitGenerator splitGenerator, BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer, DefaultValueAssigner defaultValueAssigner, FileStorePathFactory pathFactory, String tableName, IndexFileHandler indexFileHandler) {
        this.scan = scan;
        this.tableSchema = tableSchema;
        this.options = options;
        this.deletionVectors = options.deletionVectorsEnabled();
        this.snapshotManager = snapshotManager;
        this.changelogManager = changelogManager;
        this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch());
        this.splitGenerator = splitGenerator;
        this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
        this.defaultValueAssigner = defaultValueAssigner;
        this.pathFactory = pathFactory;
        this.tableName = tableName;
        this.indexFileHandler = indexFileHandler;
    }

    @Override
    public Integer parallelism() {
        return this.scan.parallelism();
    }

    @Override
    public SnapshotManager snapshotManager() {
        return this.snapshotManager;
    }

    @Override
    public ChangelogManager changelogManager() {
        return this.changelogManager;
    }

    @Override
    public ManifestsReader manifestsReader() {
        return this.scan.manifestsReader();
    }

    @Override
    public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
        return this.scan.readManifest(manifest);
    }

    @Override
    public ConsumerManager consumerManager() {
        return this.consumerManager;
    }

    @Override
    public SplitGenerator splitGenerator() {
        return this.splitGenerator;
    }

    @Override
    public FileStorePathFactory pathFactory() {
        return this.pathFactory;
    }

    @Override
    public SnapshotReader withSnapshot(long snapshotId) {
        this.scan.withSnapshot(snapshotId);
        return this;
    }

    @Override
    public SnapshotReader withSnapshot(Snapshot snapshot) {
        this.scan.withSnapshot(snapshot);
        return this;
    }

    @Override
    public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) {
        if (partitionSpec != null) {
            Predicate partitionPredicate = PartitionPredicate.createPartitionPredicate(partitionSpec, this.tableSchema.logicalPartitionType(), this.options.partitionDefaultName());
            this.scan.withPartitionFilter(partitionPredicate);
        }
        return this;
    }

    @Override
    public SnapshotReader withPartitionFilter(Predicate predicate) {
        this.scan.withPartitionFilter(predicate);
        return this;
    }

    @Override
    public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
        this.scan.withPartitionFilter(partitions);
        return this;
    }

    @Override
    public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
        this.scan.withPartitionsFilter(partitions);
        return this;
    }

    @Override
    public SnapshotReader withFilter(Predicate predicate) {
        List<String> partitionKeys = this.tableSchema.partitionKeys();
        int[] fieldIdxToPartitionIdx = this.tableSchema.fields().stream().mapToInt(f -> partitionKeys.indexOf(f.name())).toArray();
        ArrayList partitionFilters = new ArrayList();
        ArrayList<Predicate> nonPartitionFilters = new ArrayList<Predicate>();
        for (Predicate p : PredicateBuilder.splitAnd((Predicate)this.defaultValueAssigner.handlePredicate(predicate))) {
            Optional mapped = PredicateBuilder.transformFieldMapping((Predicate)p, (int[])fieldIdxToPartitionIdx);
            if (mapped.isPresent()) {
                partitionFilters.add(mapped.get());
                continue;
            }
            nonPartitionFilters.add(p);
        }
        if (partitionFilters.size() > 0) {
            this.scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
        }
        if (nonPartitionFilters.size() > 0) {
            this.nonPartitionFilterConsumer.accept(this.scan, PredicateBuilder.and(nonPartitionFilters));
        }
        return this;
    }

    @Override
    public SnapshotReader withMode(ScanMode scanMode) {
        this.scanMode = scanMode;
        this.scan.withKind(scanMode);
        return this;
    }

    @Override
    public SnapshotReader withLevel(int level) {
        this.scan.withLevel(level);
        return this;
    }

    @Override
    public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
        this.scan.withLevelFilter(levelFilter);
        return this;
    }

    @Override
    public SnapshotReader enableValueFilter() {
        this.scan.enableValueFilter();
        return this;
    }

    @Override
    public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter) {
        this.scan.withManifestEntryFilter(filter);
        return this;
    }

    @Override
    public SnapshotReader withBucket(int bucket) {
        this.scan.withBucket(bucket);
        return this;
    }

    @Override
    public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
        this.scan.withBucketFilter(bucketFilter);
        return this;
    }

    @Override
    public SnapshotReader withMetricRegistry(MetricRegistry registry) {
        this.scan.withMetrics(new ScanMetrics(registry, this.tableName));
        return this;
    }

    @Override
    public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
        this.scan.withDataFileNameFilter(fileNameFilter);
        return this;
    }

    @Override
    public SnapshotReader dropStats() {
        this.scan.dropStats();
        return this;
    }

    @Override
    public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        if (this.splitGenerator.alwaysRawConvertible()) {
            this.withDataFileNameFilter((Filter<String>)((Filter)file -> Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask));
        } else {
            this.withBucketFilter((Filter<Integer>)((Filter)bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask));
        }
        return this;
    }

    @Override
    public SnapshotReader.Plan read() {
        FileStoreScan.Plan plan = this.scan.plan();
        Snapshot snapshot = plan.snapshot();
        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> grouped = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        if (this.options.scanPlanSortPartition()) {
            LinkedHashMap<BinaryRow, Map<Integer, List<ManifestEntry>>> sorted2 = new LinkedHashMap<BinaryRow, Map<Integer, List<ManifestEntry>>>();
            grouped.entrySet().stream().sorted((o1, o2) -> this.partitionComparator().compare((InternalRow)o1.getKey(), (InternalRow)o2.getKey())).forEach(entry -> {
                Map cfr_ignored_0 = (Map)sorted2.put((BinaryRow)entry.getKey(), (Map<Integer, List<ManifestEntry>>)entry.getValue());
            });
            grouped = sorted2;
        }
        List<Split> splits = this.generateSplits(snapshot, this.scanMode != ScanMode.ALL, this.splitGenerator, grouped);
        return new PlanImpl(plan.watermark(), snapshot == null ? null : Long.valueOf(snapshot.id()), splits);
    }

    private List<DataSplit> generateSplits(@Nullable Snapshot snapshot, boolean isStreaming, SplitGenerator splitGenerator, Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupedManifestEntries) {
        ArrayList<DataSplit> splits = new ArrayList<DataSplit>();
        Map deletionIndexFilesMap = null;
        if (!isStreaming) {
            deletionIndexFilesMap = this.deletionVectors && snapshot != null ? this.indexFileHandler.scan(snapshot, "DELETION_VECTORS", groupedManifestEntries.keySet()) : Collections.emptyMap();
        }
        for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry : groupedManifestEntries.entrySet()) {
            BinaryRow partition = entry.getKey();
            Map<Integer, List<ManifestEntry>> buckets = entry.getValue();
            for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry : buckets.entrySet()) {
                int bucket = bucketEntry.getKey();
                List<DataFileMeta> bucketFiles = bucketEntry.getValue().stream().map(ManifestEntry::file).collect(Collectors.toList());
                DataSplit.Builder builder = DataSplit.builder().withSnapshot(snapshot == null ? 0L : snapshot.id()).withPartition(partition).withBucket(bucket).withTotalBuckets(bucketEntry.getValue().get(0).totalBuckets()).isStreaming(isStreaming);
                List<SplitGenerator.SplitGroup> splitGroups = isStreaming ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles);
                for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
                    List<DataFileMeta> dataFiles = splitGroup.files;
                    String bucketPath = this.pathFactory.bucketPath(partition, bucket).toString();
                    builder.withDataFiles(dataFiles).rawConvertible(splitGroup.rawConvertible).withBucketPath(bucketPath);
                    if (this.deletionVectors && deletionIndexFilesMap != null) {
                        builder.withDataDeletionFiles(this.getDeletionFiles(dataFiles, deletionIndexFilesMap.getOrDefault(Pair.of((Object)partition, (Object)bucket), Collections.emptyList())));
                    }
                    splits.add(builder.build());
                }
            }
        }
        return splits;
    }

    @Override
    public List<BinaryRow> partitions() {
        return this.scan.listPartitions();
    }

    @Override
    public List<PartitionEntry> partitionEntries() {
        return this.scan.readPartitionEntries();
    }

    @Override
    public List<BucketEntry> bucketEntries() {
        return this.scan.readBucketEntries();
    }

    @Override
    public Iterator<ManifestEntry> readFileIterator() {
        return this.scan.readFileIterator();
    }

    @Override
    public SnapshotReader.Plan readChanges() {
        this.withMode(ScanMode.DELTA);
        FileStoreScan.Plan plan = this.scan.plan();
        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.DELETE));
        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        Snapshot beforeSnapshot = this.snapshotManager.snapshot(plan.snapshot().id() - 1L);
        return this.toChangesPlan(true, plan, beforeSnapshot, beforeFiles, dataFiles);
    }

    private SnapshotReader.Plan toChangesPlan(boolean isStreaming, FileStoreScan.Plan plan, Snapshot beforeSnapshot, Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles, Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles) {
        Snapshot snapshot = plan.snapshot();
        ArrayList<Split> splits = new ArrayList<Split>();
        HashMap buckets = new HashMap();
        beforeFiles.forEach((part, bucketMap) -> buckets.computeIfAbsent(part, k -> new HashSet()).addAll(bucketMap.keySet()));
        dataFiles.forEach((part, bucketMap) -> buckets.computeIfAbsent(part, k -> new HashSet()).addAll(bucketMap.keySet()));
        Map beforDeletionIndexFilesMap = null;
        Map deletionIndexFilesMap = null;
        if (!isStreaming) {
            beforDeletionIndexFilesMap = this.deletionVectors ? this.indexFileHandler.scan(beforeSnapshot, "DELETION_VECTORS", beforeFiles.keySet()) : Collections.emptyMap();
            deletionIndexFilesMap = this.deletionVectors ? this.indexFileHandler.scan(snapshot, "DELETION_VECTORS", dataFiles.keySet()) : Collections.emptyMap();
        }
        for (Map.Entry entry : buckets.entrySet()) {
            BinaryRow part2 = (BinaryRow)entry.getKey();
            for (Integer bucket : (Set)entry.getValue()) {
                List beforeEntries = beforeFiles.getOrDefault(part2, Collections.emptyMap()).getOrDefault(bucket, Collections.emptyList());
                List dataEntries = dataFiles.getOrDefault(part2, Collections.emptyMap()).getOrDefault(bucket, Collections.emptyList());
                beforeEntries.removeIf(dataEntries::remove);
                Integer totalBuckets = null;
                if (!dataEntries.isEmpty()) {
                    totalBuckets = ((ManifestEntry)dataEntries.get(0)).totalBuckets();
                } else if (!beforeEntries.isEmpty()) {
                    totalBuckets = ((ManifestEntry)beforeEntries.get(0)).totalBuckets();
                }
                List<DataFileMeta> before = beforeEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
                List<DataFileMeta> data = dataEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
                DataSplit.Builder builder = DataSplit.builder().withSnapshot(snapshot.id()).withPartition(part2).withBucket(bucket).withTotalBuckets(totalBuckets).withBeforeFiles(before).withDataFiles(data).isStreaming(isStreaming).withBucketPath(this.pathFactory.bucketPath(part2, bucket).toString());
                if (this.deletionVectors && beforDeletionIndexFilesMap != null && deletionIndexFilesMap != null) {
                    builder.withBeforeDeletionFiles(this.getDeletionFiles(before, beforDeletionIndexFilesMap.getOrDefault(Pair.of((Object)part2, (Object)bucket), Collections.emptyList())));
                    builder.withDataDeletionFiles(this.getDeletionFiles(data, deletionIndexFilesMap.getOrDefault(Pair.of((Object)part2, (Object)bucket), Collections.emptyList())));
                }
                splits.add(builder.build());
            }
        }
        return new PlanImpl(plan.watermark(), snapshot == null ? null : Long.valueOf(snapshot.id()), splits);
    }

    @Override
    public SnapshotReader.Plan readIncrementalDiff(Snapshot before) {
        this.withMode(ScanMode.ALL);
        FileStoreScan.Plan plan = this.scan.plan();
        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles = FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles = FileStoreScan.Plan.groupByPartFiles(this.scan.withSnapshot(before).plan().files(FileKind.ADD));
        return this.toChangesPlan(false, plan, before, beforeFiles, dataFiles);
    }

    private RecordComparator partitionComparator() {
        if (this.lazyPartitionComparator == null) {
            this.lazyPartitionComparator = CodeGenUtils.newRecordComparator(this.tableSchema.logicalPartitionType().getFieldTypes());
        }
        return this.lazyPartitionComparator;
    }

    private List<DeletionFile> getDeletionFiles(List<DataFileMeta> dataFiles, List<IndexFileMeta> indexFileMetas) {
        ArrayList<DeletionFile> deletionFiles = new ArrayList<DeletionFile>(dataFiles.size());
        HashMap<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<String, IndexFileMeta>();
        for (IndexFileMeta indexFileMeta : indexFileMetas) {
            if (indexFileMeta.deletionVectorMetas() == null) continue;
            for (DeletionVectorMeta dvMeta : indexFileMeta.deletionVectorMetas().values()) {
                dataFileToIndexFileMeta.put(dvMeta.dataFileName(), indexFileMeta);
            }
        }
        for (DataFileMeta file : dataFiles) {
            LinkedHashMap<String, DeletionVectorMeta> dvMetas;
            IndexFileMeta indexFileMeta = (IndexFileMeta)dataFileToIndexFileMeta.get(file.fileName());
            if (indexFileMeta != null && (dvMetas = indexFileMeta.deletionVectorMetas()) != null && dvMetas.containsKey(file.fileName())) {
                deletionFiles.add(new DeletionFile(this.indexFileHandler.filePath(indexFileMeta).toString(), dvMetas.get(file.fileName()).offset(), dvMetas.get(file.fileName()).length(), dvMetas.get(file.fileName()).cardinality()));
                continue;
            }
            deletionFiles.add(null);
        }
        return deletionFiles;
    }
}

