package org.apache.hadoop.metrics2.sink;

import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.class */
/**
 * Exercises {@link RollingFileSystemSink} against a live {@link MiniDFSCluster}.
 *
 * <p>Each test gets a fresh cluster (see {@link #setupHdfs()}); several tests
 * deliberately shut the cluster down mid-test to observe how the sink reacts
 * when HDFS becomes unavailable.
 */
public class TestRollingFileSystemSinkWithHdfs extends RollingFileSystemSinkTestBase {
    /** Number of datanodes the mini cluster is started with. */
    private static final int NUM_DATANODES = 4;

    /** Cluster for the current test; {@code null} once it has been shut down. */
    private MiniDFSCluster cluster;

    /**
     * Starts a new mini DFS cluster and clears the sink's flushed flag.
     *
     * @throws IOException if the cluster cannot be built
     */
    @Before
    public void setupHdfs() throws IOException {
        this.cluster = new MiniDFSCluster.Builder(new Configuration())
                .numDataNodes(NUM_DATANODES)
                .build();
        RollingFileSystemSink.hasFlushed = false;
    }

    /**
     * Shuts the mini cluster down if it is still running.
     *
     * <p>Several tests call this explicitly to simulate an HDFS outage and JUnit
     * calls it again after the test, so the field is cleared to make the second
     * call a no-op.
     */
    @After
    public void shutdownHdfs() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    /**
     * Builds the {@code hdfs://host:port/tmp} URI used as the sink's base path.
     * Must be called while the cluster is still up.
     */
    private String hdfsTmpUri() {
        return "hdfs://" + this.cluster.getNameNode().getHostAndPort() + "/tmp";
    }

    /**
     * Stops and then shuts down the metrics system, guaranteeing that
     * {@code shutdown()} runs even when {@code stop()} throws.
     */
    private static void stopAndShutdown(MetricsSystem ms) {
        try {
            ms.stop();
        } finally {
            ms.shutdown();
        }
    }

    /** Runs the base class write test against HDFS and checks the resulting contents. */
    @Test
    public void testWrite() throws Exception {
        String basePath = hdfsTmpUri();
        MetricsSystem ms = initMetricsSystem(basePath, false, true);
        assertMetricsContents(doWriteTest(ms, basePath, 1));
    }

    /** Runs the base class append test with flags {@code (false, true)} and checks for the extra contents. */
    @Test
    public void testAppend() throws Exception {
        assertExtraContents(doAppendTest(hdfsTmpUri(), false, true, 1));
    }

    /** Same as {@link #testAppend()} but with the first flag set to {@code true}. */
    @Test
    public void testSilentAppend() throws Exception {
        assertExtraContents(doAppendTest(hdfsTmpUri(), true, true, 1));
    }

    /** Runs the base class append test with flags {@code (false, false)} and checks the plain metrics contents. */
    @Test
    public void testNoAppend() throws Exception {
        assertMetricsContents(doAppendTest(hdfsTmpUri(), false, false, 2));
    }

    /** Same as {@link #testNoAppend()} but with the first flag set to {@code true}. */
    @Test
    public void testSilentOverwrite() throws Exception {
        assertMetricsContents(doAppendTest(hdfsTmpUri(), true, false, 2));
    }

    /**
     * Publishes metrics after HDFS has been shut down and expects the mock sink
     * to record an error.
     */
    @Test
    public void testFailedWrite() throws IOException {
        MetricsSystem ms = initMetricsSystem(hdfsTmpUri(), false, false);
        new MyMetrics1().registerWith(ms);

        shutdownHdfs();
        MockSink.errored = false;
        ms.publishMetricsNow();

        Assert.assertTrue("No exception was generated while writing metrics even though HDFS was unavailable", MockSink.errored);
        stopAndShutdown(ms);
    }

    /**
     * Publishes metrics, shuts HDFS down, then stops the metrics system and
     * expects either the mock sink to record an error or {@code stop()} to
     * throw a {@link MetricsException}.
     */
    @Test
    public void testFailedClose() throws IOException {
        MetricsSystem ms = initMetricsSystem(hdfsTmpUri(), false, false);
        new MyMetrics1().registerWith(ms);
        ms.publishMetricsNow();

        shutdownHdfs();
        MockSink.errored = false;

        try {
            ms.stop();
            Assert.assertTrue("No exception was generated while stopping sink even though HDFS was unavailable", MockSink.errored);
        } catch (MetricsException expected) {
            // A MetricsException escaping stop() is an acceptable outcome of this test.
        } finally {
            ms.shutdown();
        }
    }

    /**
     * Publishes metrics after HDFS has been shut down with the sink configured
     * to ignore errors, and expects no error to be recorded.
     */
    @Test
    public void testSilentFailedWrite() throws IOException, InterruptedException {
        MetricsSystem ms = initMetricsSystem(hdfsTmpUri(), true, false);
        new MyMetrics1().registerWith(ms);

        shutdownHdfs();
        MockSink.errored = false;
        ms.publishMetricsNow();

        Assert.assertFalse("An exception was generated writing metrics while HDFS was unavailable, even though the sink is set to ignore errors", MockSink.errored);
        stopAndShutdown(ms);
    }

    /**
     * Stops the metrics system after HDFS has been shut down with the sink
     * configured to ignore errors, and expects no error to be recorded.
     */
    @Test
    public void testSilentFailedClose() throws IOException {
        MetricsSystem ms = initMetricsSystem(hdfsTmpUri(), true, false);
        new MyMetrics1().registerWith(ms);
        ms.publishMetricsNow();

        shutdownHdfs();
        MockSink.errored = false;

        try {
            ms.stop();
            Assert.assertFalse("An exception was generated stopping sink while HDFS was unavailable, even though the sink is set to ignore errors", MockSink.errored);
        } finally {
            ms.shutdown();
        }
    }

    /**
     * Publishes metrics twice with {@code forceFlush} enabled, waits for the
     * sink to report that it has flushed, and checks the size of the most
     * recent log file.
     *
     * <p>All cleanup lives in a single {@code finally} so that a failed lookup
     * or assertion cannot leave {@code forceFlush} enabled or the metrics
     * system running for subsequent tests.
     */
    @Test
    public void testFlushThread() throws Exception {
        RollingFileSystemSink.forceFlush = true;
        MetricsSystem ms = null;

        try {
            String basePath = hdfsTmpUri();
            ms = initMetricsSystem(basePath, true, false, false);
            new MyMetrics1().registerWith(ms);

            ms.publishMetricsNow();
            ms.publishMetricsNow();

            // Poll in 10 ms steps; give up after roughly 10 seconds.
            int polls = 0;
            while (!RollingFileSystemSink.hasFlushed) {
                Thread.sleep(10L);
                polls++;
                if (polls > 1000) {
                    Assert.fail("Flush thread did not run within 10 seconds");
                }
            }

            Path currentDir = new Path(basePath, DATE_FORMAT.format(Calendar.getInstance().getTime()) + "00");

            // newInstance() hands back a handle owned by this test, so close it here.
            try (FileSystem fs = FileSystem.newInstance(new URI(basePath), new Configuration())) {
                Path currentFile = findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
                FileStatus status = fs.getFileStatus(currentFile);

                // NOTE(review): 236 is presumably the minimum size of the two published records - confirm.
                Assert.assertTrue("The flusher thread didn't flush the log contents. Expected at least 236 bytes in the log file, but got " + status.getLen(), status.getLen() >= 236);
            }
        } finally {
            RollingFileSystemSink.forceFlush = false;
            if (ms != null) {
                stopAndShutdown(ms);
            }
        }
    }

    /**
     * Initializes the metrics system after HDFS has already been shut down
     * (with the sink configured to ignore errors) and expects the mock sink to
     * report that it initialized without recording an error.
     */
    @Test
    public void testInitWithNoHDFS() {
        String basePath = hdfsTmpUri();

        shutdownHdfs();
        MockSink.errored = false;
        initMetricsSystem(basePath, true, false);

        Assert.assertTrue("The sink was not initialized as expected", MockSink.initialized);
        Assert.assertFalse("The sink threw an unexpected error on initialization", MockSink.errored);
    }
}
