/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsAHCHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsJdkHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;
import org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.http.HttpResponse;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class ITestAbfsOutputStream
extends AbstractAbfsIntegrationTest {
    private static final int TEST_EXECUTION_TIMEOUT = 120000;
    private static final String TEST_FILE_PATH = "testfile";
    private static final int TEN = 10;
    @Parameterized.Parameter
    public HttpOperationType httpOperationType;

    @Parameterized.Parameters(name="{0}")
    public static Iterable<Object[]> params() {
        return Arrays.asList({HttpOperationType.JDK_HTTP_URL_CONNECTION}, {HttpOperationType.APACHE_HTTP_CLIENT});
    }

    @Override
    public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception {
        Configuration conf = new Configuration(configuration);
        conf.set("fs.azure.networking.library", this.httpOperationType.toString());
        return (AzureBlobFileSystem)FileSystem.newInstance((Configuration)conf);
    }

    @Test
    public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
        Configuration conf = this.getRawConfiguration();
        AzureBlobFileSystem fs = this.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(this.path(TEST_FILE_PATH));){
            AbfsOutputStream stream = (AbfsOutputStream)out.getWrappedStream();
            int maxConcurrentRequests = this.getConfiguration().getWriteMaxConcurrentRequestCount();
            if (stream.isAppendBlobStream().booleanValue()) {
                maxConcurrentRequests = 1;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxConcurrentRequestCount()).describedAs("maxConcurrentRequests should be " + maxConcurrentRequests, new Object[0])).isEqualTo(maxConcurrentRequests);
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + this.getConfiguration().getMaxWriteRequestsToQueue(), new Object[0])).isEqualTo(this.getConfiguration().getMaxWriteRequestsToQueue());
        }
    }

    @Test
    public void testMaxRequestsAndQueueCapacity() throws Exception {
        Configuration conf = this.getRawConfiguration();
        int maxConcurrentRequests = 6;
        int maxRequestsToQueue = 10;
        conf.set("fs.azure.write.max.concurrent.requests", "" + maxConcurrentRequests);
        conf.set("fs.azure.write.max.requests.to.queue", "" + maxRequestsToQueue);
        AzureBlobFileSystem fs = this.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(this.path(TEST_FILE_PATH));){
            AbfsOutputStream stream = (AbfsOutputStream)out.getWrappedStream();
            if (stream.isAppendBlobStream().booleanValue()) {
                maxConcurrentRequests = 1;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxConcurrentRequestCount()).describedAs("maxConcurrentRequests should be " + maxConcurrentRequests, new Object[0])).isEqualTo(maxConcurrentRequests);
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + maxRequestsToQueue, new Object[0])).isEqualTo(maxRequestsToQueue);
        }
    }

    @Test(timeout=120000L)
    public void testAzureBlobFileSystemBackReferenceInOutputStream() throws Exception {
        byte[] testBytes = new byte[5120];
        try (AbfsOutputStream out = this.getStream();){
            for (int i = 0; i < 5; ++i) {
                out.write(testBytes);
                out.flush();
                System.gc();
                ((AbstractBooleanAssert)Assertions.assertThat((out.getExecutorService().isShutdown() || out.getExecutorService().isTerminated() ? 1 : 0) != 0).describedAs("Executor Service should not be closed before OutputStream while writing", new Object[0])).isFalse();
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)out.getFsBackRef().isNull()).describedAs("BackReference in output stream should not be null", new Object[0])).isFalse();
            }
        }
    }

    @Test
    public void testAbfsOutputStreamClosingFsBeforeStream() throws Exception {
        AzureBlobFileSystem fs = new AzureBlobFileSystem();
        fs.initialize(new URI(this.getTestUrl()), new Configuration());
        Path pathFs = this.path(this.getMethodName());
        byte[] inputBytes = new byte[5120];
        try (AbfsOutputStream out = this.createAbfsOutputStreamWithFlushEnabled(fs, pathFs);){
            out.write(inputBytes);
            fs.close();
            LambdaTestUtils.intercept(PathIOException.class, (String)this.getMethodName(), () -> ((AbfsOutputStream)out).close());
        }
    }

    @Test
    public void testExpect100ContinueFailureInAppend() throws Exception {
        if (!this.getIsNamespaceEnabled(this.getFileSystem())) {
            Assume.assumeFalse((String)"Not valid for APPEND BLOB", (boolean)this.isAppendBlobEnabled());
        }
        Configuration configuration = new Configuration(this.getRawConfiguration());
        configuration.set("fs.azure.account.expect.header.enabled", "true");
        AzureBlobFileSystem fs = this.getFileSystem(configuration);
        Path path = new Path("/testFile");
        AbfsOutputStream os = (AbfsOutputStream)Mockito.spy((Object)((AbfsOutputStream)fs.create(path).getWrappedStream()));
        AzureIngressHandler ingressHandler = (AzureIngressHandler)Mockito.spy((Object)os.getIngressHandler());
        ((AbfsOutputStream)Mockito.doReturn((Object)ingressHandler).when((Object)os)).getIngressHandler();
        AbfsClient spiedClient = (AbfsClient)Mockito.spy((Object)ingressHandler.getClient());
        ((AzureIngressHandler)Mockito.doReturn((Object)spiedClient).when((Object)ingressHandler)).getClient();
        AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
        this.mockSetupForAppend(httpOpForAppendTest, spiedClient);
        ((AbfsOutputStream)Mockito.doReturn((Object)spiedClient).when((Object)os)).getClient();
        fs.delete(path, true);
        os.write(1);
        if (spiedClient instanceof AbfsDfsClient) {
            LambdaTestUtils.intercept(FileNotFoundException.class, () -> ((AbfsOutputStream)os).close());
        } else {
            IOException ex = (IOException)LambdaTestUtils.intercept(IOException.class, () -> ((AbfsOutputStream)os).close());
            Assertions.assertThat((Throwable)ex.getCause().getCause()).isInstanceOf(AbfsRestOperationException.class);
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)httpOpForAppendTest[0].getConnectionDisconnectedOnError()).describedAs("First try from AbfsClient will have expect-100 header and should fail with expect-100 error.", new Object[0])).isTrue();
        if (httpOpForAppendTest[0] instanceof AbfsJdkHttpOperation) {
            ((AbfsJdkHttpOperation)Mockito.verify((Object)((AbfsJdkHttpOperation)httpOpForAppendTest[0]), (VerificationMode)Mockito.times((int)0))).processConnHeadersAndInputStreams((byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)httpOpForAppendTest[1].getConnectionDisconnectedOnError()).describedAs("The retried operation from AbfsClient should not fail with expect-100 error. The retried operation does not haveexpect-100 header.", new Object[0])).isFalse();
        if (httpOpForAppendTest[1] instanceof AbfsJdkHttpOperation) {
            ((AbfsJdkHttpOperation)Mockito.verify((Object)((AbfsJdkHttpOperation)httpOpForAppendTest[1]), (VerificationMode)Mockito.times((int)1))).processConnHeadersAndInputStreams((byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }
    }

    private void mockSetupForAppend(AbfsHttpOperation[] httpOpForAppendTest, AbfsClient spiedClient) {
        int[] index = new int[]{0};
        ((AbfsClient)Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
            AbfsRestOperation op = (AbfsRestOperation)Mockito.spy((Object)((AbfsRestOperation)abfsRestOpAppendGetInvocation.callRealMethod()));
            boolean[] isExpectCall = new boolean[1];
            for (AbfsHttpHeader header : op.getRequestHeaders()) {
                if (!header.getName().equals("Expect")) continue;
                isExpectCall[0] = true;
            }
            ((AbfsRestOperation)Mockito.doAnswer(createHttpOpInvocation -> {
                httpOpForAppendTest[index[0]] = (AbfsHttpOperation)Mockito.spy((Object)((AbfsHttpOperation)createHttpOpInvocation.callRealMethod()));
                if (isExpectCall[0]) {
                    if (httpOpForAppendTest[index[0]] instanceof AbfsJdkHttpOperation) {
                        ((AbfsJdkHttpOperation)Mockito.doAnswer(invocation -> {
                            OutputStream os = (OutputStream)invocation.callRealMethod();
                            os.write(1);
                            os.close();
                            throw new ProtocolException("Server rejected operation");
                        }).when((Object)((AbfsJdkHttpOperation)httpOpForAppendTest[index[0]]))).getConnOutputStream();
                    } else {
                        ((AbfsAHCHttpOperation)Mockito.doAnswer(invocation -> {
                            throw new AbfsApacheHttpExpect100Exception((HttpResponse)invocation.callRealMethod());
                        }).when((Object)((AbfsAHCHttpOperation)httpOpForAppendTest[index[0]]))).executeRequest();
                    }
                }
                int n = index[0];
                index[0] = n + 1;
                return httpOpForAppendTest[n];
            }).when((Object)op)).createHttpOperation();
            return op;
        }).when((Object)spiedClient)).getAbfsRestOperation((AbfsRestOperationType)Mockito.any(AbfsRestOperationType.class), Mockito.anyString(), (URL)Mockito.any(URL.class), Mockito.anyList(), (byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt(), (String)Mockito.nullable(String.class));
    }

    private AbfsOutputStream getStream() throws URISyntaxException, IOException {
        AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
        fs1.initialize(new URI(this.getTestUrl()), new Configuration());
        Path pathFs1 = this.path(this.getMethodName() + "1");
        return this.createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
    }

    @Test
    public void testValidateGetBlockList() throws Exception {
        AzureBlobFileSystem fs = (AzureBlobFileSystem)Mockito.spy((Object)this.getFileSystem());
        Assume.assumeTrue((!this.getIsNamespaceEnabled(fs) ? 1 : 0) != 0);
        AzureBlobFileSystemStore store = (AzureBlobFileSystemStore)Mockito.spy((Object)fs.getAbfsStore());
        this.assumeBlobServiceType();
        AbfsClientHandler clientHandler = (AbfsClientHandler)Mockito.spy((Object)store.getClientHandler());
        AbfsBlobClient blobClient = (AbfsBlobClient)Mockito.spy((Object)clientHandler.getBlobClient());
        ((AzureBlobFileSystemStore)Mockito.doReturn((Object)clientHandler).when((Object)store)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getIngressClient();
        ((AzureBlobFileSystem)Mockito.doReturn((Object)store).when((Object)fs)).getAbfsStore();
        Path testFilePath = new Path("/testFile");
        AbfsOutputStream os = (AbfsOutputStream)Mockito.spy((Object)((AbfsOutputStream)fs.create(testFilePath).getWrappedStream()));
        ((AbfsOutputStream)Mockito.doReturn((Object)clientHandler).when((Object)os)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        AbfsRestOperationException exception = this.getMockAbfsRestOperationException(409);
        ((AbfsBlobClient)Mockito.doThrow((Throwable[])new Throwable[]{exception}).when((Object)blobClient)).getBlockList(Mockito.anyString(), (TracingContext)Mockito.any(TracingContext.class));
        os.write(10);
        os.hsync();
        os.close();
        ((AzureBlobFileSystemStore)Mockito.doCallRealMethod().when((Object)store)).openFileForWrite((Path)Mockito.any(Path.class), (FileSystem.Statistics)Mockito.any(), Mockito.anyBoolean(), (TracingContext)Mockito.any(TracingContext.class));
        LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> store.openFileForWrite(testFilePath, null, false, this.getTestTracingContext(fs, true)));
    }

    @Test
    public void testNoNetworkCallsForFlush() throws Exception {
        AzureBlobFileSystem fs = (AzureBlobFileSystem)Mockito.spy((Object)this.getFileSystem());
        Assume.assumeTrue((!this.getIsNamespaceEnabled(fs) ? 1 : 0) != 0);
        AzureBlobFileSystemStore store = (AzureBlobFileSystemStore)Mockito.spy((Object)fs.getAbfsStore());
        this.assumeBlobServiceType();
        AbfsClientHandler clientHandler = (AbfsClientHandler)Mockito.spy((Object)store.getClientHandler());
        AbfsBlobClient blobClient = (AbfsBlobClient)Mockito.spy((Object)clientHandler.getBlobClient());
        ((AzureBlobFileSystemStore)Mockito.doReturn((Object)clientHandler).when((Object)store)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getIngressClient();
        ((AzureBlobFileSystem)Mockito.doReturn((Object)store).when((Object)fs)).getAbfsStore();
        Path testFilePath = new Path("/testFile");
        AbfsOutputStream os = (AbfsOutputStream)Mockito.spy((Object)((AbfsOutputStream)fs.create(testFilePath).getWrappedStream()));
        AzureIngressHandler ingressHandler = (AzureIngressHandler)Mockito.spy((Object)os.getIngressHandler());
        ((AbfsOutputStream)Mockito.doReturn((Object)ingressHandler).when((Object)os)).getIngressHandler();
        ((AzureIngressHandler)Mockito.doReturn((Object)blobClient).when((Object)ingressHandler)).getClient();
        ((AbfsOutputStream)Mockito.doReturn((Object)clientHandler).when((Object)os)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        os.hsync();
        ((AbfsBlobClient)Mockito.verify((Object)blobClient, (VerificationMode)Mockito.times((int)0))).append((String)Mockito.any(), (byte[])Mockito.any(), (AppendRequestParameters)Mockito.any(), (String)Mockito.any(), (ContextEncryptionAdapter)Mockito.any(), (TracingContext)Mockito.any(TracingContext.class));
        ((AbfsBlobClient)Mockito.verify((Object)blobClient, (VerificationMode)Mockito.times((int)0))).flush((byte[])Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), (ContextEncryptionAdapter)Mockito.any(), (TracingContext)Mockito.any(TracingContext.class));
    }

    private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
        return new AbfsRestOperationException(status, "", "", new Exception());
    }

    @Test
    public void testNoNetworkCallsForSecondFlush() throws Exception {
        AzureBlobFileSystem fs = (AzureBlobFileSystem)Mockito.spy((Object)this.getFileSystem());
        Assume.assumeTrue((!this.getIsNamespaceEnabled(fs) ? 1 : 0) != 0);
        AzureBlobFileSystemStore store = (AzureBlobFileSystemStore)Mockito.spy((Object)fs.getAbfsStore());
        this.assumeBlobServiceType();
        Assume.assumeFalse((String)"Not valid for APPEND BLOB", (boolean)this.isAppendBlobEnabled());
        AbfsClientHandler clientHandler = (AbfsClientHandler)Mockito.spy((Object)store.getClientHandler());
        AbfsBlobClient blobClient = (AbfsBlobClient)Mockito.spy((Object)clientHandler.getBlobClient());
        ((AzureBlobFileSystemStore)Mockito.doReturn((Object)clientHandler).when((Object)store)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getIngressClient();
        ((AzureBlobFileSystem)Mockito.doReturn((Object)store).when((Object)fs)).getAbfsStore();
        Path testFilePath = new Path("/testFile");
        AbfsOutputStream os = (AbfsOutputStream)Mockito.spy((Object)((AbfsOutputStream)fs.create(testFilePath).getWrappedStream()));
        AzureIngressHandler ingressHandler = (AzureIngressHandler)Mockito.spy((Object)os.getIngressHandler());
        ((AbfsOutputStream)Mockito.doReturn((Object)ingressHandler).when((Object)os)).getIngressHandler();
        ((AzureIngressHandler)Mockito.doReturn((Object)blobClient).when((Object)ingressHandler)).getClient();
        ((AbfsOutputStream)Mockito.doReturn((Object)clientHandler).when((Object)os)).getClientHandler();
        ((AbfsClientHandler)Mockito.doReturn((Object)blobClient).when((Object)clientHandler)).getBlobClient();
        os.write(10);
        os.hsync();
        os.close();
        ((AbfsBlobClient)Mockito.verify((Object)blobClient, (VerificationMode)Mockito.times((int)1))).append(Mockito.anyString(), (byte[])Mockito.any(byte[].class), (AppendRequestParameters)Mockito.any(AppendRequestParameters.class), (String)Mockito.any(), (ContextEncryptionAdapter)Mockito.any(), (TracingContext)Mockito.any(TracingContext.class));
        ((AbfsBlobClient)Mockito.verify((Object)blobClient, (VerificationMode)Mockito.times((int)1))).flush((byte[])Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), (String)Mockito.any(), (String)Mockito.any(), Mockito.anyString(), (ContextEncryptionAdapter)Mockito.any(), (TracingContext)Mockito.any(TracingContext.class));
    }
}

