/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV1;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractInputStreamAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

/**
 * Integration tests checking that {@link ReadBufferManagerV1} no longer holds
 * buffers for input streams once those streams have been closed.
 *
 * <p>Each test opens its own {@link AzureBlobFileSystem} through
 * {@link #getABFSWithReadAheadConfig()}, reads a single byte from a freshly
 * written file, closes the stream and then inspects the copies of the
 * manager's lists.
 *
 * <p>NOTE(review): the filesystem instances returned by
 * {@code FileSystem.newInstance} are never closed by these tests; confirm
 * whether closing them is safe with respect to the buffer manager before
 * adding it.
 */
public class ITestReadBufferManager
extends AbstractAbfsIntegrationTest {
    /** Milliseconds subtracted from the test timeout when polling with {@code eventually}. */
    public static final int TIMEOUT_OFFSET = 300000;
    /** Milliseconds between probes of the in-progress list. */
    public static final int PROBE_INTERVAL_MILLIS = 1000;

    /** Number of worker threads, and of files read concurrently, in the parallel test. */
    private static final int PARALLEL_STREAM_COUNT = 4;
    /** Size in bytes of every test file (0x100000 == 1048576). */
    private static final int TEST_FILE_SIZE = 0x100000;
    /** Path capability that must be reported by the filesystem under test. */
    private static final String READAHEAD_SAFE_CAPABILITY = "fs.azure.capability.readahead.safe";

    /**
     * Writes and reads {@value #PARALLEL_STREAM_COUNT} files concurrently, closes
     * every stream, then verifies that the read-ahead queue is empty and that the
     * in-progress list becomes empty within the polling window.
     *
     * <p>The outcome of every worker is checked, so a failure while creating or
     * reading a file fails the test instead of being silently dropped.
     *
     * @throws Exception if a worker task fails, the pool does not drain, or an
     *                   assertion does not hold
     */
    @Test
    public void testPurgeBufferManagerForParallelStreams() throws Exception {
        describe("Testing purging of buffers from ReadBufferManagerV1 for parallel input streams");
        ExecutorService executorService = Executors.newFixedThreadPool(PARALLEL_STREAM_COUNT);
        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
        Assertions.assertThat(fs.hasPathCapability(new Path("/"), READAHEAD_SAFE_CAPABILITY))
            .describedAs("path capability %s in %s", READAHEAD_SAFE_CAPABILITY, fs)
            .isTrue();

        List<Future<?>> pendingReads = new ArrayList<>(PARALLEL_STREAM_COUNT);
        boolean allWorkersFinished;
        try {
            for (int i = 0; i < PARALLEL_STREAM_COUNT; ++i) {
                String fileName = this.methodName.getMethodName() + i;
                // "return null" makes the lambda a Callable, which lets it throw checked exceptions.
                pendingReads.add(executorService.submit(() -> {
                    byte[] fileContent = getRandomBytesArray(TEST_FILE_SIZE);
                    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
                    try (FSDataInputStream iStream = fs.open(testFilePath)) {
                        iStream.read();
                    }
                    return null;
                }));
            }
        } finally {
            executorService.shutdown();
            allWorkersFinished = executorService.awaitTermination(1L, TimeUnit.MINUTES);
        }

        // The list assertions below are only meaningful once every stream has been closed.
        Assertions.assertThat(allWorkersFinished)
            .describedAs("all reader tasks finished within one minute")
            .isTrue();
        // Surface any exception raised inside a worker (rethrown as ExecutionException).
        for (Future<?> read : pendingReads) {
            read.get();
        }

        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
        // The in-progress list is polled rather than checked once.
        LambdaTestUtils.eventually((int)(this.getTestTimeoutMillis() - TIMEOUT_OFFSET), (int)PROBE_INTERVAL_MILLIS,
            () -> this.assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
    }

    /**
     * Asserts that the given buffer list has no entries.
     *
     * @param listName name of the list, used in the assertion description
     * @param list     list of buffers to check
     */
    private void assertListEmpty(String listName, List<ReadBuffer> list) {
        Assertions.assertThat(list)
            .describedAs("After closing all streams %s should be empty", listName)
            .hasSize(0);
    }

    /**
     * Opens, reads from and closes two streams over the same file one after the
     * other, verifying that the read-ahead queue holds no buffer belonging to a
     * stream that has already been closed and is empty at the end.
     *
     * @throws Exception if file creation, reading or an assertion fails
     */
    @Test
    public void testPurgeBufferManagerForSequentialStream() throws Exception {
        describe("Testing purging of buffers in ReadBufferManagerV1 for sequential input streams");
        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
        String fileName = this.methodName.getMethodName();
        byte[] fileContent = getRandomBytesArray(TEST_FILE_SIZE);
        Path testFilePath = createFileWithContent(fs, fileName, fileContent);

        // Declared outside the try so the reference can still be compared after the stream is closed.
        AbfsInputStream iStream1 = null;
        try {
            iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
            iStream1.read();
        } finally {
            IOUtils.closeStream(iStream1);
        }

        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
        AbfsInputStream iStream2 = null;
        try {
            iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
            iStream2.read();
            // The first stream is closed by now; none of its buffers may remain queued.
            assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
        } finally {
            IOUtils.closeStream(iStream2);
        }

        assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
    }

    /**
     * Asserts that no buffer in the list reports the given stream as its owner.
     *
     * @param list        buffers to inspect
     * @param inputStream stream whose buffers must be absent
     */
    private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list, AbfsInputStream inputStream) {
        for (ReadBuffer buffer : list) {
            Assertions.assertThat((InputStream) buffer.getStream())
                .describedAs("Buffers associated with closed input streams shouldn't be present")
                .isNotEqualTo(inputStream);
        }
    }

    /**
     * Creates a new filesystem instance from the raw test configuration after
     * setting the read-ahead queue depth to 8 and both the read request size and
     * the read-ahead block size to 16384.
     *
     * @return a new {@link AzureBlobFileSystem} instance
     * @throws Exception if the filesystem cannot be instantiated
     */
    private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
        Configuration conf = this.getRawConfiguration();
        conf.setLong("fs.azure.readaheadqueue.depth", 8L);
        conf.setInt("fs.azure.read.request.size", 16384);
        conf.setInt("fs.azure.read.readahead.blocksize", 16384);
        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
    }

    /**
     * Returns a new array of the requested length filled with random bytes.
     *
     * @param length number of bytes to generate
     * @return the randomly filled array
     */
    @Override
    protected byte[] getRandomBytesArray(int length) {
        byte[] b = new byte[length];
        new Random().nextBytes(b);
        return b;
    }

    /**
     * Writes the given bytes to a file under the test path and returns its path.
     *
     * @param fs          filesystem to create the file on
     * @param fileName    name resolved through {@code path(String)}
     * @param fileContent bytes to write
     * @return path of the file that was written
     * @throws IOException if creating, writing or closing the file fails
     */
    protected Path createFileWithContent(FileSystem fs, String fileName, byte[] fileContent) throws IOException {
        Path testFilePath = this.path(fileName);
        try (FSDataOutputStream oStream = fs.create(testFilePath)) {
            oStream.write(fileContent);
            oStream.flush();
        }
        return testFilePath;
    }
}

