/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.disk.ChannelReaderInputView;
import org.apache.paimon.disk.ChannelReaderInputViewIterator;
import org.apache.paimon.disk.ChannelWithMeta;
import org.apache.paimon.disk.ChannelWriterOutputView;
import org.apache.paimon.disk.FileChannelUtil;
import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.CachelessSegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.SizedReaderSupplier;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.Preconditions;

/**
 * Merges several sorted {@link KeyValue} readers into a single reader.
 *
 * <p>When an {@link IOManager} is available and the number of input readers exceeds the
 * configured spill threshold, the readers with the smallest estimated size are first copied to
 * compressed spill channels and then read back from there; the remaining readers are merged
 * directly.
 */
public class MergeSorter {
    /** Block size used for both writing and reading compressed spill channels. */
    private static final int SPILL_COMPRESS_BLOCK_SIZE = (int) MemorySize.parse("64 kb").getBytes();

    private final RowType keyType;
    /** Value row type used to build the spill serializer; replaceable via {@link #setProjectedValueType}. */
    private RowType valueType;
    private final CoreOptions.SortEngine sortEngine;
    /** Maximum number of readers that are merged without spilling. */
    private final int spillThreshold;
    private final CompressOptions compression;
    /** Pool sized from {@code sortSpillBufferSize} and {@code pageSize}; exposed via {@link #memoryPool()}. */
    private final MemorySegmentPool memoryPool;
    /** Spilling is disabled while this is {@code null}. */
    @Nullable
    private IOManager ioManager;

    /**
     * Creates a sorter configured from the given options.
     *
     * @param options source of the sort engine, spill threshold, spill compression, spill buffer
     *     size and page size
     * @param keyType row type of the keys, used when serializing spilled records
     * @param valueType row type of the values, used when serializing spilled records
     * @param ioManager manager used to create spill channels, or {@code null} to disable spilling
     */
    public MergeSorter(CoreOptions options, RowType keyType, RowType valueType, @Nullable IOManager ioManager) {
        this.sortEngine = options.sortEngine();
        this.spillThreshold = options.sortSpillThreshold();
        this.compression = options.spillCompressOptions();
        this.keyType = keyType;
        this.valueType = valueType;
        this.memoryPool = new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize());
        this.ioManager = ioManager;
    }

    /** Returns the memory pool created by the constructor. */
    public MemorySegmentPool memoryPool() {
        return this.memoryPool;
    }

    /** Returns the value type currently used for spill serialization. */
    public RowType valueType() {
        return this.valueType;
    }

    /** Replaces the I/O manager; spilled suppliers read this field when they are opened. */
    public void setIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
    }

    /** Replaces the value type used for spill serialization from now on. */
    public void setProjectedValueType(RowType projectedType) {
        this.valueType = projectedType;
    }

    /**
     * Merges the given readers, spilling some of them first when an I/O manager is set and the
     * number of readers is greater than the spill threshold.
     *
     * @param lazyReaders suppliers of the sorted inputs
     * @param keyComparator comparator for record keys
     * @param userDefinedSeqComparator optional comparator passed through to the merge reader
     * @param mergeFunction merge function passed through to the merge reader
     * @return a reader over the merged result
     * @throws IOException if opening, spilling or reading an input fails
     */
    public <T> RecordReader<T> mergeSort(List<SizedReaderSupplier<KeyValue>> lazyReaders, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
        if (this.ioManager != null && lazyReaders.size() > this.spillThreshold) {
            return this.spillMergeSort(lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
        }
        return this.mergeSortNoSpill(lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
    }

    /**
     * Opens every supplier and hands the readers to
     * {@link SortMergeReader#createSortMergeReader}.
     *
     * <p>If anything fails before the merged reader is returned, every reader opened so far is
     * closed quietly and the failure is propagated unchanged.
     *
     * @param lazyReaders suppliers of the sorted inputs
     * @param keyComparator comparator for record keys
     * @param userDefinedSeqComparator optional comparator passed through to the merge reader
     * @param mergeFunction merge function passed through to the merge reader
     * @return a reader over the merged result
     * @throws IOException if a supplier fails to open its reader
     */
    public <T> RecordReader<T> mergeSortNoSpill(List<? extends ReaderSupplier<KeyValue>> lazyReaders, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
        List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
        boolean handedOver = false;
        try {
            for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
                readers.add(supplier.get());
            }
            RecordReader<T> merged = SortMergeReader.createSortMergeReader(readers, keyComparator, userDefinedSeqComparator, mergeFunction, this.sortEngine);
            handedOver = true;
            return merged;
        } finally {
            // Covers checked and unchecked failures alike, so no opened reader is leaked.
            if (!handedOver) {
                readers.forEach(IOUtils::closeQuietly);
            }
        }
    }

    /**
     * Spills the {@code inputReaders.size() - spillThreshold} readers with the smallest estimated
     * size, then merges the untouched (larger) readers followed by the spilled ones.
     *
     * <p>NOTE(review): if a later spill fails, channels written by earlier spills in the same call
     * are not removed here — confirm whether the I/O manager cleans them up.
     */
    private <T> RecordReader<T> spillMergeSort(List<SizedReaderSupplier<KeyValue>> inputReaders, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
        List<SizedReaderSupplier<KeyValue>> sortedReaders = new ArrayList<SizedReaderSupplier<KeyValue>>(inputReaders);
        sortedReaders.sort(Comparator.comparingLong(SizedReaderSupplier::estimateSize));
        int spillSize = inputReaders.size() - this.spillThreshold;
        // Readers from index spillSize onwards (the largest estimates) are kept as they are.
        List<ReaderSupplier<KeyValue>> readers = new ArrayList<ReaderSupplier<KeyValue>>(sortedReaders.subList(spillSize, sortedReaders.size()));
        for (ReaderSupplier<KeyValue> supplier : sortedReaders.subList(0, spillSize)) {
            readers.add(this.spill(supplier));
        }
        return this.mergeSortNoSpill(readers, keyComparator, userDefinedSeqComparator, mergeFunction);
    }

    /**
     * Drains the supplied reader into a newly created compressed spill channel and returns a
     * supplier that reads the records back from that channel.
     *
     * <p>On failure the partially written channel is closed and deleted, and the original
     * exception is rethrown with any cleanup failure attached as a suppressed exception.
     *
     * @param readerSupplier supplier of the reader to spill
     * @return a supplier that re-reads the spilled records
     * @throws IOException if reading the input or writing the spill channel fails
     */
    private ReaderSupplier<KeyValue> spill(ReaderSupplier<KeyValue> readerSupplier) throws IOException {
        Preconditions.checkArgument(this.ioManager != null);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        KeyValueWithLevelNoReusingSerializer serializer = new KeyValueWithLevelNoReusingSerializer(this.keyType, this.valueType);
        BlockCompressionFactory compressFactory = BlockCompressionFactory.create(this.compression);
        int compressBlock = SPILL_COMPRESS_BLOCK_SIZE;
        ChannelWriterOutputView out = FileChannelUtil.createOutputView(this.ioManager, channel, compressFactory, compressBlock);
        try {
            try (RecordReader<KeyValue> reader = readerSupplier.get()) {
                RecordReader.RecordIterator<KeyValue> batch;
                while ((batch = reader.readBatch()) != null) {
                    KeyValue record;
                    while ((record = batch.next()) != null) {
                        serializer.serialize(record, out);
                    }
                    batch.releaseBatch();
                }
            }
            // Close before reading the counters below, as the original code did.
            out.close();
        } catch (Throwable failure) {
            // The spill is unusable: remove the partial file and keep the primary failure visible.
            try {
                out.closeAndDelete();
            } catch (Throwable cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
        ChannelWithMeta channelWithMeta = new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes());
        return new SpilledReaderSupplier(channelWithMeta, compressFactory, compressBlock, serializer);
    }

    /**
     * Reader over one spilled channel. It yields exactly one batch; closing it closes and deletes
     * the underlying channel.
     */
    private static class ChannelReaderReader
    implements RecordReader<KeyValue> {
        private final ChannelReaderInputView view;
        private final ChannelReaderInputViewIterator iterator;
        private final KeyValueWithLevelNoReusingSerializer serializer;
        /** Set once the single batch has been handed out. */
        private boolean read = false;

        private ChannelReaderReader(ChannelReaderInputView view, ChannelReaderInputViewIterator iterator2, KeyValueWithLevelNoReusingSerializer serializer) {
            this.view = view;
            this.iterator = iterator2;
            this.serializer = serializer;
        }

        /** Returns the single batch on the first call and {@code null} on every later call. */
        public RecordReader.RecordIterator<KeyValue> readBatch() {
            if (this.read) {
                return null;
            }
            this.read = true;
            return new RecordReader.RecordIterator<KeyValue>(){

                /** Converts the next spilled row back to a {@link KeyValue}; {@code null} at the end. */
                public KeyValue next() throws IOException {
                    BinaryRow noReuseRow = iterator.next();
                    if (noReuseRow == null) {
                        return null;
                    }
                    return serializer.fromRow(noReuseRow);
                }

                /** Nothing to release for this batch. */
                public void releaseBatch() {
                }
            };
        }

        /** Closes the channel behind the input view and deletes it. */
        public void close() throws IOException {
            this.view.getChannel().closeAndDelete();
        }
    }

    /**
     * Supplier that opens a reader over a previously written spill channel. It is an inner class
     * because it reads the enclosing sorter's I/O manager at the time {@link #get()} is called.
     */
    private class SpilledReaderSupplier
    implements ReaderSupplier<KeyValue> {
        private final ChannelWithMeta channel;
        private final BlockCompressionFactory compressFactory;
        private final int compressBlock;
        private final KeyValueWithLevelNoReusingSerializer serializer;

        public SpilledReaderSupplier(ChannelWithMeta channel, BlockCompressionFactory compressFactory, int compressBlock, KeyValueWithLevelNoReusingSerializer serializer) {
            this.channel = channel;
            this.compressFactory = compressFactory;
            this.compressBlock = compressBlock;
            this.serializer = serializer;
        }

        /** Opens an input view on the spilled channel and wraps it in a {@link ChannelReaderReader}. */
        public RecordReader<KeyValue> get() throws IOException {
            ChannelReaderInputView view = FileChannelUtil.createInputView(MergeSorter.this.ioManager, this.channel, new ArrayList<FileIOChannel>(), this.compressFactory, this.compressBlock);
            BinaryRowSerializer rowSerializer = new BinaryRowSerializer(this.serializer.numFields());
            ChannelReaderInputViewIterator iterator2 = new ChannelReaderInputViewIterator(view, null, rowSerializer);
            return new ChannelReaderReader(view, iterator2, this.serializer);
        }
    }
}

