/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.contract;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.util.functional.FutureIO;
import org.assertj.core.api.AbstractByteAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ITestAbfsFileSystemContractSeek
extends AbstractContractSeekTest {
    private final boolean isSecure;
    private final ABFSContractTestBinding binding = new ABFSContractTestBinding();
    private static final byte[] BLOCK = ContractTestUtils.dataset((int)102400, (int)0, (int)255);

    public ITestAbfsFileSystemContractSeek() throws Exception {
        this.isSecure = this.binding.isSecureMode();
    }

    @BeforeEach
    public void setup() throws Exception {
        this.binding.setup();
        super.setup();
    }

    protected Configuration createConfiguration() {
        return this.binding.getRawConfiguration();
    }

    protected AbstractFSContract createContract(Configuration conf) {
        conf.setInt("fs.azure.readahead.range", 16384);
        conf.setInt("fs.azure.read.request.size", 16384);
        AbfsTestUtils.disableFilesystemCaching(conf);
        return new AbfsFileSystemContract(conf, this.isSecure);
    }

    @Test
    public void testSeekAndReadWithReadAhead() throws IOException {
        this.describe(" Testing seek and read with read ahead enabled for random reads");
        Path testSeekFile = this.path(this.getMethodName() + "bigseekfile.txt");
        this.createDataSet(testSeekFile);
        try (FSDataInputStream in = this.getFileSystem().open(testSeekFile);){
            AbfsInputStream inStream = (AbfsInputStream)in.getWrappedStream();
            AbfsInputStreamStatisticsImpl streamStatistics = (AbfsInputStreamStatisticsImpl)inStream.getStreamStatistics();
            ITestAbfsFileSystemContractSeek.assertEquals((int)16384, (int)inStream.getReadAheadRange(), (String)String.format("Value of %s is not set correctly", "fs.azure.readahead.range"));
            long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations();
            ((AbstractLongAssert)Assertions.assertThat((long)remoteReadOperationsOldVal).describedAs("Number of remote read ops should be 0 before any read call is made", new Object[0])).isEqualTo(0L);
            ((AbstractLongAssert)Assertions.assertThat((long)inStream.getPos()).describedAs("First call to getPos() should return 0", new Object[0])).isEqualTo(0L);
            this.assertDataAtPos(0, (byte)in.read());
            this.assertSeekBufferStats(0L, streamStatistics.getSeekInBuffer());
            long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            int newSeek = inStream.getReadAheadRange() - 1;
            in.seek((long)newSeek);
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(1L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            newSeek = inStream.getReadAheadRange();
            inStream.seek((long)newSeek);
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(1L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            newSeek = inStream.getReadAheadRange() + 1;
            in.seek((long)newSeek);
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(2L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            in.seek((long)(newSeek += 10));
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(3L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            in.seek((long)(newSeek -= 106));
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(3L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            in.seek((long)(newSeek += 10));
            this.assertGetPosition(newSeek, in.getPos());
            this.assertDataAtPos(newSeek, (byte)in.read());
            this.assertSeekBufferStats(4L, streamStatistics.getSeekInBuffer());
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
            remoteReadOperationsOldVal = remoteReadOperationsNewVal;
            long oldSeek = newSeek;
            newSeek = 2 * inStream.getReadAheadRange() - 1;
            byte[] bytes = new byte[5];
            in.readFully((long)newSeek, bytes);
            this.assertGetPosition(oldSeek + 1L, in.getPos());
            this.assertSeekBufferStats(4L, streamStatistics.getSeekInBuffer());
            this.assertDatasetEquals(newSeek, "Read across read ahead ", bytes, bytes.length);
            remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
            this.assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal);
        }
    }

    @Test
    public void testSeekAfterUnbuffer() throws IOException {
        this.describe("Test to make sure that seeking in AbfsInputStream after unbuffer() call is not doing anyIO.");
        Path testFile = this.path(this.getMethodName() + ".txt");
        this.createDataSet(testFile);
        CompletableFuture future = this.getFileSystem().openFile(testFile).build();
        try (FSDataInputStream inputStream = (FSDataInputStream)FutureIO.awaitFuture((Future)future);){
            AbfsInputStream abfsInputStream = (AbfsInputStream)inputStream.getWrappedStream();
            AbfsInputStreamStatisticsImpl streamStatistics = (AbfsInputStreamStatisticsImpl)abfsInputStream.getStreamStatistics();
            int readAheadRange = abfsInputStream.getReadAheadRange();
            long seekPos = readAheadRange;
            inputStream.seek(seekPos);
            this.assertDataAtPos(readAheadRange, (byte)inputStream.read());
            long currentRemoteReadOps = streamStatistics.getRemoteReadOperations();
            this.assertIncrementInRemoteReadOps(0L, currentRemoteReadOps);
            inputStream.unbuffer();
            inputStream.seek(seekPos -= 10L);
            this.assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations());
            this.assertGetPosition(seekPos, inputStream.getPos());
        }
    }

    private void createDataSet(Path path) throws IOException {
        ContractTestUtils.createFile((FileSystem)this.getFileSystem(), (Path)path, (boolean)true, (byte[])BLOCK);
    }

    private void assertGetPosition(long expected, long actual) {
        String seekPosErrorMsg = "getPos() should return %s";
        ((AbstractLongAssert)Assertions.assertThat((long)actual).describedAs("getPos() should return %s", new Object[]{expected})).isEqualTo(actual);
    }

    private void assertDataAtPos(int pos, byte actualData) {
        String dataErrorMsg = "Mismatch in data@%s";
        ((AbstractByteAssert)Assertions.assertThat((byte)actualData).describedAs("Mismatch in data@%s", new Object[]{pos})).isEqualTo(BLOCK[pos]);
    }

    private void assertSeekBufferStats(long expected, long actual) {
        String statsErrorMsg = "Mismatch in seekInBuffer counts";
        ((AbstractLongAssert)Assertions.assertThat((long)actual).describedAs("Mismatch in seekInBuffer counts", new Object[0])).isEqualTo(expected);
    }

    private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) {
        String incrementErrorMsg = "Number of remote read ops shouldn't increase";
        ((AbstractLongAssert)Assertions.assertThat((long)newVal).describedAs("Number of remote read ops shouldn't increase", new Object[0])).isEqualTo(oldVal);
    }

    private void assertIncrementInRemoteReadOps(long oldVal, long newVal) {
        String incrementErrorMsg = "Number of remote read ops should increase";
        ((AbstractLongAssert)Assertions.assertThat((long)newVal).describedAs("Number of remote read ops should increase", new Object[0])).isGreaterThan(oldVal);
    }

    private void assertDatasetEquals(int readOffset, String operation, byte[] data, int length) {
        for (int i = 0; i < length; ++i) {
            int o = readOffset + i;
            ((AbstractByteAssert)Assertions.assertThat((byte)data[i]).describedAs(operation + "with read offset " + readOffset + ": data[" + i + "] != actualData[" + o + "]", new Object[0])).isEqualTo(BLOCK[o]);
        }
    }
}

