/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.AbstractDataTableScan;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Filter;

/**
 * A {@link AbstractDataTableScan} that yields exactly one plan: the first call to
 * {@link #plan()} performs the scan, every later call throws {@link EndOfScanException}.
 *
 * <p>An optional row limit set via {@link #withLimit(int)} is used to trim the list of splits
 * returned by the plan (see {@code applyPushDownLimit}).
 */
public class DataTableBatchScan extends AbstractDataTableScan {
    private final DefaultValueAssigner defaultValueAssigner;
    /** Created lazily on first use by {@link #plan()} or {@link #listPartitionEntries()}. */
    private StartingScanner startingScanner;
    /** {@code true} until {@link #plan()} has produced its single plan. */
    private boolean hasNext = true;
    /** Row limit to push down into planning, or {@code null} when no limit was requested. */
    private Integer pushDownLimit;

    /**
     * Creates the scan.
     *
     * <p>When {@code pkTable} is set and either deletion vectors are enabled or the merge engine
     * is {@code FIRST_ROW}, the snapshot reader is configured to keep only levels greater than
     * zero and to enable its value filter.
     *
     * @param pkTable whether the scanned table is a primary-key table
     * @param options table options consulted for deletion vectors and merge engine
     * @param snapshotReader reader that is configured by this scan and used for planning
     * @param defaultValueAssigner used to pre-process predicates passed to {@link #withFilter}
     */
    public DataTableBatchScan(
            boolean pkTable,
            CoreOptions options,
            SnapshotReader snapshotReader,
            DefaultValueAssigner defaultValueAssigner) {
        super(options, snapshotReader);
        this.defaultValueAssigner = defaultValueAssigner;
        boolean skipLevelZero =
                pkTable
                        && (options.deletionVectorsEnabled()
                                || options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW);
        if (skipLevelZero) {
            // The lambda is target-typed by the parameter of withLevelFilter; no raw cast needed.
            snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
        }
    }

    /**
     * Registers a filter on the snapshot reader after passing it through
     * {@link DefaultValueAssigner#handlePredicate}.
     *
     * @param predicate the filter requested by the caller
     * @return this scan
     */
    @Override
    public InnerTableScan withFilter(Predicate predicate) {
        snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
        return this;
    }

    /**
     * Records a row limit that {@link #plan()} uses to trim the planned splits.
     *
     * @param limit the row limit
     * @return this scan
     */
    @Override
    public InnerTableScan withLimit(int limit) {
        this.pushDownLimit = limit;
        return this;
    }

    /**
     * Produces the plan for this batch scan.
     *
     * @return the plan built from the starting scanner's result, after the push-down limit has
     *     been applied
     * @throws EndOfScanException on every call after the first
     */
    @Override
    public TableScan.Plan plan() {
        StartingScanner scanner = ensureStartingScanner();
        if (!hasNext) {
            throw new EndOfScanException();
        }
        hasNext = false;
        StartingScanner.Result scanned = scanner.scan(snapshotReader);
        return DataFilePlan.fromResult(applyPushDownLimit(scanned));
    }

    /**
     * Lists partition entries via the starting scanner. Does not consume the single plan handed
     * out by {@link #plan()}.
     *
     * @return the partition entries reported by the starting scanner
     */
    @Override
    public List<PartitionEntry> listPartitionEntries() {
        return ensureStartingScanner().scanPartitions(snapshotReader);
    }

    /** Returns the starting scanner, creating it on first use. */
    private StartingScanner ensureStartingScanner() {
        if (startingScanner == null) {
            startingScanner = createStartingScanner(false);
        }
        return startingScanner;
    }

    /**
     * Trims the splits of a scanned result according to the push-down limit.
     *
     * <p>Splits are visited in plan order. Only splits for which
     * {@link DataSplit#rawConvertible()} is {@code true} are collected, and their
     * {@link DataSplit#partialMergedRowCount()} values are summed. As soon as the sum reaches the
     * limit, a new plan containing just the collected splits is returned.
     *
     * <p>The input is returned unchanged when no limit is set, when it is not a
     * {@link StartingScanner.ScannedResult}, or when the collected splits never reach the limit.
     *
     * @param result the result produced by the starting scanner
     * @return either {@code result} itself or a new scanned result with fewer splits
     */
    private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
        if (pushDownLimit == null || !(result instanceof StartingScanner.ScannedResult)) {
            return result;
        }

        SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
        long limit = pushDownLimit.longValue();
        long scannedRowCount = 0L;
        List<Split> limitedSplits = new ArrayList<>();

        for (DataSplit dataSplit : plan.dataSplits()) {
            if (!dataSplit.rawConvertible()) {
                // Splits that are not raw-convertible are neither counted nor collected.
                continue;
            }
            limitedSplits.add(dataSplit);
            scannedRowCount += dataSplit.partialMergedRowCount();
            if (scannedRowCount >= limit) {
                PlanImpl newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
                return new StartingScanner.ScannedResult(newPlan);
            }
        }
        // Limit not reached (or no splits at all): keep the full, original result.
        return result;
    }

    /**
     * Restricts the snapshot reader to one shard.
     *
     * @param indexOfThisSubtask index of the shard to read
     * @param numberOfParallelSubtasks total number of shards
     * @return this scan
     */
    @Override
    public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
        return this;
    }
}

