/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.DataStreamer;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.DataChecksum;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestDFSOutputStream {
    /** Mini cluster with three DataNodes; built in {@link #setup()} and shut down in {@link #tearDown()}. */
    static MiniDFSCluster cluster;

    /**
     * Builds the shared three-DataNode mini cluster once, before any test runs.
     *
     * @throws IOException if the cluster cannot be built
     */
    @BeforeAll
    public static void setup() throws IOException {
        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration());
        cluster = builder.numDataNodes(3).build();
    }

    /**
     * Closes a stream, injects an exception into the streamer's "last exception"
     * holder, and closes again. If the second {@code close()} throws, it must be
     * the injected exception; afterwards the holder must be empty again and a
     * third {@code close()} must complete without throwing.
     *
     * @throws IOException if creating the file or an unguarded close fails
     */
    @Test
    public void testCloseTwice() throws IOException {
        DistributedFileSystem fs = cluster.getFileSystem();
        FSDataOutputStream os = fs.create(new Path("/test"));

        // Dig the streamer's exception holder out of the wrapped stream via reflection.
        DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, "wrappedStream");
        DataStreamer streamer = (DataStreamer) Whitebox.getInternalState(dos, "streamer");
        DataStreamer.LastExceptionInStreamer ex =
            (DataStreamer.LastExceptionInStreamer) Whitebox.getInternalState(streamer, "lastException");

        // Nothing has gone wrong yet, so no exception may be recorded.
        Throwable thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
        Assertions.assertNull(thrown);

        // First close: the stream is now closed.
        dos.close();

        // Second close, with an exception planted in the holder.
        IOException dummy = new IOException("dummy");
        ex.set(dummy);
        try {
            dos.close();
            // NOTE(review): the test does not fail when this close() returns normally —
            // confirm whether a fail() is wanted here.
        }
        catch (IOException e) {
            // JUnit order is (expected, actual): the planted exception is the expectation.
            Assertions.assertEquals(dummy, e);
        }

        // The holder must have been reset by the second close.
        thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
        Assertions.assertNull(thrown);

        // Third close must not throw.
        dos.close();
    }

    /**
     * Invokes the private {@code computePacketChunkSize(int, int)} reflectively with a
     * 64 KiB packet size and 512 bytes per checksum, then checks that the resulting
     * {@code packetSize} field plus a header of 33 bytes, and plus a header of 257
     * bytes, both stay below the requested packet size.
     *
     * @throws Exception on reflection failure or file-system error
     */
    @Test
    public void testComputePacketChunkSize() throws Exception {
        final int packetSize = 64 * 1024;
        final int bytesPerChecksum = 512;

        DistributedFileSystem fs = cluster.getFileSystem();
        // try-with-resources: the stream used to be left open on the shared cluster.
        try (FSDataOutputStream os = fs.create(new Path("/test"))) {
            DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, "wrappedStream");

            Method method = dos.getClass().getDeclaredMethod("computePacketChunkSize", Integer.TYPE, Integer.TYPE);
            method.setAccessible(true);
            method.invoke(dos, packetSize, bytesPerChecksum);

            Field field = dos.getClass().getDeclaredField("packetSize");
            field.setAccessible(true);
            int computedPacketSize = (Integer) field.get(dos);

            Assertions.assertTrue(computedPacketSize + 33 < packetSize,
                "packet body plus a 33-byte header must be smaller than " + packetSize);
            Assertions.assertTrue(computedPacketSize + 257 < packetSize,
                "packet body plus a 257-byte header must be smaller than " + packetSize);
        }
    }

    /**
     * Runs {@link #runAdjustChunkBoundary(int, int)} twice: once with a configured
     * write packet size of 65536 that is expected to be kept as is, and once with
     * 1048576000, for which the expected final size is
     * {@code PacketReceiver.MAX_PACKET_SIZE}.
     */
    @Test
    @Timeout(value=60L)
    public void testPreventOverflow() throws IOException, NoSuchFieldException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
        // Case 1: configured size 65536, expected to come out unchanged.
        final int defaultWritePacketSize = 65536;
        runAdjustChunkBoundary(defaultWritePacketSize, defaultWritePacketSize);

        // Case 2: configured size 1048576000, expected to come out as MAX_PACKET_SIZE.
        final int oversizedWritePacketSize = 1048576000;
        runAdjustChunkBoundary(oversizedWritePacketSize, PacketReceiver.MAX_PACKET_SIZE);
    }

    /**
     * Starts a dedicated single-DataNode mini cluster whose client write packet size
     * is {@code configuredWritePacketSize}, opens a stream, forces its
     * {@code appendChunk}, {@code bytesCurBlock} and {@code blockSize} state through
     * reflection, invokes the private {@code adjustChunkBoundary()}, and then checks
     * the stream's {@code writePacketSize}, {@code chunksPerPacket} and
     * {@code packetSize} fields.
     *
     * @param configuredWritePacketSize value written to {@code dfs.client-write-packet-size}
     * @param finalWritePacketSize      value the stream's {@code writePacketSize} is expected to hold
     * @throws IOException if the cluster or the file cannot be created
     * @throws NoSuchFieldException if a reflected field is missing
     * @throws NoSuchMethodException if a reflected method is missing
     * @throws IllegalAccessException if a reflected member cannot be accessed
     * @throws InvocationTargetException if a reflected method itself throws
     */
    private void runAdjustChunkBoundary(int configuredWritePacketSize, int finalWritePacketSize) throws IOException, NoSuchFieldException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
        final boolean appendChunk = false;
        final long blockSize = 3221225500L;      // 0xC000001C
        final long bytesCurBlock = 1073741824L;  // 0x40000000
        final int bytesPerChecksum = 512;
        final int checksumSize = 4;
        final int chunkSize = bytesPerChecksum + checksumSize;
        final int packetMaxHeaderLength = 33;

        MiniDFSCluster dfsCluster = null;
        File baseDir = new File(PathUtils.getTestDir(this.getClass()), GenericTestUtils.getMethodName());
        try {
            Configuration dfsConf = new Configuration();
            dfsConf.set("hdfs.minidfs.basedir", baseDir.getAbsolutePath());
            dfsConf.setInt("dfs.client-write-packet-size", configuredWritePacketSize);
            dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
            dfsCluster.waitActive();

            FSDataOutputStream os = dfsCluster.getFileSystem().create(new Path(baseDir.getPath(), "testPreventOverflow"));
            DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, "wrappedStream");

            // Put the stream into the state under test.
            Method setAppendChunkMethod = dos.getClass().getDeclaredMethod("setAppendChunk", Boolean.TYPE);
            setAppendChunkMethod.setAccessible(true);
            setAppendChunkMethod.invoke(dos, appendChunk);

            Method setBytesCurBlockMethod = dos.getClass().getDeclaredMethod("setBytesCurBlock", Long.TYPE);
            setBytesCurBlockMethod.setAccessible(true);
            setBytesCurBlockMethod.invoke(dos, bytesCurBlock);

            Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
            blockSizeField.setAccessible(true);
            blockSizeField.setLong(dos, blockSize);

            // Run the method under test.
            Method method = dos.getClass().getDeclaredMethod("adjustChunkBoundary");
            method.setAccessible(true);
            method.invoke(dos);

            // Assertions use JUnit's (expected, actual) order so failure messages read correctly.
            Field writePacketSizeField = dos.getClass().getDeclaredField("writePacketSize");
            writePacketSizeField.setAccessible(true);
            Assertions.assertEquals(finalWritePacketSize, writePacketSizeField.getInt(dos));

            Field chunksPerPacketField = dos.getClass().getDeclaredField("chunksPerPacket");
            chunksPerPacketField.setAccessible(true);
            Assertions.assertEquals((finalWritePacketSize - packetMaxHeaderLength) / chunkSize, chunksPerPacketField.getInt(dos));

            Field packetSizeField = dos.getClass().getDeclaredField("packetSize");
            packetSizeField.setAccessible(true);
            Assertions.assertEquals(chunksPerPacketField.getInt(dos) * chunkSize, packetSizeField.getInt(dos));
        }
        finally {
            // The private cluster is always torn down, even when an assertion fails.
            if (dfsCluster != null) {
                dfsCluster.shutdown();
            }
        }
    }

    /**
     * Builds a {@code DataStreamer} around mocked collaborators, makes its block
     * stream's {@code flush()} throw, seeds one congested node and one queued packet,
     * runs the streamer on the calling thread, and asserts that the congested-node
     * list is empty once {@code run()} returns.
     *
     * @throws IOException declared because the stubbed {@code flush()} signature throws it
     */
    @Test
    public void testCongestionBackoff() throws IOException {
        DfsClientConf dfsClientConf = Mockito.mock(DfsClientConf.class);
        DFSClient client = Mockito.mock(DFSClient.class);
        Configuration conf = Mockito.mock(Configuration.class);
        Mockito.when(client.getConfiguration()).thenReturn(conf);
        Mockito.when(client.getConf()).thenReturn(dfsClientConf);
        Mockito.when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
        client.clientRunning = true;

        DataStreamer stream = new DataStreamer(Mockito.mock(HdfsFileStatus.class), Mockito.mock(ExtendedBlock.class), client, "foo", null, null, null, null, null, null);

        // A block stream whose flush() always fails.
        DataOutputStream blockStream = Mockito.mock(DataOutputStream.class);
        Mockito.doThrow(new IOException()).when(blockStream).flush();
        Whitebox.setInternalState(stream, "blockStream", blockStream);
        Whitebox.setInternalState(stream, "stage", BlockConstructionStage.PIPELINE_CLOSE);

        // Whitebox hands back Object, so the element types can only be asserted by cast.
        @SuppressWarnings("unchecked")
        LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>) Whitebox.getInternalState(stream, "dataQueue");
        @SuppressWarnings("unchecked")
        ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>) Whitebox.getInternalState(stream, "congestedNodes");

        congestedNodes.add(Mockito.mock(DatanodeInfo.class));
        DFSPacket packet = Mockito.mock(DFSPacket.class);
        dataQueue.add(packet);

        stream.run();

        Assertions.assertTrue(congestedNodes.isEmpty());
    }

    /**
     * Runs a {@code DataStreamer} on the test thread while two helper threads feed it:
     * one repeatedly adds congested nodes (and, under a condition, makes the block
     * stream's {@code flush()} throw), the other enqueues 100 mocked packets. The test
     * passes when the first helper has observed more than one entry in
     * {@code congestedNodes} at least once, i.e. {@code isDelay} was flipped to false.
     */
    @Test
    @Timeout(value=60L)
    public void testCongestionAckDelay() {
        // Streamer wired to mocked client objects only; no cluster is involved.
        DfsClientConf dfsClientConf = (DfsClientConf)Mockito.mock(DfsClientConf.class);
        DFSClient client = (DFSClient)Mockito.mock(DFSClient.class);
        Configuration conf = (Configuration)Mockito.mock(Configuration.class);
        Mockito.when((Object)client.getConfiguration()).thenReturn((Object)conf);
        Mockito.when((Object)client.getConf()).thenReturn((Object)dfsClientConf);
        Mockito.when((Object)client.getTracer()).thenReturn((Object)FsTracer.get((Configuration)new Configuration()));
        client.clientRunning = true;
        DataStreamer stream = new DataStreamer((HdfsFileStatus)Mockito.mock(HdfsFileStatus.class), (ExtendedBlock)Mockito.mock(ExtendedBlock.class), client, "foo", null, null, null, null, null, null);
        DataOutputStream blockStream = (DataOutputStream)Mockito.mock(DataOutputStream.class);
        Whitebox.setInternalState((Object)stream, (String)"blockStream", (Object)blockStream);
        Whitebox.setInternalState((Object)stream, (String)"stage", (Object)BlockConstructionStage.PIPELINE_CLOSE);
        // Pull the streamer's private queue, congested-node list and back-off limit out via reflection.
        LinkedList dataQueue = (LinkedList)Whitebox.getInternalState((Object)stream, (String)"dataQueue");
        ArrayList congestedNodes = (ArrayList)Whitebox.getInternalState((Object)stream, (String)"congestedNodes");
        int backOffMaxTime = (Integer)Whitebox.getInternalState((Object)stream, (String)"congestionBackOffMaxTimeInMs");
        DFSPacket[] packet = new DFSPacket[100];
        // Starts true; only the first helper thread ever sets it to false.
        AtomicBoolean isDelay = new AtomicBoolean(true);
        // Helper 1: ten rounds of "sleep backOffMaxTime/50 ms, then add a congested node".
        new Thread(() -> {
            for (int i = 0; i < 10; ++i) {
                try {
                    Thread.sleep(backOffMaxTime / 50);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LinkedList linkedList = dataQueue;
                // congestedNodes is only touched while holding the dataQueue monitor.
                synchronized (linkedList) {
                    congestedNodes.add(Mockito.mock(DatanodeInfo.class));
                    // More than one entry means the list was not emptied between two additions.
                    if (congestedNodes.size() > 1) {
                        isDelay.set(false);
                        try {
                            // From now on flush() on the block stream throws.
                            // NOTE(review): presumably this is what makes stream.run() return — confirm against DataStreamer.
                            ((DataOutputStream)Mockito.doThrow((Throwable[])new Throwable[]{new IOException()}).when((Object)blockStream)).flush();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    // Decompiler artifact: nothing follows in the loop body, so this has no effect.
                    continue;
                }
            }
            // After the ten rounds, stub flush() to throw regardless of what was observed.
            try {
                ((DataOutputStream)Mockito.doThrow((Throwable[])new Throwable[]{new IOException()}).when((Object)blockStream)).flush();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            // Enqueue one more packet; note this add is performed without holding the dataQueue monitor.
            DFSPacket endPacket = (DFSPacket)Mockito.mock(DFSPacket.class);
            dataQueue.add(endPacket);
        }).start();
        // Helper 2: enqueue 100 mocked packets, sleeping backOffMaxTime/100 ms after each one.
        new Thread(() -> {
            for (int i = 0; i < 100; ++i) {
                packet[i] = (DFSPacket)Mockito.mock(DFSPacket.class);
                // This add is also performed without holding the dataQueue monitor.
                dataQueue.add(packet[i]);
                try {
                    Thread.sleep(backOffMaxTime / 100);
                    // Decompiler artifact: the loop would advance to the next iteration anyway.
                    continue;
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // Run the streamer synchronously on the test thread; neither helper thread is joined afterwards.
        stream.run();
        Assertions.assertFalse((boolean)isDelay.get());
    }

    /**
     * Writes 8 KiB to a file created with {@code NO_LOCAL_WRITE} and replication 2
     * while the NameNode's {@code DatanodeManager} is replaced by a spy that resolves
     * host {@code 127.0.0.1} to the first live DataNode. After triggering block
     * reports it asserts that three DataNodes report and that exactly one of them
     * holds no blocks.
     *
     * @throws IOException on file-system or block-report failure
     */
    @Test
    public void testNoLocalWriteFlag() throws IOException {
        DistributedFileSystem fs = cluster.getFileSystem();
        EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE, CreateFlag.CREATE);
        BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
        DatanodeManager dm = bm.getDatanodeManager();
        try (FSDataOutputStream os = fs.create(new Path("/test-no-local"), FsPermission.getDefault(), flags, 512, (short) 2, 512L, null)) {
            // Install the spy only after the file has been created.
            DatanodeManager spyDm = Mockito.spy(dm);
            DatanodeDescriptor dn1 = (DatanodeDescriptor) dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.LIVE).get(0);
            Mockito.doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1");
            Whitebox.setInternalState(bm, "datanodeManager", spyDm);

            byte[] payload = new byte[8192];
            new Random().nextBytes(payload);
            os.write(payload);
        }
        finally {
            // Always put the real DatanodeManager back for the tests that follow.
            Whitebox.setInternalState(bm, "datanodeManager", dm);
        }

        cluster.triggerBlockReports();
        String bpid = cluster.getNamesystem().getBlockPoolId();
        Assertions.assertEquals(3, cluster.getAllBlockReports(bpid).size());

        // Count the DataNodes that report at least one block on any of their storages.
        int dataNodesHoldingBlocks = 0;
        for (Map<DatanodeStorage, BlockListAsLongs> perDataNode : cluster.getAllBlockReports(bpid)) {
            boolean holdsBlocks = false;
            for (BlockListAsLongs storageBlocks : perDataNode.values()) {
                if (storageBlocks.getNumberOfBlocks() > 0) {
                    holdsBlocks = true;
                    break;
                }
            }
            if (holdsBlocks) {
                dataNodesHoldingBlocks++;
            }
        }
        Assertions.assertEquals(1, 3 - dataNodesHoldingBlocks);
    }

    /**
     * Creates a file through a spied {@code DFSClient}, calls
     * {@code closeThreads(false)} on a spy of the resulting stream, and verifies that
     * the client's {@code endFileLease} was invoked exactly once.
     *
     * @throws Exception if the client or the file cannot be created
     */
    @Test
    public void testEndLeaseCall() throws Exception {
        Configuration conf = new Configuration();
        DFSClient client = new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
        DFSClient spyClient = Mockito.spy(client);
        DFSOutputStream dfsOutputStream = spyClient.create("/file2", FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), (short) 3, 1024L, null, 1024, null);
        DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);

        // This is a real invocation, not stubbing or verification, so pass a real
        // value. ArgumentMatchers.anyBoolean() evaluates to false, so the call made
        // is the same as before, without leaving a matcher registered with Mockito.
        spyDFSOutputStream.closeThreads(false);

        Mockito.verify(spyClient, Mockito.times(1)).endFileLease(ArgumentMatchers.anyString());
    }

    /**
     * Checks that the stream advertises the {@code HFLUSH} and {@code HSYNC}
     * capabilities, then writes 1 KiB followed by {@code hflush()} and another 1 KiB
     * followed by {@code hsync()}.
     *
     * @throws Exception on any file-system failure
     */
    @Test
    public void testStreamFlush() throws Exception {
        DistributedFileSystem fs = cluster.getFileSystem();
        // try-with-resources so the stream is closed even when an assertion fails.
        try (FSDataOutputStream os = fs.create(new Path("/normal-file"))) {
            Assertions.assertTrue(os.hasCapability(StreamCapabilities.StreamCapability.HFLUSH.getValue()), "DFSOutputStream should support hflush()!");
            Assertions.assertTrue(os.hasCapability(StreamCapabilities.StreamCapability.HSYNC.getValue()), "DFSOutputStream should support hsync()!");

            byte[] bytes = new byte[1024];

            IOUtils.copyBytes(new ByteArrayInputStream(bytes), os, bytes.length);
            os.hflush();

            // A fresh input stream is required here: the previous one was fully consumed
            // by the first copy, so reusing it would write nothing before hsync().
            IOUtils.copyBytes(new ByteArrayInputStream(bytes), os, bytes.length);
            os.hsync();
        }
    }

    /**
     * With {@code dfs.client.write.recover.lease.on.close.exception} set to true,
     * makes {@code completeFile()} throw during {@code close()} and expects the
     * stream to report its lease as recovered and the file to become closed within
     * the wait performed by {@link #waitForFileClosed(String)}.
     *
     * @throws Exception if setup fails or the file does not close in time
     */
    @Test
    public void testExceptionInCloseWithRecoverLease() throws Exception {
        final String src = "/testExceptionInCloseWithRecoverLease";

        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.write.recover.lease.on.close.exception", true);
        DFSClient spyClient = Mockito.spy(new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf));
        DFSOutputStream spyDFSOutputStream = Mockito.spy(
            spyClient.create(src, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), (short) 3, 1024L, null, 1024, null));
        Mockito.doThrow(new IOException("Emulated IOException in close")).when(spyDFSOutputStream).completeFile();

        // close() has to fail with the emulated IOException.
        boolean closeThrew = false;
        try {
            spyDFSOutputStream.close();
        }
        catch (IOException expected) {
            closeThrew = true;
        }
        if (!closeThrew) {
            Assertions.fail();
        }

        // The failed close must have recovered the lease, after which the file closes.
        Assertions.assertTrue(spyDFSOutputStream.isLeaseRecovered());
        waitForFileClosed(src);
        Assertions.assertTrue(isFileClosed(src));
    }

    /**
     * With a default configuration, makes {@code completeFile()} throw during
     * {@code close()} and expects that the stream does not report its lease as
     * recovered and that the file is still not closed once
     * {@link #waitForFileClosed(String)} has timed out.
     *
     * @throws Exception if setup fails or the wait is interrupted
     */
    @Test
    public void testExceptionInCloseWithoutRecoverLease() throws Exception {
        final String src = "/testExceptionInCloseWithoutRecoverLease";

        Configuration conf = new Configuration();
        DFSClient client = new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
        DFSClient spyClient = Mockito.spy(client);
        DFSOutputStream dfsOutputStream = spyClient.create(src, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), (short) 3, 1024L, null, 1024, null);
        DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
        Mockito.doThrow(new IOException("Emulated IOException in close")).when(spyDFSOutputStream).completeFile();

        try {
            spyDFSOutputStream.close();
            Assertions.fail("close() should have thrown the emulated IOException");
        }
        catch (IOException ioe) {
            Assertions.assertFalse(spyDFSOutputStream.isLeaseRecovered());
            try {
                waitForFileClosed(src);
                // Previously a normal return here let the test pass without checking
                // anything; a file that did get closed is the failure being guarded against.
                Assertions.fail("File " + src + " was closed although its lease was not recovered");
            }
            catch (TimeoutException expected) {
                Assertions.assertFalse(isFileClosed(src));
            }
        }
    }

    /**
     * Writes one block-sized buffer (1 MiB) followed by twenty more, flushing after
     * each, and asserts after each of the twenty that the stream's {@code packetSize}
     * equals the configured write packet size minus the maximum packet header length,
     * rounded down to a whole number of chunks.
     *
     * @throws IOException on any file-system failure
     */
    @Test
    @Timeout(value=60L)
    public void testFirstPacketSizeInNewBlocks() throws IOException {
        final long blockSize = 1024L * 1024L;
        final Path file = new Path("/testfile.dat");

        DistributedFileSystem fs = cluster.getFileSystem();
        Configuration dfsConf = fs.getConf();
        EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
        try (FSDataOutputStream fos = fs.create(file, FsPermission.getDefault(), flags, 512, (short) 3, blockSize, null)) {
            DataChecksum crc32c = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
            Random r = new Random();
            byte[] buf = new byte[(int) blockSize];

            // Write a first block's worth of data before the measured writes start.
            r.nextBytes(buf);
            fos.write(buf);
            fos.hflush();

            // Largest multiple of the chunk size that fits next to the packet header.
            int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
            int packetContentSize = (dfsConf.getInt("dfs.client-write-packet-size", 65536) - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;

            for (int round = 0; round < 20; round++) {
                r.nextBytes(buf);
                fos.write(buf);
                fos.hflush();
                // (expected, actual) order so a failure reports the values the right way round.
                Assertions.assertEquals(packetContentSize, ((DFSOutputStream) fos.getWrappedStream()).packetSize);
            }
        }
        fs.delete(file, true);
    }

    /** Shuts down the shared mini cluster after all tests, if it was ever created. */
    @AfterAll
    public static void tearDown() {
        if (cluster == null) {
            return;
        }
        cluster.shutdown();
    }

    /**
     * Asks the shared cluster's file system whether the file at {@code path} is closed.
     *
     * @param path path of the file to check
     * @return the result of {@code isFileClosed} for that path
     * @throws IOException if the file-system call fails
     */
    private boolean isFileClosed(String path) throws IOException {
        DistributedFileSystem fs = cluster.getFileSystem();
        Path target = new Path(path);
        return fs.isFileClosed(target);
    }

    /**
     * Waits via {@code GenericTestUtils.waitFor} (arguments 1000 and 5000) until
     * {@link #isFileClosed(String)} reports true for {@code path}. An
     * {@code IOException} raised by a check is treated as "not closed yet".
     *
     * @param path path of the file to wait for
     * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
     */
    private void waitForFileClosed(String path) throws Exception {
        GenericTestUtils.waitFor(() -> {
            try {
                return isFileClosed(path);
            }
            catch (IOException e) {
                // A failed check counts as "still open"; keep waiting.
                return false;
            }
        }, 1000L, 5000L);
    }
}

