package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests of {@link EntryFileIO}: writing {@link FileEntry} records to a local
 * sequence file and reading them back, both directly through
 * {@link SequenceFile.Writer}/{@link SequenceFile.Reader} and through the
 * queued {@link EntryFileIO.EntryWriter} and the entry iterator.
 * <p>
 * Every test case works on a fresh temporary file which is created in
 * {@link #setup()} and deleted in {@link #teardown()}.
 */
public class TestEntryFileIO extends AbstractManifestCommitterTest {

    private static final Logger LOG = LoggerFactory.getLogger(TestEntryFileIO.class);

    /**
     * Entry to save.
     */
    public static final FileEntry ENTRY = new FileEntry("source", "dest", 100, "etag");

    /**
     * Entry file IO instance under test; created in {@link #setup()}.
     */
    private EntryFileIO entryFileIO;

    /**
     * The temporary entry file of the current test case.
     */
    private File entryFile;

    /**
     * Create the {@link EntryFileIO} instance and a temporary entry file.
     * <p>
     * NOTE(review): this override does not invoke {@code super.setup()};
     * presumably deliberate, as these tests only use a local temp file.
     * Confirm against the superclass before changing.
     *
     * @throws Exception on any failure
     */
    @Override
    @Before
    public void setup() throws Exception {
        this.entryFileIO = new EntryFileIO(new Configuration());
        createEntryFile();
    }

    /**
     * Rename the current thread and delete the entry file, if one was created.
     *
     * @throws Exception on any failure
     */
    @Override
    @After
    public void teardown() throws Exception {
        Thread.currentThread().setName("teardown");
        final File file = getEntryFile();
        if (file != null) {
            file.delete();
        }
    }

    /**
     * Create a temporary entry file and store it in {@link #entryFile}.
     *
     * @throws IOException creation failure
     */
    private void createEntryFile() throws IOException {
        setEntryFile(File.createTempFile("entry", ".seq"));
    }

    /**
     * Get the entry file of this test case.
     *
     * @return the file; null if none has been created
     */
    private File getEntryFile() {
        return this.entryFile;
    }

    /**
     * Set the entry file of this test case.
     *
     * @param file new value
     */
    private void setEntryFile(File file) {
        this.entryFile = file;
    }

    /**
     * Write a single entry to the file, then read it back first through a
     * sequence file reader and then through the entry iterator.
     */
    @Test
    public void testCreateWriteReadFileOneEntry() throws Throwable {
        final FileEntry source = ENTRY;

        // write and fully close the file before any attempt to read it
        try (SequenceFile.Writer writer = createWriter()) {
            writer.append(NullWritable.get(), source);
            writer.flush();
        }

        // direct read through the sequence file API
        final FileEntry readBack = new FileEntry();
        try (SequenceFile.Reader reader = readEntryFile()) {
            reader.next(NullWritable.get(), readBack);
        }
        Assertions.assertThat(readBack)
                .describedAs("entry read back from sequence file")
                .isEqualTo(source);

        // now use the iterator to access it
        final RemoteIterator<FileEntry> it = iterateOverEntryFile();
        final List<FileEntry> files = new ArrayList<>();
        RemoteIterators.foreach(it, files::add);
        Assertions.assertThat(files)
                .describedAs("iteration over the entry file")
                .hasSize(1)
                .element(0)
                .isEqualTo(source);

        // the iterator must report itself closed, with one entry read
        final EntryFileIO.EntryIterator entryIterator = (EntryFileIO.EntryIterator) it;
        Assertions.assertThat(entryIterator)
                .describedAs("entry iterator %s", entryIterator)
                .matches(p -> p.isClosed())
                .extracting(p -> p.getCount())
                .isEqualTo(1);
    }

    /**
     * Create a writer to the entry file.
     *
     * @return a sequence file writer
     * @throws IOException failure to create the writer
     */
    private SequenceFile.Writer createWriter() throws IOException {
        return this.entryFileIO.createWriter(getEntryFile());
    }

    /**
     * Create an iterator over the entries of the entry file.
     *
     * @return an iterator of file entries
     * @throws IOException failure to open the file
     */
    private RemoteIterator<FileEntry> iterateOverEntryFile() throws IOException {
        return this.entryFileIO.iterateOver(readEntryFile());
    }

    /**
     * Open a reader of the entry file, after asserting that the file
     * is not empty.
     *
     * @return a sequence file reader
     * @throws IOException failure to open the file
     */
    private SequenceFile.Reader readEntryFile() throws IOException {
        assertEntryFileNonEmpty();
        return this.entryFileIO.createReader(getEntryFile());
    }

    /**
     * Create a file with no entries, then verify that iterating over it
     * processes zero entries.
     */
    @Test
    public void testCreateEmptyFile() throws Throwable {
        final File file = getEntryFile();
        this.entryFileIO.createWriter(file).close();

        final List<FileEntry> files = new ArrayList<>();
        final long iterations = RemoteIterators.foreach(iterateOverEntryFile(), files::add);
        Assertions.assertThat(iterations)
                .describedAs("Count of iterations over entries in an entry file with no entries")
                .isEqualTo(0L);
    }

    /**
     * Assert that the entry file has a length greater than zero.
     */
    private void assertEntryFileNonEmpty() {
        Assertions.assertThat(getEntryFile().length())
                .describedAs("Length of file %s", getEntryFile())
                .isGreaterThan(0L);
    }

    /**
     * Launching an entry writer with a null sequence file writer
     * is expected to raise a NullPointerException.
     */
    @Test
    public void testCreateInvalidWriter() throws Throwable {
        LambdaTestUtils.intercept(NullPointerException.class, () ->
                this.entryFileIO.launchEntryWriter((SequenceFile.Writer) null, 1));
    }

    /**
     * Launching an entry writer with a capacity of zero
     * is expected to raise an IllegalStateException.
     */
    @Test
    public void testCreateInvalidWriterCapacity() throws Throwable {
        LambdaTestUtils.intercept(IllegalStateException.class, () ->
                this.entryFileIO.launchEntryWriter((SequenceFile.Writer) null, 0));
    }

    /**
     * Enqueue many lists of entries from a single thread, then read the
     * file back and verify every entry and the total count.
     */
    @Test
    public void testLargeStreamingWrite() throws Throwable {
        // size of each list of entries queued
        final int listSize = 100;
        // and the number of lists queued
        final int writes = 100;
        final List<FileEntry> list = buildEntryList(listSize);
        final int total = listSize * writes;

        try (EntryFileIO.EntryWriter out = this.entryFileIO.launchEntryWriter(createWriter(), 2)) {
            Assertions.assertThat(out.isActive())
                    .describedAs("out.isActive in (%s)", out)
                    .isTrue();
            for (int i = 0; i < writes; i++) {
                Assertions.assertThat(out.enqueue(list))
                        .describedAs("enqueue of list")
                        .isTrue();
            }
            // explicit close so that the state after close can be checked here
            out.close();
            out.maybeRaiseWriteException();
            Assertions.assertThat(out.isActive())
                    .describedAs("out.isActive in (%s)", out)
                    .isFalse();
            Assertions.assertThat(out.getCount())
                    .describedAs("total elements written")
                    .isEqualTo(total);
        }

        // now read it back: element n must equal list entry (n mod listSize)
        final AtomicInteger count = new AtomicInteger();
        RemoteIterators.foreach(iterateOverEntryFile(), entry -> {
            final int element = count.getAndIncrement();
            final int index = element % listSize;
            Assertions.assertThat(entry)
                    .describedAs("element %d in file mapping to index %d", element, index)
                    .isEqualTo(list.get(index));
        });
        Assertions.assertThat(count.get())
                .describedAs("total elements read")
                .isEqualTo(total);
    }

    /**
     * Build a list of distinct entries, with source, destination, size
     * and etag all derived from the index of the entry.
     *
     * @param size number of entries to create
     * @return the list of entries
     */
    private static List<FileEntry> buildEntryList(int size) {
        final List<FileEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new FileEntry("source" + i, "dest" + i, i, "etag-" + i));
        }
        Assertions.assertThat(entries).hasSize(size);
        return entries;
    }

    /**
     * Use a writer whose append() fails after a fixed number of calls and
     * verify that the failure surfaces through
     * {@code maybeRaiseWriteException()} and that the count of the writer
     * matches the number of successful appends.
     */
    @Test
    public void testFailurePropagation() throws Throwable {
        // number of appends which succeed before the spy starts failing
        final int count = 4;
        final SequenceFile.Writer spyWriter =
                spyWithFailingAppend(this.entryFileIO.createWriter(getEntryFile()), count);
        // one entry per list, so each enqueue maps to a single append
        final List<FileEntry> list = buildEntryList(1);

        try (EntryFileIO.EntryWriter out = this.entryFileIO.launchEntryWriter(spyWriter, 2)) {
            // offer twice as many lists as will succeed; stop once enqueue is rejected
            boolean valid = true;
            for (int i = 0; valid && i < count * 2; i++) {
                valid = out.enqueue(list);
            }
            LOG.info("queue to {} finished valid={}", out, valid);
            out.close();

            // verify the exception is as expected
            LambdaTestUtils.intercept(IOException.class, "mocked", () ->
                    out.maybeRaiseWriteException());

            // and verify the count of entries processed
            Assertions.assertThat(out.getCount())
                    .describedAs("process count of %s", count)
                    .isEqualTo(count);
        }
    }

    /**
     * Create a Mockito spy of a writer whose (Writable, Writable) append
     * is forwarded to the real writer for the first {@code failAt} calls
     * and raises an IOException with the text "mocked" on every later call.
     *
     * @param writer the writer to wrap
     * @param failAt number of appends to permit before failing
     * @return the spy
     * @throws IOException declared by the stubbed append signature
     */
    private static SequenceFile.Writer spyWithFailingAppend(SequenceFile.Writer writer, int failAt)
            throws IOException {
        final AtomicLong limit = new AtomicLong(failAt);
        final SequenceFile.Writer spyWriter = Mockito.spy(writer);
        Mockito.doAnswer(invocation -> {
            final Writable key = invocation.getArgument(0);
            final Writable value = invocation.getArgument(1);
            if (limit.getAndDecrement() <= 0) {
                throw new IOException("mocked");
            }
            writer.append(key, value);
            return null;
        }).when(spyWriter).append(Mockito.any(Writable.class), Mockito.any(Writable.class));
        return spyWriter;
    }

    /**
     * Enqueue many lists of entries through the task pool submitter,
     * then verify the count of entries written and read back.
     */
    @Test
    public void testParallelWrite() throws Throwable {
        // size of each list of entries queued
        final int listSize = 100;
        // and the number of enqueue operations submitted
        final int attempts = 100;
        final List<FileEntry> list = buildEntryList(listSize);
        final int total = listSize * attempts;

        try (EntryFileIO.EntryWriter out = this.entryFileIO.launchEntryWriter(createWriter(), 20)) {
            TaskPool.foreach(RemoteIterators.rangeExcludingIterator(0L, attempts))
                    .executeWith(getSubmitter())
                    .stopOnFailure()
                    .run(l -> {
                        out.enqueue(list);
                    });
            out.close();
            out.maybeRaiseWriteException();
            Assertions.assertThat(out.getCount())
                    .describedAs("total elements written")
                    .isEqualTo(total);
        }

        // now read it back; only the number of entries is checked
        Assertions.assertThat(RemoteIterators.foreach(iterateOverEntryFile(), entry -> { }))
                .describedAs("total elements read")
                .isEqualTo(total);
    }
}
