/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Integration tests for lease handling in {@link AzureBlobFileSystem}.
 *
 * <p>Most tests create a dedicated file system instance (see
 * {@link #getCustomFileSystem(Path, int)}) whose configuration marks the test
 * directory as an infinite-lease directory, then check that output streams
 * acquire a lease on create/append and that the store reports all leases as
 * freed once the streams are closed.
 */
public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
    /** Timeout for regular tests, in milliseconds. */
    private static final int TEST_EXECUTION_TIMEOUT = 30000;
    /** Timeout for the longer running tests, in milliseconds. */
    private static final int LONG_TEST_EXECUTION_TIMEOUT = 90000;
    /** Name of the file created below each per-test directory. */
    private static final String TEST_FILE = "testfile";
    /** Byte value written before close in the append-blob case of the break-lease test. */
    private static final int TEST_BYTES = 20;

    /** Configuration key telling whether the test account has a hierarchical namespace. */
    private static final String NAMESPACE_ENABLED_KEY = "fs.azure.test.namespace.enabled";

    private static final String PARALLEL_ACCESS = "Parallel access to the create path detected";
    private static final String ERR_NO_LEASE_THREADS =
            "Lease desired but no lease threads configured, set fs.azure.lease.threads";
    private static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease";
    private static final String CONDITION_NOT_MET =
            "The condition specified using HTTP conditional header(s) is not met.";
    private static final String ERR_LEASE_EXPIRED =
            "A lease ID was specified, but the lease for the resource has expired.";
    private static final String ERR_LEASE_EXPIRED_BLOB =
            "A lease ID was specified, but the lease for the blob has expired.";
    private static final String ERR_NO_LEASE_ID_SPECIFIED =
            "There is currently a lease on the resource and no lease ID was specified in the request";
    private static final String ERR_NO_LEASE_ID_SPECIFIED_BLOB =
            "There is currently a lease on the blob and no lease ID was specified in the request";

    /** Value of {@link #NAMESPACE_ENABLED_KEY} in the test configuration; {@code false} when unset. */
    private final boolean isHNSEnabled = getConfiguration().getBoolean(NAMESPACE_ENABLED_KEY, false);

    /**
     * Creates a file system whose configuration enables infinite leases on a directory.
     *
     * <p>The file system cache is disabled for the ABFS scheme in the configuration
     * passed to {@code getFileSystem}.
     *
     * @param infiniteLeaseDirs directory whose URI path is set as the infinite-lease directory
     * @param numLeaseThreads value for {@code fs.azure.lease.threads}
     * @return the file system built from the adjusted configuration
     * @throws Exception if the file system cannot be created
     */
    private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
        // NOTE(review): this writes into whatever getRawConfiguration() returns; confirm
        // whether that object is shared before reusing it for non-lease file systems.
        Configuration conf = getRawConfiguration();
        conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
        conf.set("fs.azure.infinite-lease.directories", infiniteLeaseDirs.toUri().getPath());
        conf.setInt("fs.azure.lease.threads", numLeaseThreads);
        return getFileSystem(conf);
    }

    /**
     * Builds the path {@code <per-test directory>/testfile} for the running test method.
     *
     * @return path of the test file
     * @throws IOException if the per-test directory path cannot be resolved
     */
    private Path newTestFilePath() throws IOException {
        return new Path(path(methodName.getMethodName()), TEST_FILE);
    }

    /** Returns whether the {@link AbfsOutputStream} wrapped by {@code out} reports a lease. */
    private static boolean hasLease(FSDataOutputStream out) {
        return ((AbfsOutputStream) out.getWrappedStream()).hasLease();
    }

    /** Returns whether the {@link AbfsOutputStream} wrapped by {@code out} reports its lease as freed. */
    private static boolean isLeaseFreed(FSDataOutputStream out) {
        return ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed();
    }

    /** Picks the "lease expired" message expected for the given ingress client type. */
    private static String leaseExpiredError(AbfsClient client) {
        return client instanceof AbfsBlobClient ? ERR_LEASE_EXPIRED_BLOB : ERR_LEASE_EXPIRED;
    }

    /** A stream created on the default file system must not hold a lease. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testNoInfiniteLease() throws IOException {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getFileSystem();
        fs.mkdirs(testFilePath.getParent());
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            Assertions.assertFalse(hasLease(out), "Output stream should not have lease");
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** Creating a file in an infinite-lease directory with zero lease threads must fail. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testNoLeaseThreads() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
        fs.mkdirs(testFilePath.getParent());
        LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> {
            FSDataOutputStream out = fs.create(testFilePath);
            if (out != null) {
                out.close();
            }
            return "No failure when lease requested with 0 lease threads";
        });
    }

    /** A single writer holds a lease while open and releases it on close. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testOneWriter() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        FSDataOutputStream out = fs.create(testFilePath);
        Assertions.assertTrue(hasLease(out), "Output stream should have lease");
        out.close();
        Assertions.assertFalse(hasLease(out), "Output stream should not have lease");
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** A file in a sub-directory of the configured infinite-lease directory also gets a lease. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testSubDir() throws Exception {
        Path leaseDir = path(methodName.getMethodName());
        Path testFilePath = new Path(new Path(leaseDir, "subdir"), TEST_FILE);
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent().getParent(), 1);
        fs.mkdirs(testFilePath.getParent().getParent());
        FSDataOutputStream out = fs.create(testFilePath);
        Assertions.assertTrue(hasLease(out), "Output stream should have lease");
        out.close();
        Assertions.assertFalse(hasLease(out), "Output stream should not have lease");
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** A second create on a file that is still open under a lease must fail. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testTwoCreate() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient();
        assumeValidTestConfigPresent(getRawConfiguration(), NAMESPACE_ENABLED_KEY);
        fs.mkdirs(testFilePath.getParent());
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            // The expected message depends on namespace setting, ingress service and client type.
            final String expectedError;
            if (isHNSEnabled && getIngressServiceType() == AbfsServiceType.DFS) {
                expectedError = PARALLEL_ACCESS;
            } else if (client instanceof AbfsBlobClient) {
                expectedError = ERR_NO_LEASE_ID_SPECIFIED_BLOB;
            } else {
                expectedError = ERR_NO_LEASE_ID_SPECIFIED;
            }
            LambdaTestUtils.intercept(IOException.class, expectedError, () -> {
                FSDataOutputStream out2 = fs.create(testFilePath);
                if (out2 != null) {
                    out2.close();
                }
                return "Expected second create on infinite lease dir to fail";
            });
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /**
     * Opens a create stream and, while it is open, an append stream on the same file,
     * writing and syncing through both.
     *
     * <p>Failures are tolerated only when they are the expected ones; everything else
     * is rethrown to the caller:
     * <ul>
     *   <li>an {@link IOException} from the append writer is accepted when
     *       {@code expectException} is set and it contains "Unable to acquire lease";</li>
     *   <li>an {@link IOException} from syncing or closing the create stream is accepted
     *       when the ingress client is an {@link AbfsBlobClient} and it contains the
     *       "condition not met" message.</li>
     * </ul>
     * Finally asserts that the store reports all leases as freed.
     *
     * @param fs file system under test
     * @param testFilePath file written by both writers
     * @param expectException whether the append writer is expected to fail acquiring a lease
     * @throws Exception on any unexpected failure
     */
    private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
        AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient();
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            try (FSDataOutputStream out2 = fs.append(testFilePath)) {
                out2.writeInt(2);
                out2.hsync();
            } catch (IOException e) {
                if (!expectException) {
                    throw e;
                }
                // Expected failure: verify it and carry on with the first writer.
                GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e);
            }
            out.writeInt(1);
            try {
                out.hsync();
            } catch (IOException e) {
                if (!(client instanceof AbfsBlobClient)) {
                    throw e;
                }
                GenericTestUtils.assertExceptionContains(CONDITION_NOT_MET, e);
            }
        } catch (IOException e) {
            // Reached for failures of create/close and for anything rethrown above.
            if (!(client instanceof AbfsBlobClient)) {
                throw e;
            }
            GenericTestUtils.assertExceptionContains(CONDITION_NOT_MET, e);
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** Two writers on the default file system; no lease failure is expected. Skipped for append blobs. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getFileSystem();
        Assumptions.assumeThat(isAppendBlobEnabled())
                .as("Parallel Writes Not Allowed on Append Blobs")
                .isFalse();
        fs.mkdirs(testFilePath.getParent());
        twoWriters(fs, testFilePath, false);
    }

    /** Two writers in an infinite-lease directory; the append writer may fail to get a lease. */
    @Test
    @Timeout(value = LONG_TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        Assumptions.assumeThat(isAppendBlobEnabled())
                .as("Parallel Writes Not Allowed on Append Blobs")
                .isFalse();
        fs.mkdirs(testFilePath.getParent());
        twoWriters(fs, testFilePath, true);
    }

    /** After writing data, closing the stream releases its lease. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testLeaseFreedOnClose() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        Assertions.assertTrue(hasLease(out), "Output stream should have lease");
        out.close();
        Assertions.assertFalse(hasLease(out), "Output stream should not have lease after close");
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /**
     * After the lease is broken, write+hsync and close on the original stream must fail
     * with the "lease expired" error, and a fresh append must succeed.
     */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testWriteAfterBreakLease() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient();
        fs.mkdirs(testFilePath.getParent());
        FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        out.hsync();

        // Register the tracing header validator only for the duration of the break-lease call.
        fs.registerListener(new TracingHeaderValidator(
                getConfiguration().getClientCorrelationId(),
                fs.getFileSystemId(), FSOperationType.BREAK_LEASE, false, 0));
        fs.breakLease(testFilePath);
        fs.registerListener(null);

        final String expectedError = leaseExpiredError(client);
        LambdaTestUtils.intercept(IOException.class, expectedError, () -> {
            out.write(1);
            out.hsync();
            return "Expected exception on write after lease break but got " + out;
        });
        LambdaTestUtils.intercept(IOException.class, expectedError, () -> {
            // For append blobs on the blob ingress service, write a byte before closing.
            if (isAppendBlobEnabled() && getIngressServiceType() == AbfsServiceType.BLOB) {
                out.write(TEST_BYTES);
            }
            out.close();
            return "Expected exception on close after lease break but got " + out;
        });
        Assertions.assertTrue(isLeaseFreed(out), "Output stream lease should be freed");

        try (FSDataOutputStream out2 = fs.append(testFilePath)) {
            out2.write(2);
            out2.hsync();
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** Closing a stream whose lease was broken fails, but the lease is still reported as freed. */
    @Test
    @Timeout(value = LONG_TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testLeaseFreedAfterBreak() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient();
        fs.mkdirs(testFilePath.getParent());
        FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        fs.breakLease(testFilePath);
        LambdaTestUtils.intercept(IOException.class, leaseExpiredError(client), () -> {
            out.close();
            return "Expected exception on close after lease break but got " + out;
        });
        Assertions.assertTrue(isLeaseFreed(out), "Output stream lease should be freed");
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /** Both the create stream and a later append stream hold a lease and free it on close. */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testInfiniteLease() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            Assertions.assertTrue(hasLease(out), "Output stream should have lease");
            out.write(0);
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");

        try (FSDataOutputStream out = fs.append(testFilePath)) {
            Assertions.assertTrue(hasLease(out), "Output stream should have lease");
            out.write(1);
        }
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");
    }

    /**
     * After the file system is closed, leases are freed and a new append must fail with
     * an {@link AbfsDriverException} or a {@link RejectedExecutionException}.
     */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testFileSystemClose() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            out.write(0);
            Assertions.assertFalse(fs.getAbfsStore().areLeasesFreed(), "Store leases should exist");
        }
        fs.close();
        Assertions.assertTrue(fs.getAbfsStore().areLeasesFreed(), "Store leases were not freed");

        Callable<String> exceptionRaisingCallable = () -> {
            FSDataOutputStream out2 = fs.append(testFilePath);
            if (out2 != null) {
                out2.close();
            }
            return "Expected exception on new append after closed FS";
        };
        try {
            exceptionRaisingCallable.call();
            fail("Expected exception was not thrown");
        } catch (Exception e) {
            boolean expected = e instanceof AbfsDriverException
                    || e instanceof RejectedExecutionException;
            if (!expected) {
                fail("Unexpected exception type: " + e.getClass());
            }
        }
    }

    /**
     * Exercises lease acquisition retries in {@link AbfsLease}: no retries against the
     * real client, two retries when a spied client fails twice before calling the real
     * method, and an {@link AzureBlobFileSystemException} when acquisition always fails.
     */
    @Test
    @Timeout(value = TEST_EXECUTION_TIMEOUT, unit = TimeUnit.MILLISECONDS)
    public void testAcquireRetry() throws Exception {
        Path testFilePath = newTestFilePath();
        AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        fs.createNewFile(testFilePath);

        TracingContext tracingContext = getTestTracingContext(fs, true);
        TracingHeaderValidator listener = new TracingHeaderValidator(
                getConfiguration().getClientCorrelationId(),
                fs.getFileSystemId(), FSOperationType.TEST_OP, true, 0);
        tracingContext.setListener(listener);

        // Case 1: real client, the lease is acquired without any retry.
        AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(),
                true, -1L, null, tracingContext);
        Assertions.assertNotNull(lease.getLeaseID(), "Did not successfully lease file");
        listener.setOperation(FSOperationType.RELEASE_LEASE);
        lease.free();
        lease.getTracingContext().setListener(null);
        Assertions.assertEquals(0, lease.getAcquireRetryCount(), "Unexpected acquire retry count");

        // Case 2: spied client fails twice, then delegates to the real acquireLease.
        AbfsClient mockClient = Mockito.spy(fs.getAbfsClient());
        Mockito.doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
                .doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
                .doCallRealMethod()
                .when(mockClient)
                .acquireLease(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(),
                        ArgumentMatchers.any(), ArgumentMatchers.any(TracingContext.class));
        AbfsLease retriedLease = new AbfsLease(mockClient, testFilePath.toUri().getPath(),
                true, 5, 1, -1L, null, tracingContext);
        Assertions.assertNotNull(retriedLease.getLeaseID(), "Acquire lease should have retried");
        retriedLease.free();
        Assertions.assertEquals(2, retriedLease.getAcquireRetryCount(), "Unexpected acquire retry count");

        // Case 3: every acquire attempt fails, so constructing the lease must throw.
        Mockito.doThrow(new AbfsLease.LeaseException("failed to acquire"))
                .when(mockClient)
                .acquireLease(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(),
                        ArgumentMatchers.any(), ArgumentMatchers.any(TracingContext.class));
        LambdaTestUtils.intercept(AzureBlobFileSystemException.class,
                () -> new AbfsLease(mockClient, testFilePath.toUri().getPath(),
                        true, 5, 1, -1L, null, tracingContext));
    }
}

