/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallellyExecuteUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

public abstract class AbstractFileStoreScan
implements FileStoreScan {
    private final FieldStatsArraySerializer partitionStatsConverter;
    private final RowType partitionType;
    private final SnapshotManager snapshotManager;
    private final ManifestFile.Factory manifestFileFactory;
    private final ManifestList manifestList;
    private final int numOfBuckets;
    private final boolean checkNumOfBuckets;
    private final ConcurrentMap<Long, TableSchema> tableSchemas;
    private final SchemaManager schemaManager;
    protected final ScanBucketFilter bucketKeyFilter;
    private PartitionPredicate partitionFilter;
    private Snapshot specifiedSnapshot = null;
    private Filter<Integer> bucketFilter = null;
    private List<ManifestFileMeta> specifiedManifests = null;
    private ScanMode scanMode = ScanMode.ALL;
    private Filter<Integer> levelFilter = null;
    private ManifestCacheFilter manifestCacheFilter = null;
    private final Integer scanManifestParallelism;
    private ScanMetrics scanMetrics = null;

    public AbstractFileStoreScan(RowType partitionType, ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism) {
        this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
        this.partitionType = partitionType;
        this.bucketKeyFilter = bucketKeyFilter;
        this.snapshotManager = snapshotManager;
        this.schemaManager = schemaManager;
        this.manifestFileFactory = manifestFileFactory;
        this.manifestList = manifestListFactory.create();
        this.numOfBuckets = numOfBuckets;
        this.checkNumOfBuckets = checkNumOfBuckets;
        this.tableSchemas = new ConcurrentHashMap<Long, TableSchema>();
        this.scanManifestParallelism = scanManifestParallelism;
    }

    @Override
    public FileStoreScan withPartitionFilter(Predicate predicate) {
        this.partitionFilter = this.partitionType.getFieldCount() > 0 && predicate != null ? PartitionPredicate.fromPredicate(this.partitionType, predicate) : null;
        return this;
    }

    @Override
    public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
        this.partitionFilter = this.partitionType.getFieldCount() > 0 && !partitions.isEmpty() ? PartitionPredicate.fromMultiple(this.partitionType, partitions) : null;
        return this;
    }

    @Override
    public FileStoreScan withBucket(int bucket) {
        this.bucketFilter = i -> i == bucket;
        return this;
    }

    @Override
    public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
        this.bucketFilter = bucketFilter;
        return this;
    }

    @Override
    public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
        if (this.manifestCacheFilter != null) {
            Preconditions.checkArgument((boolean)this.manifestCacheFilter.test(partition, bucket), (Object)String.format("This is a bug! The partition %s and bucket %s is filtered!", partition, bucket));
        }
        this.withPartitionFilter(Collections.singletonList(partition));
        this.withBucket(bucket);
        return this;
    }

    @Override
    public FileStoreScan withSnapshot(long snapshotId) {
        Preconditions.checkState((this.specifiedManifests == null ? 1 : 0) != 0, (Object)"Cannot set both snapshot and manifests.");
        this.specifiedSnapshot = this.snapshotManager.snapshot(snapshotId);
        return this;
    }

    @Override
    public FileStoreScan withSnapshot(Snapshot snapshot) {
        Preconditions.checkState((this.specifiedManifests == null ? 1 : 0) != 0, (Object)"Cannot set both snapshot and manifests.");
        this.specifiedSnapshot = snapshot;
        return this;
    }

    @Override
    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
        Preconditions.checkState((this.specifiedSnapshot == null ? 1 : 0) != 0, (Object)"Cannot set both snapshot and manifests.");
        this.specifiedManifests = manifests;
        return this;
    }

    @Override
    public FileStoreScan withKind(ScanMode scanMode) {
        this.scanMode = scanMode;
        return this;
    }

    @Override
    public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
        this.levelFilter = levelFilter;
        return this;
    }

    @Override
    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter) {
        this.manifestCacheFilter = manifestFilter;
        return this;
    }

    @Override
    public FileStoreScan withMetrics(ScanMetrics metrics) {
        this.scanMetrics = metrics;
        return this;
    }

    @Override
    public FileStoreScan.Plan plan() {
        Pair<Snapshot, List<ManifestEntry>> planResult = this.doPlan(this::readManifestFileMeta);
        final Snapshot readSnapshot = (Snapshot)planResult.getLeft();
        final List files = (List)planResult.getRight();
        return new FileStoreScan.Plan(){

            @Override
            @Nullable
            public Long watermark() {
                return readSnapshot == null ? null : readSnapshot.watermark();
            }

            @Override
            @Nullable
            public Long snapshotId() {
                return readSnapshot == null ? null : Long.valueOf(readSnapshot.id());
            }

            @Override
            public ScanMode scanMode() {
                return AbstractFileStoreScan.this.scanMode;
            }

            @Override
            public List<ManifestEntry> files() {
                return files;
            }
        };
    }

    private Pair<Snapshot, List<ManifestEntry>> doPlan(Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
        long started = System.nanoTime();
        List<ManifestFileMeta> manifests = this.specifiedManifests;
        Snapshot snapshot = null;
        if (manifests == null) {
            snapshot = this.specifiedSnapshot == null ? this.snapshotManager.latestSnapshot() : this.specifiedSnapshot;
            manifests = snapshot == null ? Collections.emptyList() : this.readManifests(snapshot);
        }
        long startDataFiles = manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();
        AtomicLong cntEntries = new AtomicLong(0L);
        Iterable<ManifestEntry> entries = ParallellyExecuteUtils.parallelismBatchIterable(files -> {
            List entryList = files.parallelStream().filter(this::filterManifestFileMeta).flatMap(m -> ((List)readManifest.apply((ManifestFileMeta)m)).stream()).filter(this::filterByStats).collect(Collectors.toList());
            cntEntries.getAndAdd(entryList.size());
            return entryList;
        }, manifests, this.scanManifestParallelism);
        List<Object> files2 = new ArrayList<ManifestEntry>();
        Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(entries);
        long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
        for (ManifestEntry file2 : mergedEntries) {
            if (this.checkNumOfBuckets && file2.totalBuckets() != this.numOfBuckets) {
                String partInfo = this.partitionType.getFieldCount() > 0 ? "partition " + FileStorePathFactory.getPartitionComputer(this.partitionType, (String)FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue()).generatePartValues((InternalRow)file2.partition()) : "table";
                throw new RuntimeException(String.format("Try to write %s with a new bucket num %d, but the previous bucket num is %d. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", partInfo, this.numOfBuckets, file2.totalBuckets()));
            }
            if (!this.filterByBucket(file2) || !this.filterByBucketSelector(file2) || !this.filterByLevel(file2)) continue;
            files2.add(file2);
        }
        long afterBucketFilter = files2.size();
        long skippedByBucketAndLevelFilter = mergedEntries.size() - files2.size();
        files2 = files2.stream().collect(Collectors.groupingBy(file -> Pair.of((Object)file.partition(), (Object)file.bucket()), LinkedHashMap::new, Collectors.toList())).values().stream().filter(this::filterWholeBucketByStats).flatMap(Collection::stream).collect(Collectors.toList());
        long skippedByWholeBucketFiles = afterBucketFilter - (long)files2.size();
        long scanDuration = (System.nanoTime() - started) / 1000000L;
        if (this.scanMetrics != null) {
            this.scanMetrics.reportScan(new ScanStats(scanDuration, manifests.size(), skippedByPartitionAndStats, skippedByBucketAndLevelFilter, skippedByWholeBucketFiles, files2.size()));
        }
        return Pair.of((Object)snapshot, files2);
    }

    private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
        switch (this.scanMode) {
            case ALL: {
                return snapshot.dataManifests(this.manifestList);
            }
            case DELTA: {
                return snapshot.deltaManifests(this.manifestList);
            }
            case CHANGELOG: {
                if (snapshot.version() > 1) {
                    return snapshot.changelogManifests(this.manifestList);
                }
                if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
                    return snapshot.deltaManifests(this.manifestList);
                }
                throw new IllegalStateException(String.format("Incremental scan does not accept %s snapshot", new Object[]{snapshot.commitKind()}));
            }
        }
        throw new UnsupportedOperationException("Unknown scan kind " + this.scanMode.name());
    }

    protected TableSchema scanTableSchema(long id) {
        return this.tableSchemas.computeIfAbsent(id, key -> this.schemaManager.schema(id));
    }

    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
        return this.partitionFilter == null || this.partitionFilter.test(manifest.numAddedFiles() + manifest.numDeletedFiles(), manifest.partitionStats().fields(this.partitionStatsConverter));
    }

    private boolean filterByBucket(ManifestEntry entry) {
        return this.bucketFilter == null || this.bucketFilter.test((Object)entry.bucket());
    }

    private boolean filterByBucketSelector(ManifestEntry entry) {
        return this.bucketKeyFilter.select(entry.bucket(), entry.totalBuckets());
    }

    private boolean filterByLevel(ManifestEntry entry) {
        return this.levelFilter == null || this.levelFilter.test((Object)entry.file().level());
    }

    protected abstract boolean filterByStats(ManifestEntry var1);

    protected abstract boolean filterWholeBucketByStats(List<ManifestEntry> var1);

    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
        return this.manifestFileFactory.create().read(manifest.fileName(), this.manifestCacheRowFilter(), this.manifestEntryRowFilter());
    }

    private Filter<InternalRow> manifestEntryRowFilter() {
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter = ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (this.partitionFilter != null && !this.partitionFilter.test((BinaryRow)partitionGetter.apply((InternalRow)row))) {
                return false;
            }
            if (this.bucketFilter != null && this.numOfBuckets == (Integer)totalBucketGetter.apply((InternalRow)row)) {
                return this.bucketFilter.test(bucketGetter.apply((InternalRow)row));
            }
            return true;
        };
    }

    private Filter<InternalRow> manifestCacheRowFilter() {
        if (this.manifestCacheFilter == null) {
            return Filter.alwaysTrue();
        }
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter = ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (this.numOfBuckets != (Integer)totalBucketGetter.apply((InternalRow)row)) {
                return true;
            }
            return this.manifestCacheFilter.test((BinaryRow)partitionGetter.apply((InternalRow)row), (Integer)bucketGetter.apply((InternalRow)row));
        };
    }
}

