/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.operation.DiffReader;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.ReverseReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ProjectedRow;

public class KeyValueFileStoreRead
implements FileStoreRead<KeyValue> {
    private final TableSchema tableSchema;
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final Comparator<InternalRow> keyComparator;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final boolean valueCountMode;
    private final MergeSorter mergeSorter;
    @Nullable
    private int[][] keyProjectedFields;
    @Nullable
    private List<Predicate> filtersForOverlappedSection;
    @Nullable
    private List<Predicate> filtersForNonOverlappedSection;
    @Nullable
    private int[][] pushdownProjection;
    @Nullable
    private int[][] outerProjection;
    private boolean forceKeepDelete = false;

    public KeyValueFileStoreRead(FileIO fileIO, SchemaManager schemaManager, long schemaId, RowType keyType, RowType valueType, Comparator<InternalRow> keyComparator, MergeFunctionFactory<KeyValue> mfFactory, FileFormatDiscover formatDiscover, FileStorePathFactory pathFactory, KeyValueFieldsExtractor extractor) {
        this.tableSchema = schemaManager.schema(schemaId);
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(fileIO, schemaManager, schemaId, keyType, valueType, formatDiscover, pathFactory, extractor);
        this.keyComparator = keyComparator;
        this.mfFactory = mfFactory;
        this.valueCountMode = this.tableSchema.trimmedPrimaryKeys().isEmpty();
        this.mergeSorter = new MergeSorter(CoreOptions.fromMap(this.tableSchema.options()), keyType, valueType, null);
    }

    public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
        this.readerFactoryBuilder.withKeyProjection(projectedFields);
        this.keyProjectedFields = projectedFields;
        return this;
    }

    public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
        MergeFunctionFactory.AdjustedProjection projection = this.mfFactory.adjustProjection(projectedFields);
        this.pushdownProjection = projection.pushdownProjection;
        this.outerProjection = projection.outerProjection;
        if (this.pushdownProjection != null) {
            this.readerFactoryBuilder.withValueProjection(this.pushdownProjection);
            this.mergeSorter.setProjectedValueType(this.readerFactoryBuilder.projectedValueType());
        }
        return this;
    }

    public KeyValueFileStoreRead withIOManager(IOManager ioManager) {
        this.mergeSorter.setIOManager(ioManager);
        return this;
    }

    public KeyValueFileStoreRead forceKeepDelete() {
        this.forceKeepDelete = true;
        return this;
    }

    @Override
    public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
        ArrayList<Predicate> allFilters = new ArrayList<Predicate>();
        ArrayList<Predicate> pkFilters = null;
        List<String> primaryKeys = this.tableSchema.trimmedPrimaryKeys();
        Set nonPrimaryKeys = this.tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());
        for (Predicate sub : PredicateBuilder.splitAnd((Predicate)predicate)) {
            allFilters.add(sub);
            if (PredicateBuilder.containsFields((Predicate)sub, nonPrimaryKeys)) continue;
            if (pkFilters == null) {
                pkFilters = new ArrayList<Predicate>();
            }
            pkFilters.add(sub);
        }
        this.filtersForNonOverlappedSection = allFilters;
        this.filtersForOverlappedSection = this.valueCountMode ? allFilters : pkFilters;
        return this;
    }

    @Override
    public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
        RecordReader reader = this.createReaderWithoutOuterProjection(split);
        if (this.outerProjection != null) {
            ProjectedRow projectedRow = ProjectedRow.from((int[][])this.outerProjection);
            reader = reader.transform(kv -> kv.replaceValue((InternalRow)projectedRow.replaceRow(kv.value())));
        }
        return reader;
    }

    private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split) throws IOException {
        if (split.isStreaming()) {
            KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(split.partition(), split.bucket(), true, this.filtersForOverlappedSection);
            ConcatRecordReader.ReaderSupplier beforeSupplier = () -> new ReverseReader(this.streamingConcat(split.beforeFiles(), readerFactory));
            ConcatRecordReader.ReaderSupplier dataSupplier = () -> this.streamingConcat(split.dataFiles(), readerFactory);
            return split.beforeFiles().isEmpty() ? dataSupplier.get() : ConcatRecordReader.create(Arrays.asList(beforeSupplier, dataSupplier));
        }
        return split.beforeFiles().isEmpty() ? this.batchMergeRead(split.partition(), split.bucket(), split.dataFiles(), this.forceKeepDelete) : DiffReader.readDiff(this.batchMergeRead(split.partition(), split.bucket(), split.beforeFiles(), false), this.batchMergeRead(split.partition(), split.bucket(), split.dataFiles(), false), this.keyComparator, this.mergeSorter, this.forceKeepDelete);
    }

    private RecordReader<KeyValue> batchMergeRead(BinaryRow partition, int bucket, List<DataFileMeta> files, boolean keepDelete) throws IOException {
        KeyValueFileReaderFactory overlappedSectionFactory = this.readerFactoryBuilder.build(partition, bucket, false, this.filtersForOverlappedSection);
        KeyValueFileReaderFactory nonOverlappedSectionFactory = this.readerFactoryBuilder.build(partition, bucket, false, this.filtersForNonOverlappedSection);
        ArrayList sectionReaders = new ArrayList();
        ReducerMergeFunctionWrapper mergeFuncWrapper = new ReducerMergeFunctionWrapper(this.mfFactory.create(this.pushdownProjection));
        for (List<SortedRun> section : new IntervalPartition(files, this.keyComparator).partition()) {
            sectionReaders.add(() -> MergeTreeReaders.readerForSection(section, section.size() > 1 ? overlappedSectionFactory : nonOverlappedSectionFactory, this.keyComparator, mergeFuncWrapper, this.mergeSorter));
        }
        RecordReader<KeyValue> reader = ConcatRecordReader.create(sectionReaders);
        if (!keepDelete) {
            reader = new RecordReader<KeyValue>(reader);
        }
        return this.keyProjectedFields == null ? reader : this.projectKey(reader, this.keyProjectedFields);
    }

    private RecordReader<KeyValue> streamingConcat(List<DataFileMeta> files, KeyValueFileReaderFactory readerFactory) throws IOException {
        ArrayList suppliers = new ArrayList();
        for (DataFileMeta file : files) {
            suppliers.add(() -> {
                String fileName = this.changelogFile(file).orElse(file.fileName());
                return readerFactory.createRecordReader(file.schemaId(), fileName, file.level());
            });
        }
        return ConcatRecordReader.create(suppliers);
    }

    private Optional<String> changelogFile(DataFileMeta fileMeta) {
        for (String file : fileMeta.extraFiles()) {
            if (!file.startsWith("changelog-")) continue;
            return Optional.of(file);
        }
        return Optional.empty();
    }

    private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader, int[][] keyProjectedFields) {
        ProjectedRow projectedRow = ProjectedRow.from((int[][])keyProjectedFields);
        return reader.transform(kv -> kv.replaceKey((InternalRow)projectedRow.replaceRow(kv.key())));
    }
}

