/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.contract;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.FutureIO;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Contract test base for vectored reads ({@code readVectored}) on an
 * {@link FSDataInputStream}.
 *
 * <p>The class is parameterized over the values returned by {@link #params()}
 * ("direct" and "array"), which select whether buffers handed to the stream
 * are direct or not. Each test case reads from a file named
 * {@link #VECTORED_READ_FILE_NAME} which {@link #setup()} creates with the
 * contents of {@link #DATASET}.
 */
@ParameterizedClass(name="buffer-{0}")
@MethodSource(value={"params"})
public abstract class AbstractContractVectoredReadTest
extends AbstractFSContractTestBase {
    /** Logger shared by all test cases in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
    /** Length in bytes of the generated dataset (131072 = 128 KiB). */
    public static final int DATASET_LEN = 131072;
    /**
     * Reference data written to the test file and used to validate reads.
     * NOTE(review): the 97 and 32 arguments are presumably the base value and
     * modulo of the generated pattern; confirm against ContractTestUtils.dataset.
     */
    protected static final byte[] DATASET = ContractTestUtils.dataset(131072, 97, 32);
    /** Name of the file, relative to the test path, that the tests read from. */
    protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
    /** Buffer allocator passed to readVectored; it draws buffers from {@link #pool}. */
    private final IntFunction<ByteBuffer> allocate;
    /** Pool that supplies buffers for reads and receives them back afterwards. */
    private final ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    /** Buffer type parameter as passed to the constructor; not read elsewhere in this file. */
    private final String bufferType;
    /** True unless the buffer type parameter is "array". */
    private final boolean isDirect;
    /** Path of the test file; assigned in {@link #setup()}. */
    private Path vectorPath;
    /** Number of times {@link #release(ByteBuffer)} has been invoked. */
    private final AtomicInteger bufferReleases = new AtomicInteger();

    /**
     * Supplies the parameter values for the parameterized class.
     *
     * @return the buffer type names: "direct" and "array".
     */
    public static List<String> params() {
        List<String> bufferTypes = Arrays.asList("direct", "array");
        return bufferTypes;
    }

    /**
     * Creates the test for one buffer type.
     *
     * @param bufferType buffer type name; "array" selects non-direct buffers,
     *                   any other value selects direct buffers.
     */
    protected AbstractContractVectoredReadTest(String bufferType) {
        this.bufferType = bufferType;
        // everything except the literal "array" is treated as a request for direct buffers
        this.isDirect = !"array".equals(bufferType);
        this.allocate = capacity -> this.pool.getBuffer(this.isDirect, capacity);
    }

    /**
     * Returns the buffer allocator used by the tests.
     *
     * @return the function which obtains buffers of a requested size from the pool.
     */
    protected IntFunction<ByteBuffer> getAllocate() {
        return allocate;
    }

    /**
     * Release callback handed to {@code readVectored}: logs the buffer,
     * counts the release and returns the buffer to the pool.
     *
     * @param buffer buffer being released.
     */
    protected void release(ByteBuffer buffer) {
        LOG.info("Released buffer {}", buffer);
        bufferReleases.incrementAndGet();
        pool.putBuffer(buffer);
    }

    /**
     * Returns the buffer pool backing the allocator.
     *
     * @return the pool used by this test instance.
     */
    protected ElasticByteBufferPool getPool() {
        return pool;
    }

    /**
     * Runs the superclass setup, then resolves the test file path and
     * creates the file with the contents of {@link #DATASET}.
     *
     * @throws Exception on any setup failure.
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        vectorPath = path(VECTORED_READ_FILE_NAME);
        ContractTestUtils.createFile(getFileSystem(), vectorPath, true, DATASET);
    }

    /**
     * Releases the buffer pool and then runs the superclass teardown.
     *
     * <p>The superclass teardown is invoked from a {@code finally} block so
     * that it still runs if releasing the pool throws.
     *
     * @throws Exception on any teardown failure.
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        try {
            pool.release();
        } finally {
            super.teardown();
        }
    }

    /**
     * Opens the test file through the filesystem under test.
     *
     * @return an input stream over the test file.
     * @throws IOException if the file cannot be opened.
     */
    protected FSDataInputStream openVectorFile() throws IOException {
        FileSystem fs = getFileSystem();
        return openVectorFile(fs);
    }

    /**
     * Opens the test file through the given filesystem using the
     * {@code openFile()} builder, declaring the file length and the
     * "vector" read policy as options.
     *
     * @param fs filesystem to open the file with.
     * @return an input stream over the test file.
     * @throws IOException if the open fails.
     */
    protected FSDataInputStream openVectorFile(FileSystem fs) throws IOException {
        return FutureIO.awaitFuture(
            fs.openFile(vectorPath)
                .opt("fs.option.openfile.length", DATASET_LEN)
                .opt("fs.option.openfile.read.policy", "vector")
                .build());
    }

    /**
     * Reads ten consecutive 100-byte ranges, waits for all of them to
     * complete and validates the data against the dataset.
     */
    @Test
    public void testVectoredReadMultipleRanges() throws Exception {
        List<FileRange> ranges = new ArrayList<>();
        for (int index = 0; index < 10; index++) {
            ranges.add(FileRange.createFileRange(index * 100L, 100));
        }
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(ranges, allocate, this::release);
            // gather every range's future so completion can be awaited in one call
            CompletableFuture<?>[] pending = ranges.stream()
                .map(FileRange::getData)
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(pending).get();
            ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
        }
    }

    /**
     * Reads the same 100 bytes at offset 100 through both a vectored read
     * and a positioned {@code readFully}, and asserts the results match.
     */
    @Test
    public void testVectoredReadAndReadFully() throws Exception {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 100L, 100);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(ranges, allocate);
            byte[] positionedBytes = new byte[100];
            in.readFully(100L, positionedBytes);
            ByteBuffer vectoredBytes = FutureIO.awaitFuture(ranges.get(0).getData());
            Assertions.assertThat(vectoredBytes)
                .describedAs("Result from vectored read and readFully must match")
                .isEqualByComparingTo(ByteBuffer.wrap(positionedBytes));
            ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
        }
    }

    /**
     * Reads the entire file as a single range and asserts that the returned
     * buffer matches the dataset.
     *
     * <p>The assertion description previously referred to {@code readFully},
     * which this test never calls; it now describes the comparison actually
     * performed.
     */
    @Test
    public void testVectoredReadWholeFile() throws Exception {
        describe("Read the whole file in one single vectored read");
        List<FileRange> fileRanges = new ArrayList<>();
        ContractTestUtils.range(fileRanges, 0L, DATASET_LEN);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate);
            ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
            Assertions.assertThat(vecRes)
                .describedAs("Result from vectored read of the whole file must match the dataset")
                .isEqualByComparingTo(ByteBuffer.wrap(DATASET));
            ContractTestUtils.returnBuffersToPoolPostRead(fileRanges, pool);
        }
    }

    /**
     * Reads three 100-byte ranges at offsets 0, 4101 and 16101 and
     * validates each against the dataset.
     */
    @Test
    public void testDisjointRanges() throws Exception {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 0L, 100);
        ContractTestUtils.range(ranges, 4101L, 100);
        ContractTestUtils.range(ranges, 16101L, 100);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(ranges, allocate, this::release);
            ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
        }
    }

    /**
     * Reads three equally sized ranges at offsets 0, 3899 and 7899 and
     * validates each against the dataset.
     *
     * <p>NOTE(review): the test name suggests these ranges are close enough
     * for an implementation to merge into a single read; the merge itself is
     * not asserted here.
     */
    @Test
    public void testAllRangesMergedIntoOne() throws Exception {
        List<FileRange> fileRanges = new ArrayList<>();
        // one shared length for every range, rather than a repeated literal
        final int length = 100;
        ContractTestUtils.range(fileRanges, 0L, length);
        ContractTestUtils.range(fileRanges, 3899L, length);
        ContractTestUtils.range(fileRanges, 7899L, length);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate);
            ContractTestUtils.validateVectoredReadResult(fileRanges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(fileRanges, pool);
        }
    }

    /**
     * Reads five ranges supplied out of offset order, opening the file
     * through a builder that is given the file status up front, and
     * validates each range against the dataset.
     */
    @Test
    public void testSomeRangesMergedSomeUnmerged() throws Exception {
        FileSystem fs = getFileSystem();
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 8192L, 100);
        ContractTestUtils.range(ranges, 14336L, 100);
        ContractTestUtils.range(ranges, 10240L, 100);
        ContractTestUtils.range(ranges, 1947L, 100);
        ContractTestUtils.range(ranges, 40960L, 1024);
        FileStatus status = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
        CompletableFuture<FSDataInputStream> streamFuture =
            fs.openFile(path(VECTORED_READ_FILE_NAME))
                .withFileStatus(status)
                .build();
        try (FSDataInputStream in = streamFuture.get()) {
            in.readVectored(ranges, allocate, this::release);
            ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
        }
    }

    /**
     * Submits overlapping ranges. If the contract declares support for
     * "vector-io-overlapping-ranges" the data is read and validated;
     * otherwise an {@link IllegalArgumentException} is expected.
     */
    @Test
    public void testOverlappingRanges() throws Exception {
        if (isSupported("vector-io-overlapping-ranges")) {
            try (FSDataInputStream in = openVectorFile()) {
                List<FileRange> ranges = getSampleOverlappingRanges();
                in.readVectored(ranges, allocate);
                ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
                ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
            }
        } else {
            verifyExceptionalVectoredRead(getSampleOverlappingRanges(), IllegalArgumentException.class);
        }
    }

    /**
     * Submits identical ranges. If the contract declares support for
     * "vector-io-overlapping-ranges" the data is read and validated;
     * otherwise an {@link IllegalArgumentException} is expected.
     */
    @Test
    public void testSameRanges() throws Exception {
        if (isSupported("vector-io-overlapping-ranges")) {
            try (FSDataInputStream in = openVectorFile()) {
                List<FileRange> ranges = getSampleSameRanges();
                in.readVectored(ranges, allocate, this::release);
                ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
                ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
            }
        } else {
            verifyExceptionalVectoredRead(getSampleSameRanges(), IllegalArgumentException.class);
        }
    }

    /**
     * A range list containing a null element must be rejected with a
     * {@link NullPointerException}.
     */
    @Test
    public void testNullRange() throws Exception {
        List<FileRange> rangesWithNull = new ArrayList<>();
        ContractTestUtils.range(rangesWithNull, 500L, 100);
        rangesWithNull.add(null);
        verifyExceptionalVectoredRead(rangesWithNull, NullPointerException.class);
    }

    /**
     * A null range list must be rejected with a {@link NullPointerException}.
     */
    @Test
    public void testNullRangeList() throws Exception {
        final List<FileRange> noRanges = null;
        verifyExceptionalVectoredRead(noRanges, NullPointerException.class);
    }

    /**
     * Reads four non-overlapping ranges of differing lengths, supplied out
     * of offset order, and validates each against the dataset.
     */
    @Test
    public void testSomeRandomNonOverlappingRanges() throws Exception {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 500L, 100);
        ContractTestUtils.range(ranges, 1000L, 200);
        ContractTestUtils.range(ranges, 50L, 10);
        ContractTestUtils.range(ranges, 10L, 5);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(ranges, allocate);
            ContractTestUtils.validateVectoredReadResult(ranges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(ranges, pool);
        }
    }

    /**
     * Reads two ranges where the second starts exactly where the first ends
     * (offsets 500 and 2511, each 2011 bytes long) and validates both
     * against the dataset.
     */
    @Test
    public void testConsecutiveRanges() throws Exception {
        List<FileRange> fileRanges = new ArrayList<>();
        final long offset = 500L;
        final int length = 2011;
        ContractTestUtils.range(fileRanges, offset, length);
        // the second range begins at the byte immediately after the first one
        ContractTestUtils.range(fileRanges, offset + length, length);
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate, this::release);
            ContractTestUtils.validateVectoredReadResult(fileRanges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(fileRanges, pool);
        }
    }

    /**
     * A vectored read with an empty range list must complete without error
     * and leave the list empty.
     */
    @Test
    public void testEmptyRanges() throws Exception {
        // typed list instead of a raw ArrayList, so the call is checked at compile time
        List<FileRange> fileRanges = new ArrayList<>();
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate);
            Assertions.assertThat(fileRanges)
                .describedAs("Empty ranges must stay empty")
                .isEmpty();
        }
    }

    /**
     * Requests a range whose offset is one byte past the end of the file.
     * If the contract declares "vector-io-early-eof-check" the call itself
     * must throw {@link EOFException}; otherwise the failure is expected
     * from the range's future.
     */
    @Test
    public void testEOFRanges() throws Exception {
        describe("Testing reading with an offset past the end of the file");
        List<FileRange> pastEnd = ContractTestUtils.range(DATASET_LEN + 1, 100);
        if (!isSupported("vector-io-early-eof-check")) {
            expectEOFinRead(pastEnd);
            return;
        }
        LOG.info("Expecting early EOF failure");
        verifyExceptionalVectoredRead(pastEnd, EOFException.class);
    }

    /**
     * Requests a single range starting at offset 0 that is one byte longer
     * than the file. If the contract declares "vector-io-early-eof-check"
     * the call itself must throw {@link EOFException}; otherwise the failure
     * is expected from the range's future.
     */
    @Test
    public void testVectoredReadWholeFilePlusOne() throws Exception {
        describe("Try to read whole file plus 1 byte");
        List<FileRange> oversized = ContractTestUtils.range(0L, DATASET_LEN + 1);
        if (!isSupported("vector-io-early-eof-check")) {
            expectEOFinRead(oversized);
            return;
        }
        LOG.info("Expecting early EOF failure");
        verifyExceptionalVectoredRead(oversized, EOFException.class);
    }

    /**
     * Issues the vectored read and expects every range's future to fail
     * with an {@link EOFException} within 300 seconds.
     *
     * @param fileRanges ranges to read.
     * @throws Exception on any unexpected failure.
     */
    private void expectEOFinRead(List<FileRange> fileRanges) throws Exception {
        LOG.info("Expecting late EOF failure");
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate, this::release);
            for (FileRange range : fileRanges) {
                LambdaTestUtils.interceptFuture(
                    EOFException.class, "", 300L, TimeUnit.SECONDS, range.getData());
            }
        }
    }

    /**
     * A range with a negative length must be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testNegativeLengthRange() throws Exception {
        List<FileRange> negativeLength = ContractTestUtils.range(0L, -50);
        verifyExceptionalVectoredRead(negativeLength, IllegalArgumentException.class);
    }

    /**
     * A range with a negative offset must be rejected with an
     * {@link EOFException}.
     */
    @Test
    public void testNegativeOffsetRange() throws Exception {
        List<FileRange> negativeOffset = ContractTestUtils.range(-1L, 50);
        verifyExceptionalVectoredRead(negativeOffset, EOFException.class);
    }

    /**
     * Passing a null release callback to the three-argument
     * {@code readVectored} must raise a {@link NullPointerException}.
     */
    @Test
    public void testNullReleaseOperation() throws Exception {
        List<FileRange> singleRange = ContractTestUtils.range(0L, 10);
        try (FSDataInputStream in = openVectorFile()) {
            LambdaTestUtils.intercept(NullPointerException.class,
                () -> in.readVectored(singleRange, allocate, null));
        }
    }

    /**
     * Starts a vectored read, then performs a normal {@code readFully} of
     * the first 200 bytes on the same stream, and validates both results
     * against the dataset.
     */
    @Test
    public void testNormalReadAfterVectoredRead() throws Exception {
        List<FileRange> fileRanges = createSampleNonOverlappingRanges();
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate);
            // a single length drives the buffer size, the read and the validation
            final int len = 200;
            byte[] res = new byte[len];
            in.readFully(res, 0, len);
            ByteBuffer buffer = ByteBuffer.wrap(res);
            ContractTestUtils.assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
            ContractTestUtils.validateVectoredReadResult(fileRanges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(fileRanges, pool);
        }
    }

    /**
     * Performs a normal {@code readFully} of the first 200 bytes, then a
     * vectored read on the same stream, and validates both results against
     * the dataset.
     */
    @Test
    public void testVectoredReadAfterNormalRead() throws Exception {
        List<FileRange> fileRanges = createSampleNonOverlappingRanges();
        try (FSDataInputStream in = openVectorFile()) {
            // a single length drives the buffer size, the read and the validation
            final int len = 200;
            byte[] res = new byte[len];
            in.readFully(res, 0, len);
            ByteBuffer buffer = ByteBuffer.wrap(res);
            ContractTestUtils.assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
            in.readVectored(fileRanges, allocate);
            ContractTestUtils.validateVectoredReadResult(fileRanges, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(fileRanges, pool);
        }
    }

    /**
     * Issues two vectored reads on the same stream before validating either,
     * then validates the second batch followed by the first.
     */
    @Test
    public void testMultipleVectoredReads() throws Exception {
        List<FileRange> firstBatch = createSampleNonOverlappingRanges();
        List<FileRange> secondBatch = createSampleNonOverlappingRanges();
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(firstBatch, allocate);
            in.readVectored(secondBatch, allocate, this::release);
            ContractTestUtils.validateVectoredReadResult(secondBatch, DATASET, 0L);
            ContractTestUtils.validateVectoredReadResult(firstBatch, DATASET, 0L);
            ContractTestUtils.returnBuffersToPoolPostRead(firstBatch, pool);
            ContractTestUtils.returnBuffersToPoolPostRead(secondBatch, pool);
        }
    }

    /**
     * Reads five ranges and hands each result to a worker thread which
     * validates the data and returns the buffer to the pool.
     *
     * <p>The {@link Future} of every submitted task is kept and awaited once
     * the latch has been released. Without that, a failure raised inside a
     * worker is captured by the discarded Future and never reaches the test
     * thread. This includes any {@link Error}, such as an assertion failure,
     * which the worker's {@code catch (Exception)} does not intercept. The
     * test would then pass despite the bad data.
     *
     * @throws Exception on a timeout or any failure raised by a worker.
     */
    @Test
    public void testVectoredIOEndToEnd() throws Exception {
        List<FileRange> fileRanges = new ArrayList<>();
        ContractTestUtils.range(fileRanges, 8192L, 100);
        ContractTestUtils.range(fileRanges, 14336L, 100);
        ContractTestUtils.range(fileRanges, 10240L, 100);
        ContractTestUtils.range(fileRanges, 1947L, 100);
        ContractTestUtils.range(fileRanges, 40960L, 1024);
        ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
        CountDownLatch countDown = new CountDownLatch(fileRanges.size());
        List<Future<?>> outcomes = new ArrayList<>(fileRanges.size());
        try (FSDataInputStream in = openVectorFile()) {
            in.readVectored(fileRanges, allocate);
            for (FileRange res : fileRanges) {
                outcomes.add(dataProcessor.submit(() -> {
                    try {
                        readBufferValidateDataAndReturnToPool(res, countDown);
                    } catch (Exception e) {
                        String error = String.format("Error while processing result for %s", res);
                        LOG.error(error, e);
                        // presumably throws; either way the outcome is surfaced via the Future below
                        ContractTestUtils.fail(error, e);
                    }
                }));
            }
            if (!countDown.await(300L, TimeUnit.SECONDS)) {
                ContractTestUtils.fail("Timeout/Error while processing vectored io results");
            }
            // propagate any worker failure to the test thread
            for (Future<?> outcome : outcomes) {
                outcome.get(300L, TimeUnit.SECONDS);
            }
        } finally {
            HadoopExecutors.shutdown(dataProcessor, LOG, 300L, TimeUnit.SECONDS);
        }
    }

    /**
     * Waits up to 300 seconds for a range's data, validates it against the
     * dataset at the range's offset and returns the buffer to the pool.
     * The latch is always counted down, whether or not validation succeeds.
     *
     * @param res range whose result is to be processed.
     * @param countDownLatch latch to count down once processing has finished.
     * @throws IOException if awaiting the result raises one.
     * @throws TimeoutException if the result is not available in time.
     */
    private void readBufferValidateDataAndReturnToPool(FileRange res, CountDownLatch countDownLatch) throws IOException, TimeoutException {
        try {
            CompletableFuture<ByteBuffer> pending = res.getData();
            CompletableFuture<Void> validated = pending.thenAccept(bytes -> {
                ContractTestUtils.assertDatasetEquals((int) res.getOffset(), "vecRead", bytes, res.getLength(), DATASET);
                pool.putBuffer(bytes);
            });
            FutureIO.awaitFuture(validated, 300L, TimeUnit.SECONDS);
        } finally {
            countDownLatch.countDown();
        }
    }

    /**
     * Builds two non-overlapping ranges: 100 bytes at offset 0 and
     * 50 bytes at offset 110.
     *
     * @return a new list holding the two ranges.
     */
    protected List<FileRange> createSampleNonOverlappingRanges() {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 0L, 100);
        ContractTestUtils.range(ranges, 110L, 50);
        return ranges;
    }

    /**
     * Builds three identical ranges, each 1000 bytes at offset 8000.
     *
     * @return a new list holding the three ranges.
     */
    protected List<FileRange> getSampleSameRanges() {
        List<FileRange> ranges = new ArrayList<>();
        for (int copy = 0; copy < 3; copy++) {
            ContractTestUtils.range(ranges, 8000L, 1000);
        }
        return ranges;
    }

    /**
     * Builds two overlapping ranges: 500 bytes at offset 100 and
     * 500 bytes at offset 400.
     *
     * @return a new list holding the two ranges.
     */
    protected List<FileRange> getSampleOverlappingRanges() {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 100L, 500);
        ContractTestUtils.range(ranges, 400L, 500);
        return ranges;
    }

    /**
     * Builds two back-to-back ranges: 500 bytes at offset 100 followed by
     * 500 bytes at offset 600.
     *
     * @return a new list holding the two ranges.
     */
    protected List<FileRange> getConsecutiveRanges() {
        List<FileRange> ranges = new ArrayList<>();
        ContractTestUtils.range(ranges, 100L, 500);
        ContractTestUtils.range(ranges, 600L, 500);
        return ranges;
    }

    /**
     * Asserts that a vectored read of the given ranges fails with the
     * expected exception type.
     *
     * <p>The string returned by the lambda is only produced when
     * {@code readVectored} returns normally, and it must not throw. With a
     * null list, an unguarded {@code fileRanges.size()} would raise its own
     * {@link NullPointerException}. That exception would be taken as the
     * expected failure, hiding an implementation which wrongly accepts a
     * null range list.
     *
     * @param fileRanges ranges to read; may be null.
     * @param clazz expected exception class.
     * @param <T> expected exception type.
     * @throws Exception if the expected exception is not raised, or on any other failure.
     */
    protected <T extends Throwable> void verifyExceptionalVectoredRead(List<FileRange> fileRanges, Class<T> clazz) throws Exception {
        try (FSDataInputStream in = openVectorFile()) {
            LambdaTestUtils.intercept(clazz, () -> {
                in.readVectored(fileRanges, allocate);
                // null-safe so the description cannot itself raise the expected exception
                String rangeCount = fileRanges == null ? "null" : String.valueOf(fileRanges.size());
                return "triggered read of " + rangeCount + " ranges against " + in;
            });
        }
    }

    /**
     * Checks which buffers a vectored read hands back when allocation goes
     * through a tracking pool.
     *
     * <p>Eight ranges of 4096 bytes are read, with start offsets 8096 bytes
     * apart. If the stream does not declare the
     * "fs.capability.vectoredio.sliced" capability, every returned buffer
     * must be one the tracking pool allocated, and the pool must report no
     * leak when closed. If it does declare the capability, releases of
     * unknown buffers and leaks reported on close are logged and tolerated.
     *
     * @throws Throwable on any failure.
     */
    @Test
    public void testBufferSlicing() throws Throwable {
        boolean slicing;
        describe("Test buffer slicing behavior in vectored IO");
        final int numBuffers = 8;
        final int bufferSize = 4096;
        // distance between range starts; larger than bufferSize, so ranges are separated by a gap
        final long stride = 8096L;
        long offset = 0L;
        List<FileRange> fileRanges = new ArrayList<>();
        for (int i = 0; i < numBuffers; ++i) {
            fileRanges.add(FileRange.createFileRange(offset, bufferSize));
            offset += stride;
        }
        TrackingByteBufferPool trackerPool = TrackingByteBufferPool.wrap(getPool());
        int unknownBuffers = 0;
        try (FSDataInputStream in = openVectorFile()) {
            slicing = in.hasCapability("fs.capability.vectoredio.sliced");
            LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in);
            in.readVectored(fileRanges, s -> trackerPool.getBuffer(isDirect, s), trackerPool::putBuffer);
            for (FileRange res : fileRanges) {
                CompletableFuture<ByteBuffer> data = res.getData();
                ByteBuffer buffer = FutureIO.awaitFuture(data);
                Assertions.assertThat(buffer)
                    .describedAs("Buffer must not be null")
                    .isNotNull();
                Assertions.assertThat(slicing || trackerPool.containsBuffer(buffer))
                    .describedAs("Buffer must be from the pool")
                    .isTrue();
                try {
                    trackerPool.putBuffer(buffer);
                } catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) {
                    // only a slicing stream may hand back a buffer the pool did not allocate
                    if (!slicing) {
                        throw e;
                    }
                    LOG.info("Sliced buffer detected: {}", buffer);
                    ++unknownBuffers;
                }
            }
        }
        try {
            trackerPool.close();
        } catch (TrackingByteBufferPool.LeakedByteBufferException e) {
            // leaks reported on close are tolerated only when slicing is enabled
            if (!slicing) {
                throw e;
            }
            LOG.info("Slicing is enabled; we saw leaked buffers: {} after {} releases of unknown buffers", e.getCount(), unknownBuffers);
        }
    }
}

